org.objectweb.joram.mom.dest
Class Queue

java.lang.Object
  extended by fr.dyade.aaa.agent.Agent
      extended by org.objectweb.joram.mom.dest.Destination
          extended by org.objectweb.joram.mom.dest.Queue
All Implemented Interfaces:
AgentMBean, BagSerializer, java.io.Serializable, DestinationMBean, QueueMBean
Direct Known Subclasses:
AcquisitionQueue, ClusterQueue, DistributionQueue, FtpQueue, JMSBridgeQueue, SchedulerQueue

public class Queue
extends Destination
implements QueueMBean, BagSerializer

The Queue class implements the MOM queue behavior: it stores messages and delivers them upon client requests.

See Also:
Serialized Form

Field Summary
protected  long arrivalsCounter
          Counter of message arrivals.
protected  java.util.Hashtable consumers
          Table keeping the identifiers of the messages' consumers.
protected  java.util.Hashtable contexts
          Table keeping the contexts of the messages' consumers.
protected  java.util.Hashtable deliveredMsgs
          Table holding the delivered messages before acknowledgment.
static org.objectweb.util.monolog.api.Logger logger
           
protected  java.util.Vector messages
          Vector holding the messages before delivery.
protected  int nbMaxMsg
          Maximum number of messages stored in the queue (-1 means no limit).
protected  boolean receiving
          true if the queue is currently receiving messages.
protected  java.util.Vector requests
          Vector holding the requests before reply or expiry.
 
Fields inherited from class org.objectweb.joram.mom.dest.Destination
_rights, clients, creationDate, dmqId, freeReading, freeWriting, nbMsgsDeliverSinceCreation, nbMsgsReceiveSinceCreation, nbMsgsSentToDMQSinceCreation, READ, READWRITE, task, WAKEUP_PERIOD, WRITE
 
Fields inherited from class fr.dyade.aaa.agent.Agent
emptyString, fixed, logmon
 
Constructor Summary
Queue()
           
 
Method Summary
protected  void abortReceiveRequest(AgentId from, AbortReceiveRequest not)
           
protected  void acknowledgeRequest(AcknowledgeRequest not)
          Method implementing the reaction to an AcknowledgeRequest instance, requesting messages to be acknowledged.
 void addClientMessages(ClientMessages clientMsgs)
          Adds the client messages in the queue.
protected  boolean addMessage(Message message)
          Adds a message in the list of messages to deliver.
protected  void browseRequest(AgentId from, BrowseRequest not)
          Method implementing the queue reaction to a BrowseRequest instance, requesting an enumeration of the messages on the queue.
protected  boolean checkDelivery(Message msg)
          Returns true if conditions are ok to deliver the message.
 void cleanPendingMessage()
          Removes all messages whose time-to-live has expired.
protected  DMQManager cleanPendingMessage(long currentTime)
          Cleans the pending messages list.
 void cleanWaitingRequest()
          Removes all requests whose expiration time has passed.
protected  void cleanWaitingRequest(long currentTime)
          Cleans the waiting request list.
protected  void deliverMessages(int index)
          Actually tries to answer the pending "receive" requests.
protected  void denyRequest(AgentId from, DenyRequest not)
          Method implementing the reaction to a DenyRequest instance, requesting messages to be denied.
protected  void doClientMessages(AgentId from, ClientMessages not)
          Method specifically processing a ClientMessages instance.
protected  void doDeleteNot(DeleteNot not)
          Method specifically processing a fr.dyade.aaa.agent.DeleteNot instance.
protected  void doRightRequest(AgentId user, int right)
          Method specifically processing a SetRightRequest instance.
protected  void doUnknownAgent(UnknownAgent uA)
          Method specifically processing an UnknownAgent instance.
protected  ClientMessages getClientMessages(int nb, java.lang.String selector, boolean remove)
          Gets a ClientMessages instance containing nb messages.
static AgentId getDefaultDMQId()
          Static method returning the default DMQ identifier.
static int getDefaultThreshold()
          Static method returning the default threshold.
 int getDeliveredMessageCount()
          Returns the number of messages delivered and waiting for acknowledgment.
 javax.management.openmbean.CompositeData getMessage(java.lang.String msgId)
          Returns the description of a particular pending message.
 javax.management.openmbean.TabularData getMessages()
          Returns the description of all pending messages.
 java.util.List getMessagesView()
          Returns the description of all pending messages.
protected  java.lang.StringBuffer getMsgTxPrefix()
           
 int getNbMaxMsg()
          Returns the maximum number of messages for the destination.
 long getNbMsgsReceiveSinceCreation()
          Returns the number of messages received since creation time of this destination.
 int getPendingMessageCount()
          Returns the number of pending messages in the queue.
protected  Message getQueueMessage(java.lang.String msgId, boolean remove)
          Gets the MOM message, deleting it if remove is true.
 int getThreshold()
          Returns the threshold value of this queue, -1 if not set.
 byte getType()
          Returns the type of this destination: Queue or Topic.
 int getWaitingRequestCount()
          Returns the number of waiting requests in the queue.
 void handleAdminRequestNot(AgentId from, FwdAdminRequestNot not)
           
protected  void handleExpiredNot(AgentId from, ExpiredNot not)
           
 void initialize(boolean firstTime)
          Initializes the destination.
protected  boolean isUndeliverable(Message message)
          Returns true if a given message is considered as undeliverable, because its delivery count matches the queue's threshold, if any, or the server's default threshold value (if any).
protected  boolean isValidJMXAttribute(java.lang.String attrName)
          Allows some JMX attributes to be excluded from the getJMXStatistics method.
protected  void messageDelivered(java.lang.String msgId)
          Called in deliverMessages just after forward(msg); override this method to perform specific processing.
protected  void messageRemoved(java.lang.String msgId)
          Called in deliverMessages just after an invalid message is removed; override this method to perform specific processing.
 void react(AgentId from, Notification not)
          Distributes the received notifications to the appropriate reactions.
 void readBag(java.io.ObjectInputStream in)
          The readBag method is responsible for reading from the stream and restoring the agent's transient state.
protected  void receiveRequest(AgentId from, ReceiveRequest not)
          Method implementing the reaction to a ReceiveRequest instance, requesting a message.
protected  void setMsgTxName(Message msg)
           
 void setNbMaxMsg(int nbMaxMsg)
          Sets the maximum number of messages for the destination.
 void setThreshold(int threshold)
          Sets or unsets the threshold for this queue.
protected  void storeMessage(Message msg)
          Actually stores a message in the deliverables vector.
protected  void storeMessageHeader(Message message)
          Actually stores a message header in the deliverables vector.
 java.lang.String toString()
          Returns a string representation of this destination.
 void wakeUpNot(WakeUpNot not)
          Wakes up and cleans the queue.
 void writeBag(java.io.ObjectOutputStream out)
          The writeBag method is responsible for writing the extra data of this particular agent so that the corresponding readBag method can restore it.
 
Methods inherited from class org.objectweb.joram.mom.dest.Destination
agentFinalize, agentInitialize, clientMessages, delete, deleteNot, forward, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQAgentId, getDMQId, getJMXStatistics, getMBeanName, getNbMsgsDeliverSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, getRights, handleDeniedMessage, interceptorsAvailable, isAdministrator, isFreeReading, isFreeWriting, isLocal, isReader, isWriter, postProcess, preProcess, processAdminCommand, processInterceptors, processSetRight, processStartHandler, processStopHandler, replyToTopic, requestGroupNot, setAdminId, setFreeReading, setFreeWriting, setPeriod, setProperties, setRight, unknownAgent, updateProperties
 
Methods inherited from class fr.dyade.aaa.agent.Agent
delete, delete, deploy, deploy, getAgentId, getCommitTime, getId, getLogTopic, getName, getReactNb, getReactTime, isDeployed, isFixed, needToBeCommited, save, sendTo, sendTo, sendTo, setName, setNoSave, setSave
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.objectweb.joram.mom.dest.DestinationMBean
delete, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQId, getName, getNbMsgsDeliverSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, isFreeReading, isFreeWriting, setFreeReading, setFreeWriting, setPeriod
 
Methods inherited from interface fr.dyade.aaa.agent.AgentMBean
getAgentId, getCommitTime, getReactNb, getReactTime, isFixed
 

Field Detail

logger

public static org.objectweb.util.monolog.api.Logger logger

consumers

protected java.util.Hashtable consumers
Table keeping the identifiers of the messages' consumers.


contexts

protected java.util.Hashtable contexts
Table keeping the contexts of the messages' consumers.


arrivalsCounter

protected long arrivalsCounter
Counter of message arrivals.


requests

protected java.util.Vector requests
Vector holding the requests before reply or expiry.


receiving

protected transient boolean receiving
true if the queue is currently receiving messages.


messages

protected transient java.util.Vector messages
Vector holding the messages before delivery.


deliveredMsgs

protected transient java.util.Hashtable deliveredMsgs
Table holding the delivered messages before acknowledgment.


nbMaxMsg

protected int nbMaxMsg
Maximum number of messages stored in the queue (-1 means no limit).

Constructor Detail

Queue

public Queue()
Method Detail

getThreshold

public int getThreshold()
Returns the threshold value of this queue, -1 if not set.

Specified by:
getThreshold in interface QueueMBean
Returns:
the threshold value of this queue; -1 if not set.

setThreshold

public void setThreshold(int threshold)
Sets or unsets the threshold for this queue.

Specified by:
setThreshold in interface QueueMBean
Parameters:
threshold - The threshold value to be set (-1 for unsetting previous value).

getDefaultThreshold

public static int getDefaultThreshold()
Static method returning the default threshold.


getDefaultDMQId

public static AgentId getDefaultDMQId()
Static method returning the default DMQ identifier.


react

public void react(AgentId from,
                  Notification not)
           throws java.lang.Exception
Distributes the received notifications to the appropriate reactions.

Overrides:
react in class Destination
Parameters:
from - agent sending notification
not - notification to react to
Throws:
java.lang.Exception

cleanWaitingRequest

public void cleanWaitingRequest()
Removes all requests whose expiration time has passed.

Specified by:
cleanWaitingRequest in interface QueueMBean

cleanWaitingRequest

protected void cleanWaitingRequest(long currentTime)
Cleans the waiting request list. Removes all requests whose expiration time is earlier than the time given as parameter.

Parameters:
currentTime - The current time.

getWaitingRequestCount

public final int getWaitingRequestCount()
Returns the number of waiting requests in the queue.

Specified by:
getWaitingRequestCount in interface QueueMBean
Returns:
The number of waiting requests.

cleanPendingMessage

public void cleanPendingMessage()
Removes all messages whose time-to-live has expired.

Specified by:
cleanPendingMessage in interface QueueMBean

getType

public byte getType()
Description copied from class: Destination
Returns the type of this destination: Queue or Topic.

Specified by:
getType in class Destination
Returns:
the type of this destination.
See Also:
DestinationConstants.TOPIC_TYPE, DestinationConstants.QUEUE_TYPE

cleanPendingMessage

protected DMQManager cleanPendingMessage(long currentTime)
Cleans the pending messages list. Removes all messages which expire before the date given as parameter.

Parameters:
currentTime - The current time.
Returns:
A DMQManager which contains the expired messages. null if there wasn't any.
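The expiry sweep described above can be pictured with a minimal, self-contained sketch. It does not use Joram's classes: ExpirySketch, Msg and cleanPending are hypothetical names, a plain list stands in for the DMQManager, and treating an expiration of 0 as "never expires" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the expiry sweep; not Joram's implementation.
final class ExpirySketch {

    /** Hypothetical message: an identifier plus an absolute expiration time in ms. */
    static final class Msg {
        final String id;
        final long expiration; // assumption: 0 means the message never expires

        Msg(String id, long expiration) {
            this.id = id;
            this.expiration = expiration;
        }
    }

    /**
     * Removes from pending every message that expires before currentTime.
     * Returns the removed messages, or null if there were none, mirroring
     * the documented "null if there wasn't any" contract.
     */
    static List<Msg> cleanPending(List<Msg> pending, long currentTime) {
        List<Msg> expired = null;
        for (Iterator<Msg> it = pending.iterator(); it.hasNext();) {
            Msg m = it.next();
            if (m.expiration > 0 && m.expiration < currentTime) {
                it.remove();
                if (expired == null) {
                    expired = new ArrayList<>(); // allocated lazily, only when needed
                }
                expired.add(m);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        List<Msg> pending = new ArrayList<>();
        pending.add(new Msg("a", 100L));
        pending.add(new Msg("b", 0L));
        List<Msg> expired = cleanPending(pending, 200L);
        System.out.println("expired: " + (expired == null ? 0 : expired.size())
                + ", still pending: " + pending.size());
    }
}
```

Callers of such a method have to test the result for null before iterating over it.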

getPendingMessageCount

public final int getPendingMessageCount()
Returns the number of pending messages in the queue.

Specified by:
getPendingMessageCount in interface QueueMBean
Returns:
The number of pending messages.

getDeliveredMessageCount

public final int getDeliveredMessageCount()
Returns the number of messages delivered and waiting for acknowledgment.

Specified by:
getDeliveredMessageCount in interface QueueMBean
Returns:
The number of messages delivered.

getNbMsgsReceiveSinceCreation

public final long getNbMsgsReceiveSinceCreation()
Description copied from class: Destination
Returns the number of messages received since creation time of this destination.

Specified by:
getNbMsgsReceiveSinceCreation in interface DestinationMBean
Specified by:
getNbMsgsReceiveSinceCreation in class Destination
Returns:
the number of messages received since creation time.

getNbMaxMsg

public final int getNbMaxMsg()
Returns the maximum number of messages for the destination. If the limit is unset, the method returns -1.

Specified by:
getNbMaxMsg in interface QueueMBean
Returns:
the maximum number of messages for the destination if set; -1 otherwise.

setNbMaxMsg

public void setNbMaxMsg(int nbMaxMsg)
Sets the maximum number of messages for the destination.

Specified by:
setNbMaxMsg in interface QueueMBean
Parameters:
nbMaxMsg - the maximum number of messages (-1 sets no limit).

initialize

public void initialize(boolean firstTime)
Initializes the destination.

Specified by:
initialize in class Destination
Parameters:
firstTime - true when first called by the factory

toString

public java.lang.String toString()
Returns a string representation of this destination.

Specified by:
toString in interface AgentMBean
Specified by:
toString in interface DestinationMBean
Overrides:
toString in class Agent
Returns:
A string representation of this agent.

wakeUpNot

public void wakeUpNot(WakeUpNot not)
Wakes up and cleans the queue.

Specified by:
wakeUpNot in class Destination

isValidJMXAttribute

protected boolean isValidJMXAttribute(java.lang.String attrName)
Allows some JMX attributes to be excluded from the getJMXStatistics method. It excludes.

Overrides:
isValidJMXAttribute in class Destination
Parameters:
attrName - name of attribute to test.
Returns:
true if the attribute is a valid one.

receiveRequest

protected void receiveRequest(AgentId from,
                              ReceiveRequest not)
                       throws AccessException
Method implementing the reaction to a ReceiveRequest instance, requesting a message.

This method stores the request and launches a delivery sequence.

Throws:
AccessException - If the sender is not a reader.

browseRequest

protected void browseRequest(AgentId from,
                             BrowseRequest not)
                      throws AccessException
Method implementing the queue reaction to a BrowseRequest instance, requesting an enumeration of the messages on the queue.

The method sends a BrowseReply back to the client. Expired messages are sent to the DMQ.

Throws:
AccessException - If the requester is not a reader.

acknowledgeRequest

protected void acknowledgeRequest(AcknowledgeRequest not)
Method implementing the reaction to an AcknowledgeRequest instance, requesting messages to be acknowledged.


denyRequest

protected void denyRequest(AgentId from,
                           DenyRequest not)
Method implementing the reaction to a DenyRequest instance, requesting messages to be denied.

This method denies the messages and launches a delivery sequence. Messages considered as undeliverable are sent to the DMQ.


abortReceiveRequest

protected void abortReceiveRequest(AgentId from,
                                   AbortReceiveRequest not)

handleAdminRequestNot

public void handleAdminRequestNot(AgentId from,
                                  FwdAdminRequestNot not)
Overrides:
handleAdminRequestNot in class Destination
See Also:
Destination.handleAdminRequestNot(fr.dyade.aaa.agent.AgentId, org.objectweb.joram.mom.notifications.FwdAdminRequestNot)

doRightRequest

protected void doRightRequest(AgentId user,
                              int right)
Method specifically processing a SetRightRequest instance.

When a reader is removed, and receive requests of this reader are still on the queue, they are replied to by an ExceptionReply.

Specified by:
doRightRequest in class Destination
Parameters:
user - The user whose right is modified.
right - The right modification.

doClientMessages

protected void doClientMessages(AgentId from,
                                ClientMessages not)
Method specifically processing a ClientMessages instance.

This method stores the messages and launches a delivery sequence.

Specified by:
doClientMessages in class Destination

doUnknownAgent

protected void doUnknownAgent(UnknownAgent uA)
Method specifically processing an UnknownAgent instance.

The specific processing is done when a QueueMsgReply was sent to a requester which no longer exists. In that case, the messages sent to this requester and not yet acknowledged are marked as "denied" for delivery to another requester, and a new delivery sequence is launched. Messages considered as undeliverable are removed and sent to the DMQ.

Specified by:
doUnknownAgent in class Destination
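As a rough illustration of the recovery step described for doUnknownAgent, the sketch below moves the unacknowledged messages of a vanished consumer back to the pending list. It is not Joram's code: UnknownAgentSketch and requeue are hypothetical names, messages and consumers are reduced to plain strings, and putting the denied messages at the head of the pending list is an assumption. The DMQ handling of undeliverable messages is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the "requester no longer exists" reaction.
final class UnknownAgentSketch {

    /**
     * consumers maps a delivered, unacknowledged message id to the id of the
     * consumer it was sent to. Every entry belonging to goneConsumer is
     * removed from the table and its message is put back in pending.
     * Returns the ids of the messages that were denied.
     */
    static List<String> requeue(String goneConsumer,
                                Map<String, String> consumers,
                                List<String> pending) {
        List<String> denied = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = consumers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(goneConsumer)) {
                denied.add(entry.getKey());
                it.remove();
            }
        }
        pending.addAll(0, denied); // assumption: denied messages are redelivered first
        return denied;
    }

    public static void main(String[] args) {
        Map<String, String> consumers = new LinkedHashMap<>();
        consumers.put("m1", "c1");
        consumers.put("m2", "c2");
        List<String> pending = new ArrayList<>();
        pending.add("m3");
        System.out.println("denied: " + requeue("c1", consumers, pending)
                + ", pending: " + pending);
    }
}
```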

doDeleteNot

protected void doDeleteNot(DeleteNot not)
Method specifically processing a fr.dyade.aaa.agent.DeleteNot instance.

ExceptionReply replies are sent to the pending receivers, and the remaining messages are sent to the DMQ and deleted.

Specified by:
doDeleteNot in class Destination

getMsgTxPrefix

protected final java.lang.StringBuffer getMsgTxPrefix()

setMsgTxName

protected final void setMsgTxName(Message msg)

storeMessage

protected final void storeMessage(Message msg)
Actually stores a message in the deliverables vector.

Parameters:
msg - The message to store.

storeMessageHeader

protected final void storeMessageHeader(Message message)
Actually stores a message header in the deliverables vector.

Parameters:
message - The message to store.

addMessage

protected final boolean addMessage(Message message)
Adds a message in the list of messages to deliver.

Parameters:
message - the message to add.
Returns:
true if the message has been added. false if the queue is full.
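The interaction between nbMaxMsg and the boolean result of addMessage can be sketched as follows. This is a simplified model, not the library's code: BoundedQueueSketch is a hypothetical class, messages are plain strings, and the exact comparison used by Joram is not stated in this page.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a queue bounded by nbMaxMsg; not Joram's implementation.
final class BoundedQueueSketch {
    private final List<String> messages = new ArrayList<>();
    private int nbMaxMsg = -1; // -1 means no limit, as documented for getNbMaxMsg()

    void setNbMaxMsg(int nbMaxMsg) {
        this.nbMaxMsg = nbMaxMsg;
    }

    int getPendingMessageCount() {
        return messages.size();
    }

    /** Returns true if the message was added, false if the queue is full. */
    boolean addMessage(String message) {
        if (nbMaxMsg > -1 && messages.size() >= nbMaxMsg) {
            return false; // limit reached: the message is refused
        }
        messages.add(message);
        return true;
    }

    public static void main(String[] args) {
        BoundedQueueSketch queue = new BoundedQueueSketch();
        queue.setNbMaxMsg(1);
        System.out.println(queue.addMessage("first"));
        System.out.println(queue.addMessage("second"));
    }
}
```

A caller that gets false back has to decide what happens to the refused message; this page does not say what Queue itself does in that case.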

getClientMessages

protected ClientMessages getClientMessages(int nb,
                                           java.lang.String selector,
                                           boolean remove)
Gets a ClientMessages instance containing nb messages.

Parameters:
nb - number of messages returned in the ClientMessages.
selector - jms selector
remove - delete all messages returned if true
Returns:
ClientMessages (contains nb Messages)
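The three parameters of getClientMessages can be read as "take nb messages matching the selector, optionally removing them". The sketch below models that reading under stated assumptions: BatchSketch and take are hypothetical names, messages are plain strings, the selector is modelled as a java.util.function.Predicate rather than a JMS selector string, and returning fewer than nb messages when not enough match is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of batch retrieval; not Joram's implementation.
final class BatchSketch {

    /**
     * Collects at most nb messages accepted by selector, in queue order.
     * When remove is true the collected messages are also deleted from pending.
     */
    static List<String> take(List<String> pending, int nb,
                             Predicate<String> selector, boolean remove) {
        List<String> batch = new ArrayList<>();
        Iterator<String> it = pending.iterator();
        while (it.hasNext() && batch.size() < nb) {
            String message = it.next();
            if (selector.test(message)) {
                batch.add(message);
                if (remove) {
                    it.remove();
                }
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>(Arrays.asList("a1", "b1", "a2", "a3"));
        List<String> batch = take(pending, 2, m -> m.startsWith("a"), true);
        System.out.println("batch: " + batch + ", pending: " + pending);
    }
}
```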

getQueueMessage

protected Message getQueueMessage(java.lang.String msgId,
                                  boolean remove)
Gets the MOM message, deleting it if remove is true.

Parameters:
msgId - message identification
remove - if true delete message
Returns:
mom message

getMessage

public javax.management.openmbean.CompositeData getMessage(java.lang.String msgId)
                                                    throws java.lang.Exception
Returns the description of a particular pending message. The message is pointed out through its unique identifier.

Specified by:
getMessage in interface QueueMBean
Parameters:
msgId - The unique message's identifier.
Returns:
the description of the message.
Throws:
java.lang.Exception
See Also:
MessageJMXWrapper

getMessages

public javax.management.openmbean.TabularData getMessages()
                                                   throws java.lang.Exception
Returns the description of all pending messages.

Specified by:
getMessages in interface QueueMBean
Returns:
the description of all pending messages.
Throws:
java.lang.Exception
See Also:
MessageJMXWrapper

getMessagesView

public java.util.List getMessagesView()
Description copied from interface: QueueMBean
Returns the description of all pending messages.

Specified by:
getMessagesView in interface QueueMBean
Returns:
the description of all pending messages.

deliverMessages

protected void deliverMessages(int index)
Actually tries to answer the pending "receive" requests.

The method may send QueueMsgReply replies to clients.

Parameters:
index - Index at which to start browsing the requests.
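A delivery sequence of this kind, matching waiting requests against pending messages and keeping delivered messages until they are acknowledged, might look like the sketch below. It is only a model: DeliverySketch is a hypothetical class, requests and messages are plain strings, selectors and expiry are ignored, and the two protected hooks merely imitate the roles this page gives to checkDelivery and messageDelivered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a delivery sequence; not Joram's implementation.
class DeliverySketch {
    final List<String> requests = new ArrayList<>();           // waiting consumer ids
    final List<String> messages = new ArrayList<>();           // pending message ids
    final Map<String, String> deliveredMsgs = new HashMap<>(); // message id -> consumer id

    /** Hook in the role of checkDelivery: a subclass may veto a message. */
    protected boolean checkDelivery(String msgId) {
        return true;
    }

    /** Hook in the role of messageDelivered: called after each delivery. */
    protected void messageDelivered(String msgId) {
    }

    /** Tries to answer the waiting requests, starting at the given index. */
    void deliverMessages(int index) {
        while (index < requests.size()) {
            String msgId = firstDeliverable();
            if (msgId == null) {
                return; // nothing deliverable: remaining requests keep waiting
            }
            String consumer = requests.remove(index); // next request shifts into this slot
            messages.remove(msgId);
            deliveredMsgs.put(msgId, consumer);        // kept until acknowledged
            messageDelivered(msgId);
        }
    }

    /** Drops a delivered message once the consumer has acknowledged it. */
    void acknowledge(String msgId) {
        deliveredMsgs.remove(msgId);
    }

    private String firstDeliverable() {
        for (String msgId : messages) {
            if (checkDelivery(msgId)) {
                return msgId;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        DeliverySketch queue = new DeliverySketch();
        queue.messages.add("m1");
        queue.requests.add("c1");
        queue.deliverMessages(0);
        System.out.println("delivered: " + queue.deliveredMsgs);
    }
}
```

Keeping the hooks protected and empty lets a subclass change the delivery policy without touching the loop itself.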

checkDelivery

protected boolean checkDelivery(Message msg)
Returns true if conditions are ok to deliver the message. This method must be overridden in subclasses. Be careful: only the message header is accessible.


messageDelivered

protected void messageDelivered(java.lang.String msgId)
Called in deliverMessages just after forward(msg); override this method to perform specific processing.


messageRemoved

protected void messageRemoved(java.lang.String msgId)
Called in deliverMessages just after an invalid message is removed; override this method to perform specific processing.


isUndeliverable

protected boolean isUndeliverable(Message message)
Returns true if a given message is considered as undeliverable, because its delivery count matches the queue's threshold, if any, or the server's default threshold value (if any).
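The precedence between the queue's own threshold and the server's default can be sketched as a pure function. This is a reading of the sentence above, not Joram's code: ThresholdSketch is a hypothetical class, "matches" is taken literally as equality, and using -1 to mean "not set" for the server default is an assumption (the page documents it only for the queue threshold).

```java
// Hypothetical model of the undeliverable check; not Joram's implementation.
final class ThresholdSketch {
    static final int UNSET = -1;

    /**
     * The queue's own threshold wins when it is set; otherwise the server
     * default applies; with neither set, no message is ever undeliverable.
     */
    static boolean isUndeliverable(int deliveryCount, int queueThreshold, int defaultThreshold) {
        if (queueThreshold != UNSET) {
            return deliveryCount == queueThreshold;
        }
        if (defaultThreshold != UNSET) {
            return deliveryCount == defaultThreshold;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isUndeliverable(3, 3, UNSET));
        System.out.println(isUndeliverable(3, UNSET, 5));
    }
}
```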


addClientMessages

public void addClientMessages(ClientMessages clientMsgs)
Adds the client messages in the queue.

Parameters:
clientMsgs - client message notification.

readBag

public void readBag(java.io.ObjectInputStream in)
             throws java.io.IOException,
                    java.lang.ClassNotFoundException
Description copied from interface: BagSerializer
The readBag method is responsible for reading from the stream and restoring the agent's transient state.

Specified by:
readBag in interface BagSerializer
Throws:
java.io.IOException
java.lang.ClassNotFoundException

writeBag

public void writeBag(java.io.ObjectOutputStream out)
              throws java.io.IOException
Description copied from interface: BagSerializer
The writeBag method is responsible for writing the extra data of this particular agent so that the corresponding readBag method can restore it.

Specified by:
writeBag in interface BagSerializer
Throws:
java.io.IOException
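The readBag/writeBag pair exists because fields such as messages and deliveredMsgs are declared transient, so default Java serialization skips them. The sketch below shows the general pattern with standard java.io streams only. BagSketch is a hypothetical class, and writing the bag to the same stream right after the object itself is an assumption; this page does not say how the agent framework sequences the two calls.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Vector;

// Hypothetical model of the bag pattern; not Joram's implementation.
final class BagSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    long arrivalsCounter;                               // saved by default serialization
    transient Vector<String> messages = new Vector<>(); // skipped by default serialization

    /** Writes the extra (transient) data so that readBag can restore it. */
    void writeBag(ObjectOutputStream out) throws IOException {
        out.writeObject(messages);
    }

    /** Restores the transient state written by writeBag. */
    @SuppressWarnings("unchecked")
    void readBag(ObjectInputStream in) throws IOException, ClassNotFoundException {
        messages = (Vector<String>) in.readObject();
    }

    /** Serializes the object followed by its bag, then reads both back. */
    static BagSketch roundTrip(BagSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
                original.writeBag(out);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                BagSketch copy = (BagSketch) in.readObject(); // messages is null at this point
                copy.readBag(in);
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BagSketch queue = new BagSketch();
        queue.messages.add("m1");
        System.out.println(roundTrip(queue).messages);
    }
}
```

The two methods must stay symmetric: whatever writeBag writes, readBag has to read back in the same order.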

handleExpiredNot

protected void handleExpiredNot(AgentId from,
                                ExpiredNot not)


Copyright © 2011 ScalAgent D.T.. All Rights Reserved.