org.objectweb.joram.mom.dest
Class ClusterQueue

java.lang.Object
  extended by fr.dyade.aaa.agent.Agent
      extended by org.objectweb.joram.mom.dest.Destination
          extended by org.objectweb.joram.mom.dest.Queue
              extended by org.objectweb.joram.mom.dest.ClusterQueue
All Implemented Interfaces:
AgentMBean, BagSerializer, java.io.Serializable, ClusterDestinationMBean, ClusterQueueMBean, DestinationMBean, QueueMBean

public class ClusterQueue
extends Queue
implements ClusterQueueMBean

The ClusterQueue class implements the cluster queue behavior.

See Also:
Serialized Form

Field Summary
protected  java.util.Map clusters
          Maps the agentId of each ClusterQueue (key) to its rateOfFlow (value, a Float).
protected  LoadingFactor loadingFactor
          Used to evaluate the loading factor, overloading, etc.
 
Fields inherited from class org.objectweb.joram.mom.dest.Queue
arrivalsCounter, consumers, contexts, deliveredMsgs, logger, messages, nbMaxMsg, receiving, requests
 
Fields inherited from class org.objectweb.joram.mom.dest.Destination
_rights, clients, creationDate, dmqId, freeReading, freeWriting, nbMsgsDeliverSinceCreation, nbMsgsReceiveSinceCreation, nbMsgsSentToDMQSinceCreation, READ, READWRITE, task, WAKEUP_PERIOD, WRITE
 
Fields inherited from class fr.dyade.aaa.agent.Agent
emptyString, fixed, logmon
 
Constructor Summary
ClusterQueue()
           
 
Method Summary
protected  void doDeleteNot(DeleteNot not)
          Method specifically processing a fr.dyade.aaa.agent.DeleteNot instance.
protected  void doUnknownAgent(UnknownAgent uA)
          Method specifically processing an UnknownAgent instance.
protected  ClientMessages getClientMessages(int nb, java.lang.String selector, boolean remove)
          Gets a ClientMessages instance containing nb messages.
 long getClusterDeliveryCount()
          Returns the number of messages sent to the cluster.
 java.lang.String[] getClusterElements()
           
 java.lang.String getConsumerStatus()
          Gets consumer status (NO, NORMAL, HIGH).
 int getConsumThreshold()
          Gets the number of pending "receive" requests above which a queue will request messages from the other queues of the cluster.
 java.lang.String getProducerStatus()
          Gets producer status (NO, NORMAL, HIGH).
 int getProducThreshold()
          Gets the number of messages above which a queue is considered loaded.
protected  Message getQueueMessage(java.lang.String msgId, boolean remove)
          Gets the MOM message, deleting it if remove is true.
 float getRateOfFlow()
          Gets an evaluation of the flow of messages handled by the queue.
 java.lang.String getStatus()
          Gets the status of the queue (RUN, INIT or WAIT).
 long getWaitAfterClusterReq()
          Gets the time (in ms) during which a queue which requested something from the cluster is not authorized to do it again.
 void handleAdminRequestNot(AgentId from, FwdAdminRequestNot not)
           
 void initialize(boolean firstTime)
          Initializes the destination.
 boolean isAutoEvalThreshold()
          Returns true if automatic reevaluation of the queues' threshold values is allowed according to their activity.
 boolean isOverloaded()
          Tells if the queue is overloaded.
protected  void messageDelivered(java.lang.String msgId)
          Called in deliverMessages just after forward(msg); override this method to apply specific processing.
protected  void monitoringMsgSendToCluster(java.lang.String msgId)
           
 void postProcess(ClientMessages not)
          Calls factorCheck to evaluate the loading factor, activity, etc., and sends messages to the cluster if needed.
 ClientMessages preProcess(AgentId from, ClientMessages not)
          Overrides preProcess(AgentId, ClientMessages) to store every msgId in timeTable and visitTable.
 void react(AgentId from, Notification not)
          Distributes the received notifications to the appropriate reactions.
 void receiveRequest(AgentId from, ReceiveRequest not)
          Method implementing the reaction to a ReceiveRequest instance, requesting a message.
protected  void sendToCluster(Notification not)
          Sends a notification to all queues in the cluster.
 void setAutoEvalThreshold(boolean autoEvalThreshold)
          Sets whether the queues' thresholds may be automatically reevaluated according to their activity.
 void setConsumThreshold(int consumThreshold)
          Sets the number of pending "receive" requests above which a queue will request messages from the other queues of the cluster.
 void setProducThreshold(int producThreshold)
          Sets the number of messages above which a queue is considered loaded.
 void setProperties(java.util.Properties prop)
          Configures a ClusterQueue instance.
 void setWaitAfterClusterReq(long waitAfterClusterReq)
          Sets the time (in ms) during which a queue which requested something from the cluster is not authorized to do it again.
 java.lang.String toString()
          Returns a string representation of this destination.
 void wakeUpNot(WakeUpNot not)
          Wakes up and calls factorCheck to evaluate the loading factor. If a message stays in timeTable longer than a period of time, it is sent to another (not yet visited) queue in the cluster.
 
Methods inherited from class org.objectweb.joram.mom.dest.Queue
abortReceiveRequest, acknowledgeRequest, addClientMessages, addMessage, browseRequest, checkDelivery, cleanPendingMessage, cleanPendingMessage, cleanWaitingRequest, cleanWaitingRequest, deliverMessages, denyRequest, doClientMessages, doRightRequest, getDefaultDMQId, getDefaultThreshold, getDeliveredMessageCount, getMessage, getMessages, getMessagesView, getMsgTxPrefix, getNbMaxMsg, getNbMsgsReceiveSinceCreation, getPendingMessageCount, getThreshold, getType, getWaitingRequestCount, handleExpiredNot, isUndeliverable, isValidJMXAttribute, messageRemoved, readBag, setMsgTxName, setNbMaxMsg, setThreshold, storeMessage, storeMessageHeader, writeBag
 
Methods inherited from class org.objectweb.joram.mom.dest.Destination
agentFinalize, agentInitialize, clientMessages, delete, deleteNot, forward, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQAgentId, getDMQId, getJMXStatistics, getMBeanName, getNbMsgsDeliverSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, getRights, handleDeniedMessage, interceptorsAvailable, isAdministrator, isFreeReading, isFreeWriting, isLocal, isReader, isWriter, processAdminCommand, processInterceptors, processSetRight, processStartHandler, processStopHandler, replyToTopic, requestGroupNot, setAdminId, setFreeReading, setFreeWriting, setPeriod, setRight, unknownAgent, updateProperties
 
Methods inherited from class fr.dyade.aaa.agent.Agent
delete, delete, deploy, deploy, getAgentId, getCommitTime, getId, getLogTopic, getName, getReactNb, getReactTime, isDeployed, isFixed, needToBeCommited, save, sendTo, sendTo, sendTo, setName, setNoSave, setSave
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.objectweb.joram.mom.dest.QueueMBean
cleanPendingMessage, cleanWaitingRequest, getDeliveredMessageCount, getMessage, getMessages, getMessagesView, getNbMaxMsg, getPendingMessageCount, getThreshold, getWaitingRequestCount, setNbMaxMsg, setThreshold
 
Methods inherited from interface org.objectweb.joram.mom.dest.DestinationMBean
delete, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQId, getName, getNbMsgsDeliverSinceCreation, getNbMsgsReceiveSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, isFreeReading, isFreeWriting, setFreeReading, setFreeWriting, setPeriod
 
Methods inherited from interface fr.dyade.aaa.agent.AgentMBean
getAgentId, getCommitTime, getReactNb, getReactTime, isFixed
 

Field Detail

clusters

protected java.util.Map clusters
Maps the agentId of each ClusterQueue (key) to its rateOfFlow (value, a Float).
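
To illustrate the map layout described above, here is a minimal sketch that keeps a map from an agent identifier to a rate of flow and looks up the entry with the lowest rate. The class name ClusterRateSketch, the use of String keys, and the "lowest rate" selection policy are assumptions made for the example; the documentation does not say how ClusterQueue actually uses this map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the Joram implementation.
class ClusterRateSketch {
  // key = agent id (a plain String here, as an assumption), value = rate of flow
  final Map<String, Float> clusters = new LinkedHashMap<>();

  // Records or refreshes the rate of flow reported for a cluster member.
  void update(String agentId, float rateOfFlow) {
    clusters.put(agentId, Float.valueOf(rateOfFlow));
  }

  // Returns the id with the lowest recorded rate of flow, or null if the map is empty.
  String lowestRate() {
    String best = null;
    float bestRate = 0f;
    for (Map.Entry<String, Float> e : clusters.entrySet()) {
      float rate = e.getValue().floatValue();
      if (best == null || rate < bestRate) {
        best = e.getKey();
        bestRate = rate;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    ClusterRateSketch sketch = new ClusterRateSketch();
    sketch.update("queueA", 12.5f);
    sketch.update("queueB", 3.0f);
    System.out.println(sketch.lowestRate());
  }
}
```

Keeping the value boxed as a Float mirrors the field description; a real selection policy could just as well weigh other factors.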


loadingFactor

protected LoadingFactor loadingFactor
Used to evaluate the loading factor, overloading, etc.

Constructor Detail

ClusterQueue

public ClusterQueue()
Method Detail

setProperties

public void setProperties(java.util.Properties prop)
                   throws RequestException
Configures a ClusterQueue instance.

Overrides:
setProperties in class Destination
Parameters:
prop - The initial set of properties.
Throws:
RequestException

initialize

public void initialize(boolean firstTime)
Initializes the destination.

Overrides:
initialize in class Queue
Parameters:
firstTime - true when first called by the factory

handleAdminRequestNot

public void handleAdminRequestNot(AgentId from,
                                  FwdAdminRequestNot not)
Overrides:
handleAdminRequestNot in class Queue
See Also:
Destination.handleAdminRequestNot(fr.dyade.aaa.agent.AgentId, org.objectweb.joram.mom.notifications.FwdAdminRequestNot)

react

public void react(AgentId from,
                  Notification not)
           throws java.lang.Exception
Distributes the received notifications to the appropriate reactions.

Overrides:
react in class Queue
Parameters:
from - agent sending notification
not - notification to react to
Throws:
java.lang.Exception

toString

public java.lang.String toString()
Description copied from class: Queue
Returns a string representation of this destination.

Specified by:
toString in interface AgentMBean
Specified by:
toString in interface DestinationMBean
Overrides:
toString in class Queue
Returns:
A string representation of this agent.

getClusterElements

public java.lang.String[] getClusterElements()
Specified by:
getClusterElements in interface ClusterDestinationMBean
Returns:
an array containing the ids of the cluster elements.

preProcess

public ClientMessages preProcess(AgentId from,
                                 ClientMessages not)
Overrides preProcess(AgentId, ClientMessages) to store every msgId in timeTable and visitTable.

Overrides:
preProcess in class Destination
Parameters:
from -
not -
Returns:
The incoming messages after processing.

postProcess

public void postProcess(ClientMessages not)
Calls factorCheck to evaluate the loading factor, activity, etc., and sends messages to the cluster if needed.

Overrides:
postProcess in class Destination
Parameters:
not -

wakeUpNot

public void wakeUpNot(WakeUpNot not)
Wakes up and calls factorCheck to evaluate the loading factor. If a message stays in timeTable longer than a period of time, it is sent to another (not yet visited) queue in the cluster.

Overrides:
wakeUpNot in class Queue
Parameters:
not -
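
The forwarding rule described for wakeUpNot can be pictured with the sketch below. It assumes that timeTable maps a message id to its arrival time and that visitTable maps a message id to the set of queues the message has already visited; those layouts, the class WakeUpSketch, and the "first unvisited queue" choice are assumptions for illustration, not the actual Joram code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the Joram implementation.
class WakeUpSketch {
  final Map<String, Long> timeTable = new HashMap<>();          // msgId -> arrival time (ms), assumed layout
  final Map<String, Set<String>> visitTable = new HashMap<>();  // msgId -> queues already visited, assumed layout
  final String localQueue;
  final List<String> clusterQueues;
  final long period;

  WakeUpSketch(String localQueue, List<String> clusterQueues, long period) {
    this.localQueue = localQueue;
    this.clusterQueues = clusterQueues;
    this.period = period;
  }

  // Records an incoming message, marking the local queue as visited.
  void store(String msgId, long now) {
    timeTable.put(msgId, Long.valueOf(now));
    visitTable.computeIfAbsent(msgId, k -> new HashSet<>()).add(localQueue);
  }

  // Returns msgId -> target queue for every message that waited longer than
  // 'period' and still has an unvisited queue in the cluster.
  Map<String, String> wakeUp(long now) {
    Map<String, String> forwards = new LinkedHashMap<>();
    Iterator<Map.Entry<String, Long>> it = timeTable.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (now - e.getValue().longValue() <= period) continue; // not old enough yet
      Set<String> visited = visitTable.get(e.getKey());
      for (String queue : clusterQueues) {
        if (!visited.contains(queue)) {
          visited.add(queue);               // remember where the message goes next
          forwards.put(e.getKey(), queue);
          it.remove();                      // the message leaves the local queue
          break;
        }
      }
    }
    return forwards;
  }
}
```

In this sketch, a message that has already visited every queue stays where it is, which is one way to avoid forwarding loops.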

receiveRequest

public void receiveRequest(AgentId from,
                           ReceiveRequest not)
                    throws AccessException
Description copied from class: Queue
Method implementing the reaction to a ReceiveRequest instance, requesting a message.

This method stores the request and launches a delivery sequence.

Overrides:
receiveRequest in class Queue
Parameters:
not - ReceiveRequest
Throws:
AccessException - If the sender is not a reader.

getClientMessages

protected ClientMessages getClientMessages(int nb,
                                           java.lang.String selector,
                                           boolean remove)
Gets a ClientMessages instance containing nb messages and adds the cluster monitoring value.

Overrides:
getClientMessages in class Queue
Parameters:
nb - number of messages returned in the ClientMessages.
selector - JMS selector
remove - delete all messages returned if true
Returns:
ClientMessages (contains nb Messages)

getQueueMessage

protected Message getQueueMessage(java.lang.String msgId,
                                  boolean remove)
Gets the MOM message, deleting it if remove is true, and adds the cluster monitoring value.

Overrides:
getQueueMessage in class Queue
Parameters:
msgId - message identification
remove - if true delete message
Returns:
mom message

sendToCluster

protected void sendToCluster(Notification not)
Sends a notification to all queues in the cluster.

Parameters:
not - The notification to send.

doDeleteNot

protected void doDeleteNot(DeleteNot not)
Description copied from class: Queue
Method specifically processing a fr.dyade.aaa.agent.DeleteNot instance.

ExceptionReply replies are sent to the pending receivers, and the remaining messages are sent to the DMQ and deleted.

Overrides:
doDeleteNot in class Queue

doUnknownAgent

protected void doUnknownAgent(UnknownAgent uA)
Description copied from class: Queue
Method specifically processing an UnknownAgent instance.

The specific processing is done when a QueueMsgReply was sent to a requester which no longer exists. In that case, the messages sent to this requester and not yet acknowledged are marked as "denied" for delivery to another requester, and a new delivery sequence is launched. Messages considered undeliverable are removed and sent to the DMQ.

Overrides:
doUnknownAgent in class Queue

getClusterDeliveryCount

public long getClusterDeliveryCount()
Returns the number of messages sent to the cluster.


messageDelivered

protected void messageDelivered(java.lang.String msgId)
Description copied from class: Queue
Called in deliverMessages just after forward(msg); override this method to apply specific processing.

Overrides:
messageDelivered in class Queue
Parameters:
msgId - message identification

monitoringMsgSendToCluster

protected void monitoringMsgSendToCluster(java.lang.String msgId)
Parameters:
msgId - message identification

setWaitAfterClusterReq

public void setWaitAfterClusterReq(long waitAfterClusterReq)
Description copied from interface: ClusterQueueMBean
Sets the time (in ms) during which a queue which requested something from the cluster is not authorized to do it again.

Specified by:
setWaitAfterClusterReq in interface ClusterQueueMBean
Parameters:
waitAfterClusterReq - the time (in ms) to wait before another cluster request is allowed
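
The waiting rule described above can be sketched as a simple time gate. The class ClusterRequestGate, the tryRequest method, and the choice to allow a new request exactly when the waiting time has fully elapsed are assumptions for the example; only the getter and setter names come from this page.

```java
// Hypothetical sketch, not the Joram implementation.
class ClusterRequestGate {
  private long waitAfterClusterReq;   // in ms
  private long lastRequestTime = -1L; // -1 means no request has been made yet

  ClusterRequestGate(long waitAfterClusterReq) {
    this.waitAfterClusterReq = waitAfterClusterReq;
  }

  void setWaitAfterClusterReq(long waitAfterClusterReq) {
    this.waitAfterClusterReq = waitAfterClusterReq;
  }

  long getWaitAfterClusterReq() {
    return waitAfterClusterReq;
  }

  // Returns true (and records 'now') if a cluster request is allowed at time 'now',
  // false if the previous request is still inside the waiting window.
  boolean tryRequest(long now) {
    if (lastRequestTime >= 0 && now - lastRequestTime < waitAfterClusterReq) {
      return false;
    }
    lastRequestTime = now;
    return true;
  }
}
```

Passing the current time as a parameter keeps the sketch deterministic; a real agent would more likely read the clock itself.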

setProducThreshold

public void setProducThreshold(int producThreshold)
Description copied from interface: ClusterQueueMBean
Sets the number of messages above which a queue is considered loaded.

Specified by:
setProducThreshold in interface ClusterQueueMBean
Parameters:
producThreshold - the number of messages above which the queue is considered loaded

setConsumThreshold

public void setConsumThreshold(int consumThreshold)
Description copied from interface: ClusterQueueMBean
Sets the number of pending "receive" requests above which a queue will request messages from the other queues of the cluster.

Specified by:
setConsumThreshold in interface ClusterQueueMBean
Parameters:
consumThreshold - the number of pending "receive" requests above which the queue requests messages from the cluster
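
As a rough illustration of how the two thresholds could be read, the sketch below treats "above" as a strict comparison. The class ThresholdSketch and the methods isLoaded and shouldRequestFromCluster are hypothetical names; the real decision is made through LoadingFactor and may involve more state than shown here.

```java
// Hypothetical sketch, not the Joram implementation.
class ThresholdSketch {
  private int producThreshold;
  private int consumThreshold;

  ThresholdSketch(int producThreshold, int consumThreshold) {
    this.producThreshold = producThreshold;
    this.consumThreshold = consumThreshold;
  }

  void setProducThreshold(int producThreshold) { this.producThreshold = producThreshold; }
  void setConsumThreshold(int consumThreshold) { this.consumThreshold = consumThreshold; }
  int getProducThreshold() { return producThreshold; }
  int getConsumThreshold() { return consumThreshold; }

  // The queue counts as loaded once it holds more messages than producThreshold.
  boolean isLoaded(int pendingMessages) {
    return pendingMessages > producThreshold;
  }

  // The queue asks the cluster for messages once it has more pending
  // "receive" requests than consumThreshold.
  boolean shouldRequestFromCluster(int pendingReceiveRequests) {
    return pendingReceiveRequests > consumThreshold;
  }
}
```

For example, with a producThreshold of 100 the sketch reports a queue holding 100 messages as not loaded and one holding 101 as loaded.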

setAutoEvalThreshold

public void setAutoEvalThreshold(boolean autoEvalThreshold)
Description copied from interface: ClusterQueueMBean
Sets whether the queues' thresholds may be automatically reevaluated according to their activity.

Specified by:
setAutoEvalThreshold in interface ClusterQueueMBean
Parameters:
autoEvalThreshold - true to allow automatic reevaluation of the thresholds

getProducThreshold

public int getProducThreshold()
Description copied from interface: ClusterQueueMBean
Gets the number of messages above which a queue is considered loaded.

Specified by:
getProducThreshold in interface ClusterQueueMBean
Returns:
the produce threshold

getConsumThreshold

public int getConsumThreshold()
Description copied from interface: ClusterQueueMBean
Gets the number of pending "receive" requests above which a queue will request messages from the other queues of the cluster.

Specified by:
getConsumThreshold in interface ClusterQueueMBean
Returns:
the consume threshold

isAutoEvalThreshold

public boolean isAutoEvalThreshold()
Description copied from interface: ClusterQueueMBean
Returns true if automatic reevaluation of the queues' threshold values is allowed according to their activity.

Specified by:
isAutoEvalThreshold in interface ClusterQueueMBean
Returns:
true if auto evaluation of thresholds is allowed.

getWaitAfterClusterReq

public long getWaitAfterClusterReq()
Description copied from interface: ClusterQueueMBean
Gets the time (in ms) during which a queue which requested something from the cluster is not authorized to do it again.

Specified by:
getWaitAfterClusterReq in interface ClusterQueueMBean
Returns:
the minimum time to wait before another cluster request.

getRateOfFlow

public float getRateOfFlow()
Description copied from interface: ClusterQueueMBean
Gets an evaluation of the flow of messages handled by the queue.

Specified by:
getRateOfFlow in interface ClusterQueueMBean
Returns:
the rate of flow

isOverloaded

public boolean isOverloaded()
Description copied from interface: ClusterQueueMBean
Tells if the queue is overloaded.

Specified by:
isOverloaded in interface ClusterQueueMBean
Returns:
true if the queue is overloaded

getStatus

public java.lang.String getStatus()
Description copied from interface: ClusterQueueMBean
Gets the status of the queue (RUN, INIT or WAIT).

Specified by:
getStatus in interface ClusterQueueMBean
Returns:
the status of the queue
See Also:
LoadingFactor.Status

getConsumerStatus

public java.lang.String getConsumerStatus()
Description copied from interface: ClusterQueueMBean
Gets consumer status (NO, NORMAL, HIGH).

Specified by:
getConsumerStatus in interface ClusterQueueMBean
Returns:
consumer status
See Also:
LoadingFactor.ConsumerStatus

getProducerStatus

public java.lang.String getProducerStatus()
Description copied from interface: ClusterQueueMBean
Gets producer status (NO, NORMAL, HIGH).

Specified by:
getProducerStatus in interface ClusterQueueMBean
Returns:
producer status
See Also:
LoadingFactor.ProducerStatus


Copyright © 2011 ScalAgent D.T. All Rights Reserved.