public class ClusterQueue extends Queue implements ClusterQueueMBean
ClusterQueue
class implements the cluster queue behavior.Queue.QueueFactory
Modifier and Type | Field and Description |
---|---|
private long |
clusterDeliveryCount
Number of message send to cluster
|
protected Map |
clusters
key = agentId of ClusterQueue
value = rateOfFlow (Float)
|
static boolean |
DEFAULT_AUTO_EVAL_THRESHOLD |
static int |
DEFAULT_CONSUM_THRESHOLD |
static int |
DEFAULT_PRODUC_THRESHOLD |
static long |
DEFAULT_TIME_THRESHOLD |
static long |
DEFAULT_WAIT_AFTER_CLUSTER_REQ |
protected LoadingFactor |
loadingFactor
to evaluate the loading factor, overloading, ...
|
(package private) int |
receivedFromCluster |
(package private) int |
sentToCluster |
private static long |
serialVersionUID
define serialVersionUID for interoperability
|
private Map |
timeTable
key = msgId
value = date
|
private long |
timeThreshold
Maximum period of time before forwarding a waiting message or request to
other queues of the cluster.
|
private Map |
visitTable
key = msgId value = List (alreadyVisit)
|
ARRIVAL_STATE_PREFIX, arrivalState, cload, defaultDMQId, defaultRedeliveryDelay, defaultThreshold, DELIVERY_TABLE_PREFIX, DELIVERY_TIME_TABLE_PREFIX, deliveryTable, deliveryTimeTable, hcons, hprod, logger, logmsg, messages, msgTxPrefix, msgTxPrefixLength, nbMaxMsg, nbMsgsDeniedSinceCreation, pload, receiving, REDELIVERY_DELAY, requests
_rights, clients, creationDate, dmqId, freeReading, freeWriting, nbMsgsDeliverSinceCreation, nbMsgsSentToDMQSinceCreation, READ, READWRITE, strbuf, task, WRITE
agentProfiling, emptyString, fixed, logmon
BOOLEAN_ENCODED_SIZE, BYTE_ENCODED_SIZE, DOUBLE_ENCODED_SIZE, FLOAT_ENCODED_SIZE, INT_ENCODED_SIZE, LONG_ENCODED_SIZE, SHORT_ENCODED_SIZE
Constructor and Description |
---|
ClusterQueue() |
Modifier and Type | Method and Description |
---|---|
private void |
clusterAdd(FwdAdminRequestNot req,
String joiningQueue)
Reaction to the request of adding a new cluster element.
|
private void |
clusterJoin(ClusterJoinNot not)
Method implementing the reaction to a
ClusterJoinNot notification,
sent by a fellow queue for notifying this queue to join the cluster, doing
a transitive closure of clusters, if any. |
private void |
clusterJoinAck(ClusterJoinAck not)
Method implementing the reaction to a
ClusterJoinAck notification,
doing a transitive closure with the current cluster and the one of the new
cluster element. |
private void |
clusterLeave()
Ask this queue to leave the cluster.
|
private List |
clusterList()
Returns the cluster list.
|
private void |
clusterRemove(AgentId queue)
Remove the specified queue from current cluster.
|
protected void |
doDeleteNot(DeleteNot not)
Method specifically processing a
fr.dyade.aaa.agent.DeleteNot instance. |
protected void |
doUnknownAgent(UnknownAgent uA)
Method specifically processing an
UnknownAgent instance. |
protected ClientMessages |
getClientMessages(int nb,
String selector,
boolean remove)
get a client message contain nb messages.
|
long |
getClusterDeliveryCount()
return the number of Message send to cluster.
|
String[] |
getClusterElements() |
String |
getConsumerStatus()
Gets consumer status (NO, NORMAL, HIGH).
|
int |
getConsumThreshold()
Gets the number of pending "receive" requests above which a queue will
request messages from the other queues of the cluster.
|
int |
getEncodableClassId()
Enables the sub classes not to implement this method.
|
String |
getProducerStatus()
Gets producer status (NO, NORMAL, HIGH).
|
int |
getProducThreshold()
Gets the number of messages above which a queue is considered loaded.
|
protected Message |
getQueueMessage(String msgId,
boolean remove)
get mom message, delete if remove = true.
|
float |
getRateOfFlow()
Gets an evaluation of the flow of messages handled by the queue.
|
int |
getReceivedFromCluster() |
int |
getSentToCluster() |
String |
getStatus()
Gets the status of the queue (RUN, INIT or WAIT).
|
long |
getWaitAfterClusterReq()
Gets the time (in ms) during which a queue which requested something from
the cluster is not authorized to do it again.
|
void |
handleAdminRequestNot(AgentId from,
FwdAdminRequestNot not) |
void |
initialize(boolean firstTime)
Initializes the destination.
|
boolean |
isAutoEvalThreshold()
True if an automatic reevaluation of the queues' thresholds values is
allowed according to their activity.
|
private void |
lBCycleLife(AgentId from,
LBCycleLife not)
If the messages are not consumed by an other cluster's queue
in a period of time, try to consume in this queue.
|
private void |
lBMessageGive(AgentId from,
LBMessageGive not)
load balancing message give by an other cluster queue. process
ClientMessages, no need to check if sender is writer.
|
private void |
lBMessageHope(AgentId from,
LBMessageHope not)
load balancing message hope by the "from" queue.
|
protected void |
messageDelivered(String msgId)
call in deliverMessages just after forward(msg),
overload this method to process a specific treatment.
|
protected void |
monitoringMsgSendToCluster(String msgId) |
void |
postProcess(ClientMessages not)
call factorCheck to evaluate the loading factor,
activity, ... and send message to cluster if need.
|
ClientMessages |
preProcess(AgentId from,
ClientMessages not)
overload preProcess(AgentId, ClientMessages)
store all msgId in timeTable and visitTable.
|
void |
react(AgentId from,
Notification not)
Distributes the received notifications to the appropriate reactions.
|
void |
receiveRequest(AgentId from,
ReceiveRequest not)
Method implementing the reaction to a
ReceiveRequest
instance, requesting a message. |
protected void |
sendToCluster(Notification not)
Sends a notification to all queue in cluster.
|
void |
setAutoEvalThreshold(boolean autoEvalThreshold)
Automatic reevaluation of the queues' thresholds can be done according to
their activity.
|
void |
setConsumThreshold(int consumThreshold)
Sets the number of pending "receive" requests above which a queue will
request messages from the other queues of the cluster.
|
void |
setProducThreshold(int producThreshold)
Sets the number of messages above which a queue is considered loaded.
|
void |
setProperties(Properties prop,
boolean firstTime)
Configures a
ClusterQueue instance. |
void |
setWaitAfterClusterReq(long waitAfterClusterReq)
Sets the time (in ms) during which a queue which requested something from
the cluster is not authorized to do it again.
|
private void |
storeMsgIdInTimeTable(String msgId,
Long date) |
private void |
storeMsgIdInVisitTable(String msgId,
AgentId destId) |
String |
toString()
Returns a string representation of this destination.
|
void |
wakeUpNot(WakeUpNot not)
wake up, and call factorCheck to evaluate the loading factor... if a message
stays more than a period of time in timeTable, it is sent to an other (not
visited) queue in cluster.
|
abortReceiveRequest, acknowledgeRequest, addClientMessages, addDeliveryTimeMessage, addMessage, agentSave, browseRequest, checkDelivery, cleanPendingMessage, cleanPendingMessage, cleanWaitingRequest, cleanWaitingRequest, decode, deliverMessages, denyRequest, doClientMessages, doRightRequest, encode, finalize, getConsumerLoad, getDefaultDMQId, getDefaultRedeliveryDelay, getDefaultThreshold, getDeliveredMessageCount, getDeliveryTimeMessageCount, getEncodedSize, getMessage, getMessages, getMessagesView, getMsgTxPrefix, getNbMaxMsg, getNbMsgsDeliverSinceCreation, getNbMsgsDeniedSinceCreation, getNbMsgsReceiveSinceCreation, getPendingMessageCount, getProducerLoad, getRedeliveryDelay, getStats, getThreshold, getType, getWaitingRequestCount, handleExpiredNot, isSyncExceptionOnFullDest, isUndeliverable, isValidJMXAttribute, messageRemoved, processDeliveryTime, removeMessages, setDefaultRedeliveryDelay, setMsgTxName, setNbMaxMsg, setRedeliveryDelay, setSyncExceptionOnFullDest, setThreshold, storeMessage, storeMessageHeader
agentFinalize, agentInitialize, clientMessages, delete, deleteNot, forward, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQAgentId, getDMQId, getJMXStatistics, getMBeanName, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, getRights, handleDeniedMessage, interceptorsAvailable, isAdministrator, isFreeReading, isFreeWriting, isLocal, isReader, isWriter, processAdminCommand, processInterceptors, processSetRight, processStartHandler, processStopHandler, replyToTopic, requestGroupNot, setAdminId, setFreeReading, setFreeWriting, setPeriod, setRight, unknownAgent
delete, delete, deploy, deploy, getAgentId, getCommitTime, getId, getLogTopic, getName, getReactNb, getReactTime, hasName, isAgentProfiling, isDeployed, isFixed, isUpdated, needToBeCommited, resetCommitTime, resetReactTime, resetTimer, save, sendTo, sendTo, sendTo, setAgentProfiling, setName, setNoSave, setSave
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
cleanPendingMessage, cleanWaitingRequest, getConsumerLoad, getDeliveredMessageCount, getMessage, getMessages, getMessagesView, getNbMaxMsg, getNbMsgsDeniedSinceCreation, getPendingMessageCount, getProducerLoad, getRedeliveryDelay, getThreshold, getWaitingRequestCount, setNbMaxMsg, setRedeliveryDelay, setThreshold
delete, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQId, getName, getNbMsgsDeliverSinceCreation, getNbMsgsReceiveSinceCreation, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, isFreeReading, isFreeWriting, setFreeReading, setFreeWriting, setPeriod
getAgentId, getCommitTime, getReactNb, getReactTime, isAgentProfiling, isFixed, resetCommitTime, resetReactTime, resetTimer, setAgentProfiling
private static final long serialVersionUID
protected Map clusters
protected LoadingFactor loadingFactor
private Map timeTable
private Map visitTable
private long clusterDeliveryCount
private long timeThreshold
Queue.period
.public static final int DEFAULT_PRODUC_THRESHOLD
public static final int DEFAULT_CONSUM_THRESHOLD
public static final boolean DEFAULT_AUTO_EVAL_THRESHOLD
public static final long DEFAULT_WAIT_AFTER_CLUSTER_REQ
public static final long DEFAULT_TIME_THRESHOLD
int receivedFromCluster
int sentToCluster
public void setProperties(Properties prop, boolean firstTime) throws Exception
ClusterQueue
instance.setProperties
in class Queue
prop
- The initial set of properties.Exception
public void initialize(boolean firstTime) throws Exception
initialize
in class Queue
firstTime
- true when first called by the factoryException
public void handleAdminRequestNot(AgentId from, FwdAdminRequestNot not)
public void react(AgentId from, Notification not) throws Exception
public String toString()
Queue
toString
in interface AgentMBean
toString
in interface DestinationMBean
toString
in class Queue
private void clusterAdd(FwdAdminRequestNot req, String joiningQueue)
private void clusterJoin(ClusterJoinNot not)
ClusterJoinNot
notification,
sent by a fellow queue for notifying this queue to join the cluster, doing
a transitive closure of clusters, if any.private void clusterJoinAck(ClusterJoinAck not)
ClusterJoinAck
notification,
doing a transitive closure with the current cluster and the one of the new
cluster element.private List clusterList()
public String[] getClusterElements()
getClusterElements
in interface ClusterDestinationMBean
private void clusterLeave()
private void clusterRemove(AgentId queue)
queue
- The queue which left the clusterpublic ClientMessages preProcess(AgentId from, ClientMessages not)
preProcess
in class Destination
from
- not
- public void postProcess(ClientMessages not)
postProcess
in class Destination
not
- public void wakeUpNot(WakeUpNot not)
private void lBCycleLife(AgentId from, LBCycleLife not)
from
- not
- public void receiveRequest(AgentId from, ReceiveRequest not) throws AccessException
Queue
ReceiveRequest
instance, requesting a message.
This method stores the request and launches a delivery sequence.
receiveRequest
in class Queue
not
- ReceiveRequestAccessException
- If the sender is not a reader.private void lBMessageGive(AgentId from, LBMessageGive not) throws UnknownNotificationException
from
- AgentIdnot
- LBMessageGiveUnknownNotificationException
private void lBMessageHope(AgentId from, LBMessageHope not)
from
- not
- LBMessageHopeprotected ClientMessages getClientMessages(int nb, String selector, boolean remove)
getClientMessages
in class Queue
nb
- number of messages returned in ClientMessage.selector
- jms selectorremove
- delete all messages returned if trueprotected Message getQueueMessage(String msgId, boolean remove)
getQueueMessage
in class Queue
msgId
- message identificationremove
- if true delete messageprotected void sendToCluster(Notification not)
not
- The notification to send.protected void doDeleteNot(DeleteNot not)
Queue
fr.dyade.aaa.agent.DeleteNot
instance.
ExceptionReply
replies are sent to the pending receivers,
and the remaining messages are sent to the DMQ and deleted.
doDeleteNot
in class Queue
protected void doUnknownAgent(UnknownAgent uA)
Queue
UnknownAgent
instance.
The specific processing is done when a QueueMsgReply
was
sent to a requester which does not exist anymore. In that case, the
messages sent to this requester and not yet acknowledged are marked as
"denied" for delivery to an other requester, and a new delivery sequence
is launched. Messages considered as undeliverable are removed and sent to
the DMQ.
doUnknownAgent
in class Queue
public long getClusterDeliveryCount()
private void storeMsgIdInTimeTable(String msgId, Long date)
msgId
- date
- private void storeMsgIdInVisitTable(String msgId, AgentId destId)
msgId
- destId
- protected void messageDelivered(String msgId)
Queue
messageDelivered
in class Queue
msgId
- protected void monitoringMsgSendToCluster(String msgId)
msgId
- public void setWaitAfterClusterReq(long waitAfterClusterReq)
ClusterQueueMBean
setWaitAfterClusterReq
in interface ClusterQueueMBean
waitAfterClusterReq
- public void setProducThreshold(int producThreshold)
ClusterQueueMBean
setProducThreshold
in interface ClusterQueueMBean
producThreshold
- public void setConsumThreshold(int consumThreshold)
ClusterQueueMBean
setConsumThreshold
in interface ClusterQueueMBean
consumThreshold
- public void setAutoEvalThreshold(boolean autoEvalThreshold)
ClusterQueueMBean
setAutoEvalThreshold
in interface ClusterQueueMBean
autoEvalThreshold
- public int getProducThreshold()
ClusterQueueMBean
getProducThreshold
in interface ClusterQueueMBean
public int getConsumThreshold()
ClusterQueueMBean
getConsumThreshold
in interface ClusterQueueMBean
public boolean isAutoEvalThreshold()
ClusterQueueMBean
isAutoEvalThreshold
in interface ClusterQueueMBean
public long getWaitAfterClusterReq()
ClusterQueueMBean
getWaitAfterClusterReq
in interface ClusterQueueMBean
public float getRateOfFlow()
ClusterQueueMBean
getRateOfFlow
in interface ClusterQueueMBean
public String getStatus()
ClusterQueueMBean
getStatus
in interface ClusterQueueMBean
LoadingFactor.Status
public String getConsumerStatus()
ClusterQueueMBean
getConsumerStatus
in interface ClusterQueueMBean
LoadingFactor.ConsumerStatus
public String getProducerStatus()
ClusterQueueMBean
getProducerStatus
in interface ClusterQueueMBean
LoadingFactor.ProducerStatus
public int getReceivedFromCluster()
getReceivedFromCluster
in interface ClusterQueueMBean
public int getSentToCluster()
getSentToCluster
in interface ClusterQueueMBean
public int getEncodableClassId()
Agent
getEncodableClassId
in interface Encodable
getEncodableClassId
in class Queue
Copyright © 2018 ScalAgent D.T.. All Rights Reserved.