00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031
00032 import org.openmobileis.common.context.SessionContext;
00033 import org.openmobileis.common.context.SessionContextManager;
00034 import org.openmobileis.common.user.UserNotFoundException;
00035 import org.openmobileis.common.util.codec.GeneralCoder;
00036 import org.openmobileis.common.util.collection.Array;
00037 import org.openmobileis.common.util.exception.ServiceException;
00038 import org.openmobileis.common.util.log.LogManager;
00039 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00040 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00041 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00042 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00043 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00044 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00045 import org.openmobileis.synchro.openmsp.OpenMSPException;
00046 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00047 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00048 import org.openmobileis.synchro.openmsp.protocol.Command;
00049 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00050 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00051 import org.openmobileis.synchro.openmsp.protocol.Element;
00052 import org.openmobileis.synchro.openmsp.protocol.Result;
00053 import org.openmobileis.synchro.openmsp.protocol.Status;
00054 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00055 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00056 import org.openmobileis.synchro.openmsp.server.util.ZipEntryMemoryFile;
00057 import org.openmobileis.synchro.security.auth.Credential;
00058
00067 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00068
00072 public OpenMSPDBSynchroTargetListener() {
00073 super();
00074 }
00075
00076 public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00077 SessionContext context = SessionContextManager.getManager().getSessionContext();
00078 AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00079 int commandType = syncCommand.getElementType();
00080 if (commandType == Element.GET) {
00081
00082
00083
00084
00085 context.setAttribute("openMSP getCommand %%", syncCommand);
00086 SyncTargetAnswer answer = new SyncTargetAnswer();
00087 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_NO_CONTENT);
00088 status.setCmdRef(syncCommand.getCmdId());
00089 ContainerMessage[] returnContainer = new ContainerMessage[1];
00090 returnContainer[0] = new ContainerMessage(status);
00091 answer.containerMessage = returnContainer;
00092 return answer;
00093 } else if (commandType != Element.SYNC) {
00094 SyncTargetAnswer answer = new SyncTargetAnswer();
00095 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00096 status.setCmdRef(syncCommand.getCmdId());
00097 ContainerMessage[] returnContainer = new ContainerMessage[1];
00098 returnContainer[0] = new ContainerMessage(status);
00099 answer.containerMessage = returnContainer;
00100 return answer;
00101 }
00102
00103
00104 try {
00105 this.connect(cred);
00106 try {
00107
00108 Array returnList = new Array(5);
00109 boolean doClientUpdate = true;
00110
00111 Command sync = (Command) containerMessage.getElement();
00112 this.processSyncActionMetaData(sync.getMetaInformation());
00113 long sessionID = sync.getSourceSessionID();
00114 Array clientsaoList = new Array(10);
00115 AbstractCommand getCommand = (AbstractCommand)context.getAttribute("openMSP getCommand %%");
00116 context.removeAttribute("openMSP getCommand %%");
00117
00118 while (containerMessage.hasMoreMessage()) {
00119 ContainerMessage commandContainer = containerMessage.nextMessage();
00120 Element command = commandContainer.getElement();
00121 try {
00122 short actiontype = -1;
00123 DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00124 String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00125 String dataUID = "";
00126 if (meta.length() >0) dataUID = meta.substring(6, meta.length());
00127 Object data = null;
00128
00129 if (command.getElementType() == Element.ADD) {
00130 actiontype = SynchroAtomicObject.ADD;
00131
00132 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00133 } else if (command.getElementType() == Element.DELETE) {
00134 actiontype = SynchroAtomicObject.DELETE;
00135 data = dataUID;
00136 } else if (command.getElementType() == Element.REPLACE) {
00137
00138 actiontype = SynchroAtomicObject.REPLACE;
00139 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00140 }
00141 if (actiontype != -1) {
00142 SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00143 SAO.setModificationType(actiontype);
00144 SAO.setModifSyncNumber(sessionID);
00145 SAO.setSyncdata(data);
00146 SAO.setOpenMSPcmdId(command.getCmdId());
00147 clientsaoList.add(SAO);
00148 }
00149 } catch (Throwable ex) {
00150 LogManager.traceError(0, ex);
00151 Status status = new Status(command.getCmdId(), Status.STATUS_FAILED);
00152 status.setCmdRef(command.getCmdId());
00153 ContainerMessage statusContainer = new ContainerMessage(status);
00154 returnList.add(statusContainer);
00155 doClientUpdate = false;
00156 }
00157 }
00158
00159
00160 AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00161 long newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00162 SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00163 SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00164 clientsaoList.toArray(servermodificationsoa);
00165 ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00166
00167
00168 ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00169 int clsize = serverSideUpdates.length;
00170 for (int i = 0; i < clsize; i++) {
00171 SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00172 try {
00173 if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00174 String[] updata = (String[]) syncObject.getSyncdata();
00175
00176 updata[0] = new String(GeneralCoder.decodeBase64(updata[0].getBytes()));
00177 Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00178 this.updateTargetWithSynchroObject(obj);
00179 } else {
00180 this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata());
00181 }
00182 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00183 status.setCmdRef(syncObject.getOpenMSPcmdId());
00184 ContainerMessage statusContainer = new ContainerMessage(status);
00185 returnList.add(statusContainer);
00186 } catch (Throwable ex) {
00187 LogManager.traceError(0, ex);
00188 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00189 status.setCmdRef(syncObject.getOpenMSPcmdId());
00190 ContainerMessage statusContainer = new ContainerMessage(status);
00191 returnList.add(statusContainer);
00192 doClientUpdate = false;
00193 }
00194 }
00195
00196 if (sessionID == 0) doClientUpdate = true;
00197
00198 SyncTargetAnswer answer = new SyncTargetAnswer();
00199
00200 if ((getCommand != null) && (doClientUpdate)) {
00201 try {
00202 Array addFileRepository = new Array(15);
00203 int maxRow = this.getUpdateMaxNbRow();
00204 ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00205 if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) {
00206
00207
00208
00209 byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00210
00211 String syncFileName = "/importdb/"+this.getTargetName();
00212 OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00213 addFileRepository.add(file);
00214 DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00215
00216 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00217 resultCommand.setMetaInformation(Long.toString(newSessionId));
00218 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00219 resultContainer.add(newItem);
00220 returnList.add(resultContainer);
00221 } else {
00222
00223 OpenMISFile[] files = this.getDatabaseImportFiles();
00224 if ((files != null) && (files.length > 0)) {
00225 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00226 resultCommand.setMetaInformation(Long.toString(newSessionId));
00227 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00228 DataItem newItem = null;
00229 for (int i = 0; i < files.length; i++) {
00230 addFileRepository.add(files[i]);
00231 String filename = files[i].getFileCompleteName();
00232 if (files[i] instanceof ZipEntryMemoryFile) {
00233 filename = ((ZipEntryMemoryFile)files[i]).getZipFileName();
00234 }
00235 newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00236 resultContainer.add(newItem);
00237 }
00238 returnList.add(resultContainer);
00239 }
00240
00241 }
00242 if (addFileRepository.size() > 0) {
00243 OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00244 addFileRepository.toArray(addCyberFiles);
00245 answer.answerfiles = addCyberFiles;
00246 }
00247 } catch (Throwable ex) {
00248 LogManager.traceError(0, ex);
00249 Status status = new Status(getCommand.getCmdId(), Status.STATUS_FAILED);
00250 status.setCmdRef(getCommand.getCmdId());
00251 ContainerMessage statusContainer = new ContainerMessage(status);
00252 returnList.add(statusContainer);
00253 }
00254 }
00255
00256 if (returnList.size() != 0) {
00257 ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00258 returnList.toArray(returnContainer);
00259 answer.containerMessage = returnContainer;
00260 }
00261 return answer;
00262 } finally {
00263 this.disconnect();
00264 }
00265 } catch (Throwable ex) {
00266 throw new OpenMSPException(ex);
00267 }
00268 }
00269
00276 protected int getUpdateMaxNbRow() {
00277 return -1;
00278 }
00279
00284 private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00285 int size = cserverUpdates.length;
00286 ByteArrayOutputStream stream = new ByteArrayOutputStream();
00287 Writer writer = new OutputStreamWriter(stream);
00288 try {
00289 for (int i = 0; i < size; i++) {
00290 char[] encoded = null;
00291 if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00292 Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID());
00293 if (updateobject != null) {
00294 String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00295 if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00296 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00297 } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00298 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00299 }
00300 }
00301 } else {
00302 String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00303 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00304 }
00305 if (encoded != null) {
00306 writer.write(encoded);
00307 }
00308 }
00309 } finally {
00310 writer.flush();
00311 writer.close();
00312 }
00313 return stream.toByteArray();
00314 }
00315
00320 protected void processSyncActionMetaData(String metadata) throws ServiceException {
00321
00322 }
00323
00327 public abstract String getTargetName();
00328
00332 protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00333
00334 protected abstract void disconnect();
00335
00336 protected abstract SynchroConflicResolver getSynchroConflicResolver();
00337
00341 protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00342
00343 protected abstract void updateTargetWithSynchroObject(Object syncObject) throws OpenMSPException;
00344
00345 protected abstract void deleteTargetForSynchroObjectId(String uid) throws OpenMSPException;
00346
00347 protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00348
00356 protected abstract OpenMISFile[] getDatabaseImportFiles() throws ServiceException;
00357
00358 }