class Engine extends Object implements Runnable, AgentEngine, EngineMBean
The Engine class provides multiprogramming of agents. It implements the program loop which successively gets the notifications from the message queue and calls the relevant reaction member function of the target agent. The engine's basic behaviour is:
    while (true) {
      // get next message in channel
      Message msg = qin.get();
      // get the agent to process event
      Agent agent = load(msg.to);
      // execute relevant reaction, all notifications sent during this
      // reaction are inserted into persistent queue in order to be
      // processed by the channel.
      agent.react(msg.from, msg.not);
      // save changes, then commit.
      <BEGIN TRANSACTION>
      qin.pop();
      channel.dispatch();
      agent.save();
      <COMMIT TRANSACTION>
    }
The Engine class ensures the atomic handling of an agent reacting to a notification.
Two types of errors may occur: errors of the first type are detected in the source code and signaled by an Exception; serious errors lead to an Error being raised, after which the engine exits. In the first case the exception may be handled at any level, even partially. Most exceptions are signaled up to the engine loop. Two cases are then distinguished depending on the recovery policy:
- if recoveryPolicy is set to RP_EXC_NOT (default value), the agent state and the message queue are restored (ROLLBACK); an ExceptionNotification notification is sent to the sender and the engine may then proceed with the next notification;
- if recoveryPolicy is set to RP_EXIT, the engine stops the agent server.
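The loop and the two recovery policies described above can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: `EngineLoopSketch`, `Msg` and `CounterAgent` are simplified stand-ins rather than the real `Engine`, `Message` and `Agent` classes, the numeric values of the `RP_*` constants are assumed, and the transaction is reduced to an in-memory snapshot. Dropping the faulty notification after the rollback is also an assumption made so that the sketch can move on to the next message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of the engine loop; not the real Engine class. */
class EngineLoopSketch {
    static final int RP_EXC_NOT = 0; // numeric values are assumed
    static final int RP_EXIT = 1;

    /** Minimal message: sender, target and notification are plain strings here. */
    static final class Msg {
        final String from, to, not;
        Msg(String from, String to, String not) {
            this.from = from; this.to = to; this.not = not;
        }
    }

    /** Toy agent whose state is a counter; the "boom" notification makes the reaction fail. */
    static final class CounterAgent {
        int count;
        void react(String from, String not) {
            count++;
            if (not.equals("boom")) throw new IllegalStateException("reaction failed");
        }
    }

    final Deque<Msg> qin = new ArrayDeque<>();
    final List<Msg> sent = new ArrayList<>(); // notifications emitted by the sketch
    final CounterAgent agent = new CounterAgent();
    int recoveryPolicy = RP_EXC_NOT;
    boolean serverStopped;

    /** Processes queued messages until the queue is empty or the server is stopped. */
    void runUntilEmpty() {
        while (!qin.isEmpty() && !serverStopped) {
            Msg msg = qin.peek();
            int savedState = agent.count;      // snapshot used for the ROLLBACK
            try {
                agent.react(msg.from, msg.not);
                qin.pop();                      // COMMIT: remove the handled message
            } catch (Exception exc) {
                agent.count = savedState;       // ROLLBACK: restore the agent state
                if (recoveryPolicy == RP_EXIT) {
                    serverStopped = true;       // RP_EXIT: stop the agent server
                } else {
                    qin.pop();                  // assumed: skip the faulty notification
                    sent.add(new Msg(msg.to, msg.from,
                            "ExceptionNotification: " + exc.getMessage()));
                }
            }
        }
    }
}
```

With `RP_EXC_NOT` the failed reaction leaves no trace in the agent state and the sender receives an error notification; with `RP_EXIT` the loop stops and the unprocessed messages stay in the queue.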
Modifier and Type | Class and Description |
---|---|
(package private) class | Engine.EngineAverageLoadTask |
Modifier and Type | Field and Description |
---|---|
(package private) Agent | agent: The current agent running. |
boolean | agentProfiling: Boolean value indicating if the agent profiling is on, by default false. |
(package private) Hashtable<AgentId,Agent> | agents: This table is used to maintain a list of agents already in memory using the AgentId as primary key. |
(package private) Engine.EngineAverageLoadTask | averageLoadTask |
protected boolean | canStop: Boolean variable used to stop the engine properly. |
private long | commitTime: Time consumed during reaction commit. |
(package private) Vector<AgentId> | fixedAgentIdList: Vector containing the ids of all fixed agents. |
protected boolean | isRunning: Boolean variable used to stop the engine properly. |
protected org.objectweb.util.monolog.api.Logger | logmon |
private boolean | modified: True if the timestamp is modified since last save. |
protected Queue | mq |
(package private) Message | msg: The message in progress. |
private String | name |
(package private) int | NbMaxAgents: Maximum number of memory loaded agents. |
protected boolean | needToBeCommited |
(package private) boolean | noTxIfTransient: Flag to avoid transaction when not needed. |
(package private) long | now: Virtual time counter used in FIFO swap-in/swap-out mechanisms. |
private boolean | persistentPush |
protected MessageQueue | qin: Queue of messages to be delivered to local agents. |
private long | reactTime: Time consumed during agent's reaction. |
(package private) int | recoveryPolicy: Recovery policy in case of exception in agent specific code. |
(package private) static int | RP_EXC_NOT: Send ExceptionNotification notification in case of exception in agent specific code. |
(package private) static int | RP_EXIT: Stop agent server in case of exception in agent specific code. |
(package private) static String[] | rpStrings: String representations of RP_* constant values for the recoveryPolicy variable. |
private int | stamp: Logical timestamp information for messages in "local" domain. |
private byte[] | stampBuf: Buffer used to optimize |
(package private) EngineThread | thread: The active component of this engine. |
protected long | timeout |
Modifier | Constructor and Description |
---|---|
protected | Engine(): Initializes a new Engine object (can only be used by subclasses). |
Modifier and Type | Method and Description |
---|---|
(package private) void | abort(Exception exc): Aborts the agent reaction in case of error during execution. |
(package private) void | addFixedAgentId(AgentId id): Adds an AgentId to the fixedAgentIdList Vector. |
(package private) void | clean(): Cleans the Channel queue of all pushed notifications. |
(package private) void | commit(): Commits the agent reaction on normal termination: removes the processed notification from the message queue, then deletes it; pushes all new notifications in qin and qout, and saves them; saves the agent state; then commits the transaction to validate all changes. |
(package private) void | createAgent(Agent agent): Creates and initializes an agent. |
void | createAgent(AgentId id, Agent agent): Creates and initializes an agent. |
void | delete(): This operation always throws an IllegalStateException. |
void | deleteAgent(AgentId from): Deletes an agent. |
(package private) void | dispatch() |
String | dumpAgent(AgentId id): Returns a string representation of the specified agent. |
String | dumpAgent(String id): Returns a string representation of the specified agent. |
(package private) void | garbage(): The garbage method should be called regularly, to swap out from memory all the agents which have not been accessed for a time. |
float | getAverageLoad1(): Returns the load averages for the last minute. |
float | getAverageLoad15(): Returns the load averages for the past 15 minutes. |
float | getAverageLoad5(): Returns the load averages for the past 5 minutes. |
long | getCommitTime(): Returns the total commit time calculated for this engine. |
String | getDomainName(): Returns the corresponding domain's name. |
(package private) AgentId[] | getLoadedAgentIdlist(): Method used for debug and monitoring. |
String | getName(): Returns this Engine's name. |
int | getNbAgents(): Returns the number of agents currently loaded in memory. |
int | getNbFixedAgents(): Returns the number of fixed agents. |
int | getNbMaxAgents(): Returns the maximum number of agents loaded in memory. |
int | getNbMessages(): Gets the number of messages posted to this engine since creation. |
long | getNbReactions(): Returns the number of agent reactions since last boot. |
int | getNbWaitingMessages(): Gets the number of waiting messages in this engine. |
MessageQueue | getQueue(): Gets this engine's MessageQueue qin. |
long | getReactTime(): Returns the total reaction time calculated for this engine. |
protected int | getStamp() |
void | init(AgentEngineContext agentEngineContext): Initializes the engine. |
void | insert(Message msg): Inserts a message in the MessageQueue. |
boolean | isAgentProfiling(): Returns true if the agent profiling is on. |
boolean | isEngineThread(): Checks if the current thread calling this method belongs to the engine. |
boolean | isNoTxIfTransient(): Returns the flag to avoid transactions. |
boolean | isRunning(): Tests if the engine is alive. |
(package private) Agent | load(AgentId id): The load method returns the Agent object designated by the AgentId parameter. |
(package private) static Engine | newInstance(): Creates a new instance of Engine (real class depends on server type). |
protected void | onTimeOut() |
void | post(Message msg): Adds a message in "ready to deliver" list. |
void | postAndValidate(Message msg): Posts a message and validates it at the same time. |
void | push(AgentId from, AgentId to, Notification not): Pushes a new message in temporary queue until the end of current reaction. |
void | push(AgentId to, Notification not): Pushes a new message in temporary queue until the end of current reaction. |
(package private) Agent | reload(AgentId id): The reload method returns the Agent object loaded from the storage. |
(package private) void | removeFixedAgentId(AgentId id): Removes an AgentId from the fixedAgentIdList Vector. |
String | report(): Returns a report about the distribution of message types in queue. |
void | resetAverageLoad() |
void | resetCommitTime(): Resets the commitTime. |
void | resetReactTime(): Resets the reactTime. |
void | resetTimer(): Resets reactTime and commitTime. |
void | restore(): Restores logical clock information from persistent storage. |
void | run(): Main loop of agent server Engine. |
void | save(): Saves logical clock information to persistent storage. |
void | setAgentProfiling(boolean agentProfiling): Sets the agent profiling. |
void | setNbMaxAgents(int NbMaxAgents): Sets the maximum number of agents that can be loaded simultaneously in memory. |
protected void | setStamp(int stamp) |
protected void | stamp(Message msg) |
void | start(): Causes this engine to begin execution. |
void | stop(): Forces the engine to stop executing. |
(package private) void | terminate() |
String | toString(): Returns a string representation of this engine. |
void | validate(): Validates all messages pushed in queue during transaction session. |
protected MessageQueue qin
protected volatile boolean isRunning
protected volatile boolean canStop
… isRunning variable between each reaction)
private int stamp
private byte[] stampBuf
private boolean modified
Hashtable<AgentId,Agent> agents
long now
int NbMaxAgents
boolean noTxIfTransient
Agent agent
Message msg
EngineThread thread
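static final int RP_EXC_NOT
Send ExceptionNotification notification in case of exception in agent specific code. Constant value for the recoveryPolicy variable.
static final int RP_EXIT
Stop agent server in case of exception in agent specific code. Constant value for the recoveryPolicy variable.
static final String[] rpStrings
String representations of RP_* constant values for the recoveryPolicy variable.
int recoveryPolicy
Recovery policy in case of exception in agent specific code. Default value is RP_EXC_NOT.
private String name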
protected Queue mq
private boolean persistentPush
protected org.objectweb.util.monolog.api.Logger logmon
protected boolean needToBeCommited
protected long timeout
public boolean agentProfiling
private long reactTime
private long commitTime
Engine.EngineAverageLoadTask averageLoadTask
public long getNbReactions()
Specified by: getNbReactions in interface EngineMBean
public int getNbMaxAgents()
Specified by: getNbMaxAgents in interface EngineMBean
public void setNbMaxAgents(int NbMaxAgents)
Specified by: setNbMaxAgents in interface EngineMBean
Parameters: NbMaxAgents - the maximum number of agents
public int getNbAgents()
Specified by: getNbAgents in interface EngineMBean
public int getNbMessages()
Specified by: getNbMessages in interface EngineMBean
public int getNbWaitingMessages()
Specified by: getNbWaitingMessages in interface AgentEngine
Specified by: getNbWaitingMessages in interface EngineMBean
public int getNbFixedAgents()
Specified by: getNbFixedAgents in interface EngineMBean
public boolean isNoTxIfTransient()
Specified by: isNoTxIfTransient in interface AgentEngine
Specified by: isNoTxIfTransient in interface EngineMBean
public final String getName()
Returns this Engine's name.
Specified by: getName in interface EngineMBean
Specified by: getName in interface MessageConsumer
Returns: this Engine's name.
public final String getDomainName()
Specified by: getDomainName in interface MessageConsumer
static Engine newInstance() throws Exception
Returns: the engine's instance.
Throws: Exception
public final void push(AgentId from, AgentId to, Notification not)
Specified by: push in interface AgentEngine
Parameters:
from - the source
to - the destination
not - the notification to push
public final void push(AgentId to, Notification not)
Specified by: push in interface AgentEngine
Parameters:
to - the destination
not - the notification to push
final void dispatch() throws Exception
Dispatches messages between the MessageConsumer: Engine component and Network components. Handles persistent information with respect to the engine transaction.
Throws:
IOException - error when accessing the local persistent storage.
Exception
final void clean()
public void init(AgentEngineContext agentEngineContext) throws Exception
Description copied from interface: AgentEngine
The AgentEngineContext parameter should be for the exclusive private use of this AgentEngine and should not be shared with any other components, otherwise the agent server security would be broken.
Specified by: init in interface AgentEngine
Parameters: agentEngineContext - context enabling this AgentEngine to invoke operations that cannot be accessed outside of the fr.dyade.aaa.agent package. The AgentEngineContext should be for the exclusive private use of this AgentEngine and should not be shared with any other components, otherwise the agent server security would be broken.
Throws: Exception
void terminate()
public final void createAgent(AgentId id, Agent agent) throws Exception
Specified by: createAgent in interface AgentEngine
Parameters: agent - agent object to create
Throws: Exception - unspecialized exception
final void createAgent(Agent agent) throws Exception
Parameters: agent - agent object to create
Throws: Exception - unspecialized exception
public void deleteAgent(AgentId from) throws Exception
Specified by: deleteAgent in interface AgentEngine
Parameters: agent - agent to delete
Throws: Exception
void garbage()
The garbage method should be called regularly, to swap out from memory all the agents which have not been accessed for a time.
void removeFixedAgentId(AgentId id) throws IOException
Removes an AgentId from the fixedAgentIdList Vector.
Parameters: id - the AgentId of the fixed agent that is no longer used.
Throws: IOException
void addFixedAgentId(AgentId id) throws IOException
Adds an AgentId to the fixedAgentIdList Vector.
Parameters: id - the AgentId of the new fixed agent.
Throws: IOException
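The swap-out performed by garbage, together with the in-memory agents table, the now virtual time counter and the fixed agent list, can be pictured with the following minimal sketch. It is an illustration under assumptions, not the engine's actual code: `AgentCacheSketch` and its members are hypothetical names, agents are reduced to strings, the eviction rule (last access older than `nbMaxAgents` ticks) is invented for the example, and the idea that fixed agents are exempt from swap-out is an assumption that the text above does not state explicitly.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical agent cache; the eviction rule is an assumption, not the real Engine logic. */
class AgentCacheSketch {
    /** An agent held in memory, with the virtual time of its last access. */
    static final class Loaded {
        final String state;
        long last;
        Loaded(String state, long last) { this.state = state; this.last = last; }
    }

    final Map<String, Loaded> agents = new HashMap<>();  // agents currently in memory
    final Map<String, String> storage = new HashMap<>(); // stand-in for persistent storage
    final Set<String> fixedAgentIds = new HashSet<>();   // assumed: never swapped out
    long now;                                            // virtual time counter
    int nbMaxAgents = 2;

    /** Returns the agent state, loading it from storage when it is not in memory. */
    String load(String id) {
        now++;
        Loaded l = agents.get(id);
        if (l == null) {
            String state = storage.get(id);
            if (state == null) throw new IllegalArgumentException("unknown agent " + id);
            l = new Loaded(state, now);
            agents.put(id, l);
        }
        l.last = now;
        return l.state;
    }

    /** Swaps out every non-fixed agent whose last access is at least nbMaxAgents ticks old. */
    void garbage() {
        long deadline = now - nbMaxAgents;
        Iterator<Map.Entry<String, Loaded>> it = agents.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Loaded> e = it.next();
            if (!fixedAgentIds.contains(e.getKey()) && e.getValue().last <= deadline) {
                storage.put(e.getKey(), e.getValue().state); // save before eviction
                it.remove();
            }
        }
    }
}
```

A swapped-out agent is not lost: the next call to `load` with its id brings it back from the storage map.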
AgentId[] getLoadedAgentIdlist()
public String dumpAgent(String id) throws Exception
Specified by: dumpAgent in interface EngineMBean
Parameters: id - The string representation of the agent's unique identification.
Throws: Exception
See Also: dumpAgent(AgentId)
public String dumpAgent(AgentId id) throws IOException, ClassNotFoundException
Specified by: dumpAgent in interface AgentEngine
Parameters: id - The agent's unique identification.
Throws:
IOException
ClassNotFoundException
final Agent load(AgentId id) throws IOException, ClassNotFoundException, Exception
The load method returns the Agent object designated by the AgentId parameter. If the Agent object is not already present in the server memory, it is loaded from the storage. Be careful: although the save method can be overloaded to optimize the save process, the load procedure used by the engine is always load.
Parameters: id - The agent identification.
Throws:
IOException - If an I/O error occurs.
ClassNotFoundException - Should never happen (the agent has already been loaded in deploy).
UnknownAgentException - There is no corresponding agent on secondary storage.
Exception - when executing class specific initialization
final Agent reload(AgentId id) throws IOException, ClassNotFoundException, Exception
The reload method returns the Agent object loaded from the storage.
Parameters: id - The agent identification.
Throws:
IOException - when accessing the stored image
ClassNotFoundException - if the stored image class may not be found
Exception - unspecialized exception
public void insert(Message msg)
Inserts a message in the MessageQueue. This method is used during initialization to restore the component state from persistent storage.
Specified by: insert in interface MessageConsumer
Parameters: msg - the message
public void validate()
Specified by: validate in interface MessageConsumer
public void start()
Specified by: start in interface EngineMBean
Specified by: start in interface MessageConsumer
See Also: stop
public void stop()
Specified by: stop in interface EngineMBean
Specified by: stop in interface MessageConsumer
See Also: start
public MessageQueue getQueue()
Gets this engine's MessageQueue qin.
Specified by: getQueue in interface MessageConsumer
Returns: this Engine's queue.
public boolean isRunning()
Specified by: isRunning in interface EngineMBean
Specified by: isRunning in interface MessageConsumer
Returns: true if this MessageConsumer is alive; false otherwise.
public void save() throws IOException
Specified by: save in interface MessageConsumer
Throws: IOException
public void restore() throws Exception
Specified by: restore in interface MessageConsumer
Throws: Exception
public void delete() throws IllegalStateException
Specified by: delete in interface MessageConsumer
Throws: IllegalStateException
See Also: Transaction
protected final int getStamp()
protected final void setStamp(int stamp)
protected final void stamp(Message msg)
public void post(Message msg) throws Exception
Specified by: post in interface MessageConsumer
Throws: Exception
public void postAndValidate(Message msg) throws Exception
Specified by: postAndValidate in interface MessageConsumer
Throws: Exception
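The relationship between post, validate and postAndValidate can be pictured as a queue in which a posted message only becomes deliverable once it has been validated. The sketch below is a hypothetical illustration of that idea: `ValidatingQueueSketch` and its methods are invented names, messages are plain strings, and the visibility rule is an assumption rather than a description of the real MessageQueue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical queue in which posted messages are deliverable only after validate(). */
class ValidatingQueueSketch {
    private final List<String> messages = new ArrayList<>();
    private int validated; // the first `validated` messages are ready to deliver

    /** Adds a message that is not yet visible to the consumer. */
    void post(String msg) {
        messages.add(msg);
    }

    /** Makes every message posted so far deliverable. */
    void validate() {
        validated = messages.size();
    }

    /** Posts a message and validates it at the same time. */
    void postAndValidate(String msg) {
        post(msg);
        validate();
    }

    /** Returns the next deliverable message, or null when none has been validated. */
    String poll() {
        if (validated == 0) return null;
        validated--;
        return messages.remove(0);
    }

    /** Number of messages held, validated or not. */
    int size() {
        return messages.size();
    }
}
```

Separating the two steps lets a caller post several messages inside one transaction and expose them all at once when the transaction commits.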
public boolean isAgentProfiling()
Specified by: isAgentProfiling in interface AgentEngine
Specified by: isAgentProfiling in interface EngineMBean
See Also: EngineMBean.isAgentProfiling()
public void setAgentProfiling(boolean agentProfiling)
Specified by: setAgentProfiling in interface AgentEngine
Specified by: setAgentProfiling in interface EngineMBean
See Also: EngineMBean.setAgentProfiling(boolean)
public long getReactTime()
Description copied from interface: EngineMBean
Returns the total reaction time calculated for this engine.
Specified by: getReactTime in interface AgentEngine
Specified by: getReactTime in interface EngineMBean
public void resetReactTime()
Specified by: resetReactTime in interface EngineMBean
public long getCommitTime()
Description copied from interface: EngineMBean
Returns the total commit time calculated for this engine.
Specified by: getCommitTime in interface AgentEngine
Specified by: getCommitTime in interface EngineMBean
public void resetCommitTime()
Specified by: resetCommitTime in interface EngineMBean
public void resetTimer()
Specified by: resetTimer in interface EngineMBean
void commit() throws Exception
Throws: Exception
void abort(Exception exc) throws Exception
Throws: Exception
public void resetAverageLoad()
Specified by: resetAverageLoad in interface AgentEngine
public float getAverageLoad1()
Specified by: getAverageLoad1 in interface AgentEngine
Specified by: getAverageLoad1 in interface EngineMBean
Specified by: getAverageLoad1 in interface MessageConsumer
public float getAverageLoad5()
Specified by: getAverageLoad5 in interface AgentEngine
Specified by: getAverageLoad5 in interface EngineMBean
Specified by: getAverageLoad5 in interface MessageConsumer
public float getAverageLoad15()
Specified by: getAverageLoad15 in interface AgentEngine
Specified by: getAverageLoad15 in interface EngineMBean
Specified by: getAverageLoad15 in interface MessageConsumer
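This page does not say how the 1, 5 and 15 minute load averages are computed. One common way to obtain such figures is an exponentially damped moving average over periodic samples, in the style of the Unix load average. The sketch below shows that technique only; it is not taken from EngineAverageLoadTask, and the class name, the 5 second sampling period and the use of the waiting message count as the sampled value are all assumptions.

```java
/** Hypothetical exponentially damped load average; not the real EngineAverageLoadTask. */
class AverageLoadSketch {
    private static final double SAMPLE_SECONDS = 5.0; // assumed sampling period

    // Decay factors for the 1, 5 and 15 minute windows.
    private final double decay1 = Math.exp(-SAMPLE_SECONDS / 60.0);
    private final double decay5 = Math.exp(-SAMPLE_SECONDS / 300.0);
    private final double decay15 = Math.exp(-SAMPLE_SECONDS / 900.0);

    private double load1, load5, load15;

    /** Folds one sample (for example the number of waiting messages) into the averages. */
    void sample(int waiting) {
        load1 = load1 * decay1 + waiting * (1.0 - decay1);
        load5 = load5 * decay5 + waiting * (1.0 - decay5);
        load15 = load15 * decay15 + waiting * (1.0 - decay15);
    }

    /** Clears all three averages. */
    void reset() {
        load1 = 0.0;
        load5 = 0.0;
        load15 = 0.0;
    }

    float getAverageLoad1() { return (float) load1; }
    float getAverageLoad5() { return (float) load5; }
    float getAverageLoad15() { return (float) load15; }
}
```

The shorter the window, the faster the average follows a change in the sampled value, which is why the three figures are usually read together.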
public String report()
Specified by: report in interface EngineMBean
public String toString()
Specified by: toString in interface EngineMBean
Overrides: toString in class Object
public boolean isEngineThread()
Specified by: isEngineThread in interface AgentEngine
Copyright © 2013 ScalAgent D.T.. All Rights Reserved.