org.objectweb.joram.mom.dest
Class ClusterQueueImpl

java.lang.Object
  extended by org.objectweb.joram.mom.dest.DestinationImpl
      extended by org.objectweb.joram.mom.dest.QueueImpl
          extended by org.objectweb.joram.mom.dest.ClusterQueueImpl
All Implemented Interfaces:
java.io.Serializable, DestinationImplMBean, QueueImplMBean

public class ClusterQueueImpl
extends QueueImpl

The ClusterQueueImpl class implements the MOM queue behavior: it stores messages and delivers them upon client requests, or forwards them to another queue of the cluster.

See Also:
Serialized Form

Field Summary
private  long clusterDeliveryCount
          Number of messages sent to the cluster.
protected  java.util.Hashtable clusters
          key = agentId of a ClusterQueue, value = rateOfFlow (Float)
protected  LoadingFactor loadingFactor
          Used to evaluate the loading factor, overloading, ...
private static long serialVersionUID
          Defines serialVersionUID for interoperability.
private  java.util.LinkedHashMap timeTable
          key = msgId, value = date
private  long timeThreshold
          Maximum period of time before forwarding a waiting message or request to other queues of the cluster.
private  java.util.Hashtable visitTable
          key = msgId, value = List of queues already visited (alreadyVisit)
private  long waitAfterClusterReq
          Waiting period after a cluster request.
 
Fields inherited from class org.objectweb.joram.mom.dest.QueueImpl
arrivalsCounter, consumers, contexts, defaultDMQId, defaultThreshold, deliveredMsgs, logger, messages, msgTxPrefix, msgTxPrefixLength, nbMaxMsg, receiving, requests
 
Fields inherited from class org.objectweb.joram.mom.dest.DestinationImpl
_rights, agent, clients, creationDate, dmqId, freeReading, freeWriting, nbMsgsDeliverSinceCreation, nbMsgsReceiveSinceCreation, nbMsgsSentToDMQSinceCreation, READ, READWRITE, strbuf, WAKEUP_PERIOD, WRITE
 
Constructor Summary
ClusterQueueImpl(AgentId adminId, java.util.Properties prop)
          Constructs a ClusterQueueImpl instance.
 
Method Summary
 void ackJoinQueueCluster(AckJoinQueueCluster not)
           
protected  void addQueueCluster(java.lang.String joiningQueue, float rateOfFlow)
          Sends a JoinQueueCluster notification to joiningQueue.
protected  void broadcastLeave(java.lang.String removeQueue)
          Broadcasts to the cluster that removeQueue is leaving.
protected  java.lang.Object doList(ListClusterQueue req)
          Returns the cluster list.
protected  ClientMessages getClientMessages(int nb, java.lang.String selector, boolean remove)
          Gets a ClientMessages instance containing nb messages.
 long getClusterDeliveryCount()
          Returns the number of messages sent to the cluster.
protected  Message getQueueMessage(java.lang.String msgId, boolean remove)
          Gets a MOM message, deleting it from the queue if remove is true.
 void initialize(boolean firstTime)
          Initializes the destination.
 void joinQueueCluster(JoinQueueCluster not)
          A new queue joins the cluster; updates clusters.
 void lBCycleLife(AgentId from, LBCycleLife not)
          If the messages are not consumed by another queue of the cluster within a period of time, tries to consume them in this queue.
 void lBMessageGive(AgentId from, LBMessageGive not)
          Handles load-balancing messages given by another cluster queue. Processes the ClientMessages; there is no need to check whether the sender is a writer.
 void lBMessageHope(AgentId from, LBMessageHope not)
          Handles load-balancing messages hoped for (requested) by the "from" queue.
protected  void messageDelivered(java.lang.String msgId)
          Called in deliverMessages just after forward(msg); override this method to apply specific processing.
protected  void monitoringMsgSendToCluster(java.lang.String msgId)
           
 void postProcess(ClientMessages not)
          Calls factorCheck to evaluate the loading factor, activity, ... and sends messages to the cluster if needed.
 ClientMessages preProcess(AgentId from, ClientMessages not)
          Overrides preProcess(AgentId, ClientMessages) to store every msgId in timeTable and visitTable.
 void receiveRequest(AgentId from, ReceiveRequest not)
          Method implementing the reaction to a ReceiveRequest instance, requesting a message.
 void removeQueueCluster(java.lang.String removeQueue)
          The queue removeQueue leaves the cluster.
protected  void sendToCluster(QueueClusterNot not)
          Sends the notification to all queues in the cluster.
 void setAutoEvalThreshold(boolean autoEvalThreshold)
           
 void setConsumThreshold(int consumThreshold)
           
 void setProducThreshold(int producThreshold)
           
 void setWaitAfterClusterReq(long waitAfterClusterReq)
           
 java.lang.Object specialAdminProcess(SpecialAdminRequest not)
          Used to add a ClusterQueue to the cluster or to remove one from it.
private  void storeMsgIdInTimeTable(java.lang.String msgId, java.lang.Long date)
           
private  void storeMsgIdInVisitTable(java.lang.String msgId, AgentId destId)
           
 java.lang.String toString()
          Returns a string representation of this destination.
 void wakeUpNot(WakeUpNot not)
          Wakes up and calls factorCheck to evaluate the loading factor. If a message stays in timeTable for more than a period of time, it is sent to another (not yet visited) queue in the cluster.
 
Methods inherited from class org.objectweb.joram.mom.dest.QueueImpl
abortReceiveRequest, acknowledgeRequest, addClientMessages, addMessage, browseRequest, checkDelivery, cleanPendingMessage, cleanPendingMessage, cleanWaitingRequest, cleanWaitingRequest, deliverMessages, denyRequest, doClientMessages, doDeleteNot, doRightRequest, doUnknownAgent, getDefaultDMQId, getDefaultThreshold, getDeliveredMessageCount, getMessage, getMessages, getMessagesView, getMsgTxPrefix, getNbMaxMsg, getNbMsgsReceiveSinceCreation, getPendingMessageCount, getThreshold, getWaitingRequestCount, handleAdminRequestNot, handleExpiredNot, isUndeliverable, isValidJMXAttribute, messageRemoved, readBag, setMsgTxName, setNbMaxMsg, setThreshold, storeMessage, storeMessageHeader, writeBag
 
Methods inherited from class org.objectweb.joram.mom.dest.DestinationImpl
canBeDeleted, clientMessages, delete, deleteNot, forward, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQAgentId, getDMQId, getId, getJMXStatistics, getName, getNbMsgsDeliverSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, getRights, handleDeniedMessage, isAdministrator, isFreeReading, isFreeWriting, isLocal, isReader, isWriter, processSetRight, replyToTopic, requestGroupNot, setAgent, setFreeReading, setFreeWriting, setNoSave, setPeriod, setRight, setSave, specialAdminRequest, unknownAgent
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.objectweb.joram.mom.dest.DestinationImplMBean
delete, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQId, getName, getNbMsgsDeliverSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, isFreeReading, isFreeWriting, setFreeReading, setFreeWriting, setPeriod
 

Field Detail

serialVersionUID

private static final long serialVersionUID
Defines serialVersionUID for interoperability.

See Also:
Constant Field Values

clusters

protected java.util.Hashtable clusters
key = agentId of a ClusterQueue, value = rateOfFlow (Float)


loadingFactor

protected LoadingFactor loadingFactor
Used to evaluate the loading factor, overloading, ...


timeTable

private java.util.LinkedHashMap timeTable
key = msgId, value = date


visitTable

private java.util.Hashtable visitTable
key = msgId, value = List of queues already visited (alreadyVisit)


clusterDeliveryCount

private long clusterDeliveryCount
Number of messages sent to the cluster.


waitAfterClusterReq

private long waitAfterClusterReq
Waiting period after a cluster request.


timeThreshold

private long timeThreshold
Maximum period of time before forwarding a waiting message or request to other queues of the cluster. By default it is set to QueueImpl.period.
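
The interplay between timeTable and timeThreshold can be illustrated with a minimal sketch. It assumes that dates are millisecond timestamps stored in non-decreasing order, so the insertion-ordered LinkedHashMap can be scanned from the oldest entry and the scan can stop at the first entry that has not yet expired. TimeTableSketch, store and removeExpired are hypothetical names used for illustration only; they are not part of JORAM, and the actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of JORAM.
final class TimeTableSketch {
    // key = msgId, value = date in milliseconds (kept in insertion order)
    private final LinkedHashMap<String, Long> timeTable = new LinkedHashMap<>();
    private final long timeThreshold;

    TimeTableSketch(long timeThreshold) {
        this.timeThreshold = timeThreshold;
    }

    void store(String msgId, long date) {
        timeTable.put(msgId, date);
    }

    // Removes and returns the ids that have waited strictly longer than timeThreshold.
    List<String> removeExpired(long now) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = timeTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() <= timeThreshold) {
                break; // assumption: later entries carry later dates
            }
            expired.add(entry.getKey());
            it.remove();
        }
        return expired;
    }

    int size() {
        return timeTable.size();
    }
}
```

In this sketch the ids returned by removeExpired would be the candidates for forwarding to other queues of the cluster.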

Constructor Detail

ClusterQueueImpl

public ClusterQueueImpl(AgentId adminId,
                        java.util.Properties prop)
Constructs a ClusterQueueImpl instance.

Parameters:
adminId - Identifier of the administrator of the queue.
prop - The initial set of properties.
Method Detail

initialize

public void initialize(boolean firstTime)
Initializes the destination.

Overrides:
initialize in class QueueImpl
Parameters:
firstTime - true when first called by the factory

toString

public java.lang.String toString()
Description copied from class: QueueImpl
Returns a string representation of this destination.

Specified by:
toString in interface DestinationImplMBean
Overrides:
toString in class QueueImpl

specialAdminProcess

public java.lang.Object specialAdminProcess(SpecialAdminRequest not)
                                     throws RequestException
Used to add a ClusterQueue to the cluster or to remove one from it.

Overrides:
specialAdminProcess in class DestinationImpl
Parameters:
not -
Throws:
RequestException

doList

protected java.lang.Object doList(ListClusterQueue req)
Returns the cluster list.

Parameters:
req -
Returns:
the cluster list.

addQueueCluster

protected void addQueueCluster(java.lang.String joiningQueue,
                               float rateOfFlow)
Sends a JoinQueueCluster notification to joiningQueue.

Parameters:
joiningQueue -
rateOfFlow -

broadcastLeave

protected void broadcastLeave(java.lang.String removeQueue)
Broadcasts to the cluster that removeQueue is leaving.

Parameters:
removeQueue -

removeQueueCluster

public void removeQueueCluster(java.lang.String removeQueue)
The queue removeQueue leaves the cluster.

Parameters:
removeQueue -

preProcess

public ClientMessages preProcess(AgentId from,
                                 ClientMessages not)
Overrides preProcess(AgentId, ClientMessages) to store every msgId in timeTable and visitTable.

Overrides:
preProcess in class DestinationImpl
Parameters:
from -
not -
Returns:
The incoming messages after processing.

postProcess

public void postProcess(ClientMessages not)
Calls factorCheck to evaluate the loading factor, activity, ... and sends messages to the cluster if needed.

Overrides:
postProcess in class DestinationImpl
Parameters:
not -

wakeUpNot

public void wakeUpNot(WakeUpNot not)
Wakes up and calls factorCheck to evaluate the loading factor. If a message stays in timeTable for more than a period of time, it is sent to another (not yet visited) queue in the cluster.

Overrides:
wakeUpNot in class QueueImpl
Parameters:
not -
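
The "not visited" rule can be sketched as follows, assuming that visitTable maps each msgId to the list of queue identifiers the message has already been sent to and that clusters holds the identifiers of all queues in the cluster. VisitTableSketch, markVisited and unvisited are hypothetical names, identifiers are modelled as plain strings, and the result is sorted only to make the example deterministic; how the real class chooses among the remaining queues is not stated in this page.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;

// Hypothetical helper, not part of JORAM.
final class VisitTableSketch {
    // key = id of a cluster queue, value = rateOfFlow
    final Hashtable<String, Float> clusters = new Hashtable<>();
    // key = msgId, value = ids of the queues the message already visited
    final Hashtable<String, List<String>> visitTable = new Hashtable<>();

    void markVisited(String msgId, String queueId) {
        visitTable.computeIfAbsent(msgId, k -> new ArrayList<>()).add(queueId);
    }

    // Returns the cluster queues that msgId has not visited yet, sorted by id.
    List<String> unvisited(String msgId) {
        List<String> visited =
            visitTable.getOrDefault(msgId, Collections.<String>emptyList());
        List<String> result = new ArrayList<>();
        for (String queueId : clusters.keySet()) {
            if (!visited.contains(queueId)) {
                result.add(queueId);
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

An empty result means the message has been offered to every queue in the cluster.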

lBCycleLife

public void lBCycleLife(AgentId from,
                        LBCycleLife not)
If the messages are not consumed by another queue of the cluster within a period of time, tries to consume them in this queue. Updates visitTable and processes the ClientMessages.

Parameters:
from -
not -

joinQueueCluster

public void joinQueueCluster(JoinQueueCluster not)
A new queue joins the cluster: updates clusters and propagates the acknowledgement (AckJoinQueueCluster) to the queues of the cluster.

Parameters:
not - JoinQueueCluster
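
One plausible reading of the join exchange is sketched below. It is an in-memory model built on assumptions, not the JORAM protocol: the join notification is assumed to carry the sender's clusters table, the receiver is assumed to merge it into its own table, and acknowledgements are recorded in a list instead of being sent. JoinSketch, onJoin, onAck and acksSentTo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

// Hypothetical model of a cluster member, not part of JORAM.
final class JoinSketch {
    final String id;
    final float rateOfFlow;
    // key = queue id, value = rateOfFlow
    final Hashtable<String, Float> clusters = new Hashtable<>();
    // Acknowledgements are recorded here instead of being sent over the network.
    final List<String> acksSentTo = new ArrayList<>();

    JoinSketch(String id, float rateOfFlow) {
        this.id = id;
        this.rateOfFlow = rateOfFlow;
        clusters.put(id, rateOfFlow);
    }

    // Reaction to a join notification (assumed to carry the sender's table):
    // merge it locally, then acknowledge to every other known member.
    void onJoin(Map<String, Float> senderClusters) {
        clusters.putAll(senderClusters);
        for (String member : clusters.keySet()) {
            if (!member.equals(id)) {
                acksSentTo.add(member);
            }
        }
        Collections.sort(acksSentTo); // deterministic order for the example
    }

    // Reaction to an acknowledgement: record the acknowledging queue.
    void onAck(String from, float rate) {
        clusters.put(from, rate);
    }
}
```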

ackJoinQueueCluster

public void ackJoinQueueCluster(AckJoinQueueCluster not)
Parameters:
not - AckJoinQueueCluster

receiveRequest

public void receiveRequest(AgentId from,
                           ReceiveRequest not)
                    throws AccessException
Description copied from class: QueueImpl
Method implementing the reaction to a ReceiveRequest instance, requesting a message.

This method stores the request and launches a delivery sequence.

Overrides:
receiveRequest in class QueueImpl
Parameters:
not - ReceiveRequest
Throws:
AccessException - If the sender is not a reader.

lBMessageGive

public void lBMessageGive(AgentId from,
                          LBMessageGive not)
                   throws UnknownNotificationException
Handles load-balancing messages given by another cluster queue. Processes the ClientMessages; there is no need to check whether the sender is a writer.

Parameters:
from - AgentId
not - LBMessageGive
Throws:
UnknownNotificationException

lBMessageHope

public void lBMessageHope(AgentId from,
                          LBMessageHope not)
Handles load-balancing messages hoped for (requested) by the "from" queue.

Parameters:
from -
not - LBMessageHope

getClientMessages

protected ClientMessages getClientMessages(int nb,
                                           java.lang.String selector,
                                           boolean remove)
Gets a ClientMessages instance containing nb messages, and adds the cluster monitoring value.

Overrides:
getClientMessages in class QueueImpl
Parameters:
nb - number of messages returned in the ClientMessages.
selector - JMS selector
remove - delete all messages returned if true
Returns:
ClientMessages (contains nb Messages)
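
The meaning of the nb, selector and remove parameters can be illustrated with a small stand-alone sketch. It makes several simplifying assumptions: messages are plain strings, the JMS selector is replaced by a java.util.function.Predicate, and nb is treated as an upper bound. TakeSketch and take are hypothetical names, and the cluster monitoring value is not modelled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not part of JORAM.
final class TakeSketch {
    // Returns up to nb messages accepted by selector, in queue order.
    // When remove is true, the returned messages are also deleted from the queue.
    static List<String> take(List<String> messages, int nb,
                             Predicate<String> selector, boolean remove) {
        List<String> out = new ArrayList<>();
        Iterator<String> it = messages.iterator();
        while (it.hasNext() && out.size() < nb) {
            String message = it.next();
            if (selector.test(message)) {
                out.add(message);
                if (remove) {
                    it.remove();
                }
            }
        }
        return out;
    }
}
```

Calling take with remove set to false leaves the queue untouched, which corresponds to reading messages without consuming them.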

getQueueMessage

protected Message getQueueMessage(java.lang.String msgId,
                                  boolean remove)
Gets a MOM message, deleting it from the queue if remove is true, and adds the cluster monitoring value.

Overrides:
getQueueMessage in class QueueImpl
Parameters:
msgId - message identifier
remove - if true, deletes the message
Returns:
MOM message

sendToCluster

protected void sendToCluster(QueueClusterNot not)
Sends the notification to all queues in the cluster.

Parameters:
not -

getClusterDeliveryCount

public long getClusterDeliveryCount()
Returns the number of messages sent to the cluster.


storeMsgIdInTimeTable

private void storeMsgIdInTimeTable(java.lang.String msgId,
                                   java.lang.Long date)
Parameters:
msgId -
date -

storeMsgIdInVisitTable

private void storeMsgIdInVisitTable(java.lang.String msgId,
                                    AgentId destId)
Parameters:
msgId -
destId -

messageDelivered

protected void messageDelivered(java.lang.String msgId)
Description copied from class: QueueImpl
Called in deliverMessages just after forward(msg); override this method to apply specific processing.

Overrides:
messageDelivered in class QueueImpl
Parameters:
msgId -

monitoringMsgSendToCluster

protected void monitoringMsgSendToCluster(java.lang.String msgId)
Parameters:
msgId -

setWaitAfterClusterReq

public void setWaitAfterClusterReq(long waitAfterClusterReq)
Parameters:
waitAfterClusterReq -

setProducThreshold

public void setProducThreshold(int producThreshold)
Parameters:
producThreshold -

setConsumThreshold

public void setConsumThreshold(int consumThreshold)
Parameters:
consumThreshold -

setAutoEvalThreshold

public void setAutoEvalThreshold(boolean autoEvalThreshold)
Parameters:
autoEvalThreshold -


Copyright © 2010 ScalAgent D.T.. All Rights Reserved.