00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031
00032 import org.openmobileis.common.context.SessionContext;
00033 import org.openmobileis.common.context.SessionContextManager;
00034 import org.openmobileis.common.user.UserNotFoundException;
00035 import org.openmobileis.common.util.codec.GeneralCoder;
00036 import org.openmobileis.common.util.collection.Array;
00037 import org.openmobileis.common.util.exception.DatabaseException;
00038 import org.openmobileis.common.util.exception.ServiceException;
00039 import org.openmobileis.common.util.exception.SynchroException;
00040 import org.openmobileis.common.util.log.LogManager;
00041 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00042 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00043 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00044 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00045 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00046 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00047 import org.openmobileis.synchro.openmsp.OpenMSPException;
00048 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00049 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00050 import org.openmobileis.synchro.openmsp.protocol.Command;
00051 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00052 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00053 import org.openmobileis.synchro.openmsp.protocol.Element;
00054 import org.openmobileis.synchro.openmsp.protocol.Result;
00055 import org.openmobileis.synchro.openmsp.protocol.Status;
00056 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00057 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00058 import org.openmobileis.synchro.openmsp.server.util.ZipEntryFile;
00059 import org.openmobileis.synchro.security.auth.Credential;
00060
00069 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00070
00074 public OpenMSPDBSynchroTargetListener() {
00075 super();
00076 }
00077
00078 public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00079 SessionContext context = SessionContextManager.getManager().getSessionContext();
00080 AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00081 int commandType = syncCommand.getElementType();
00082 if (commandType == Element.GET) {
00083
00084
00085
00086
00087 context.setAttribute("openMSP getCommand %%", syncCommand);
00088 SyncTargetAnswer answer = new SyncTargetAnswer();
00089 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_NO_CONTENT);
00090 status.setCmdRef(syncCommand.getCmdId());
00091 ContainerMessage[] returnContainer = new ContainerMessage[1];
00092 returnContainer[0] = new ContainerMessage(status);
00093 answer.containerMessage = returnContainer;
00094 return answer;
00095 } else if (commandType != Element.SYNC) {
00096 SyncTargetAnswer answer = new SyncTargetAnswer();
00097 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00098 status.setCmdRef(syncCommand.getCmdId());
00099 ContainerMessage[] returnContainer = new ContainerMessage[1];
00100 returnContainer[0] = new ContainerMessage(status);
00101 answer.containerMessage = returnContainer;
00102 return answer;
00103 }
00104
00105
00106 try {
00107 this.connect(cred);
00108 try {
00109
00110 Array returnList = new Array(5);
00111 boolean doClientUpdate = true;
00112
00113 Command sync = (Command) containerMessage.getElement();
00114 this.processSyncActionMetaData(sync.getMetaInformation());
00115 long sessionID = sync.getSourceSessionID();
00116 Array clientsaoList = new Array(10);
00117 AbstractCommand getCommand = (AbstractCommand)context.getAttribute("openMSP getCommand %%");
00118 context.removeAttribute("openMSP getCommand %%");
00119
00120 while (containerMessage.hasMoreMessage()) {
00121 ContainerMessage commandContainer = containerMessage.nextMessage();
00122 Element command = commandContainer.getElement();
00123 try {
00124 short actiontype = -1;
00125 DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00126 String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00127 String dataUID = "";
00128 if (meta.length() >0) dataUID = meta.substring(6, meta.length());
00129 Object data = null;
00130
00131 if (command.getElementType() == Element.ADD) {
00132 actiontype = SynchroAtomicObject.ADD;
00133
00134 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00135 } else if (command.getElementType() == Element.DELETE) {
00136 actiontype = SynchroAtomicObject.DELETE;
00137 data = dataUID;
00138 } else if (command.getElementType() == Element.REPLACE) {
00139
00140 actiontype = SynchroAtomicObject.REPLACE;
00141 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00142 }
00143 if (actiontype != -1) {
00144 SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00145 SAO.setModificationType(actiontype);
00146 SAO.setModifSyncNumber(sessionID);
00147 SAO.setSyncdata(data);
00148 SAO.setOpenMSPcmdId(command.getCmdId());
00149 clientsaoList.add(SAO);
00150 }
00151 } catch (Throwable ex) {
00152 LogManager.traceError(0, ex);
00153 int expstatus = Status.STATUS_FAILED;
00154 if (ex instanceof OpenMSPException) {
00155 OpenMSPException omspex = (OpenMSPException) ex;
00156 if (omspex.getSyncStatus() != OpenMSPException.NO_STATUS_DEFINED) expstatus = omspex.getSyncStatus();
00157 }
00158 Status status = new Status(command.getCmdId(), expstatus);
00159 status.setCmdRef(command.getCmdId());
00160 ContainerMessage statusContainer = new ContainerMessage(status);
00161 returnList.add(statusContainer);
00162 doClientUpdate = false;
00163 }
00164 }
00165
00166
00167 AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00168 long newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00169 SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00170 SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00171 clientsaoList.toArray(servermodificationsoa);
00172 ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00173
00174
00175 ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00176 int clsize = serverSideUpdates.length;
00177 for (int i = 0; i < clsize; i++) {
00178 SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00179 try {
00180 if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00181 String[] updata = (String[]) syncObject.getSyncdata();
00182
00183 updata[0] = new String(GeneralCoder.decodeBase64(updata[0].getBytes()));
00184 Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00185 this.updateTargetWithSynchroObject(obj);
00186 } else {
00187 this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata());
00188 }
00189 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00190 status.setCmdRef(syncObject.getOpenMSPcmdId());
00191 ContainerMessage statusContainer = new ContainerMessage(status);
00192 returnList.add(statusContainer);
00193 } catch (Throwable ex) {
00194 LogManager.traceError(0, ex);
00195 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00196 status.setCmdRef(syncObject.getOpenMSPcmdId());
00197 ContainerMessage statusContainer = new ContainerMessage(status);
00198 returnList.add(statusContainer);
00199 doClientUpdate = false;
00200 }
00201 }
00202
00203 if (sessionID == 0) doClientUpdate = true;
00204
00205 SyncTargetAnswer answer = new SyncTargetAnswer();
00206
00207 if ((getCommand != null) && (doClientUpdate)) {
00208 try {
00209 Array addFileRepository = new Array(15);
00210 int maxRow = this.getUpdateMaxNbRow();
00211 ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00212 if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) {
00213
00214
00215
00216 byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00217
00218 String syncFileName = "/importdb/"+this.getTargetName();
00219 OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00220 addFileRepository.add(file);
00221 DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00222
00223 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00224 resultCommand.setMetaInformation(Long.toString(newSessionId));
00225 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00226 resultContainer.add(newItem);
00227 returnList.add(resultContainer);
00228 } else {
00229
00230 OpenMISFile[] files = this.getDatabaseImportFiles();
00231 if ((files != null) && (files.length > 0)) {
00232 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00233 resultCommand.setMetaInformation(Long.toString(newSessionId));
00234 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00235 DataItem newItem = null;
00236 for (int i = 0; i < files.length; i++) {
00237 addFileRepository.add(files[i]);
00238 String filename = files[i].getFileCompleteName();
00239 if (files[i] instanceof ZipEntryFile) {
00240 filename = ((ZipEntryFile)files[i]).getZipFileName();
00241 }
00242 newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00243 resultContainer.add(newItem);
00244 }
00245 returnList.add(resultContainer);
00246 }
00247
00248 }
00249 if (addFileRepository.size() > 0) {
00250 OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00251 addFileRepository.toArray(addCyberFiles);
00252 answer.answerfiles = addCyberFiles;
00253 }
00254 } catch (Throwable ex) {
00255 LogManager.traceError(0, ex);
00256 int expstatus = Status.STATUS_FAILED;
00257 if (ex instanceof OpenMSPException) {
00258 OpenMSPException omspex = (OpenMSPException) ex;
00259 if (omspex.getSyncStatus() != OpenMSPException.NO_STATUS_DEFINED) expstatus = omspex.getSyncStatus();
00260 }
00261 Status status = new Status(getCommand.getCmdId(), expstatus);
00262 status.setCmdRef(getCommand.getCmdId());
00263 ContainerMessage statusContainer = new ContainerMessage(status);
00264 returnList.add(statusContainer);
00265 }
00266 }
00267
00268 if (returnList.size() != 0) {
00269 ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00270 returnList.toArray(returnContainer);
00271 answer.containerMessage = returnContainer;
00272 }
00273 return answer;
00274 } finally {
00275 this.disconnect();
00276 }
00277 } catch (UserNotFoundException ex) {
00278 OpenMSPException nex = new OpenMSPException(ex);
00279 nex.setSyncStatus(Status.STATUS_UNAUTHORIZED);
00280 throw new OpenMSPException(ex);
00281 } catch (ServiceException ex) {
00282 throw new OpenMSPException(ex);
00283 } catch (DatabaseException ex) {
00284 throw new OpenMSPException(ex);
00285 } catch (SynchroException ex) {
00286 throw new OpenMSPException(ex);
00287 }
00288 }
00289
00296 protected int getUpdateMaxNbRow() {
00297 return -1;
00298 }
00299
00304 private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00305 int size = cserverUpdates.length;
00306 ByteArrayOutputStream stream = new ByteArrayOutputStream();
00307 Writer writer = new OutputStreamWriter(stream);
00308 try {
00309 for (int i = 0; i < size; i++) {
00310 char[] encoded = null;
00311 if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00312 Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID());
00313 if (updateobject != null) {
00314 String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00315 if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00316 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00317 } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00318 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00319 }
00320 }
00321 } else {
00322 String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00323 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00324 }
00325 if (encoded != null) {
00326 writer.write(encoded);
00327 }
00328 }
00329 } finally {
00330 writer.flush();
00331 writer.close();
00332 }
00333 return stream.toByteArray();
00334 }
00335
00340 protected void processSyncActionMetaData(String metadata) throws OpenMSPException {
00341
00342 }
00343
00347 public abstract String getTargetName();
00348
00352 protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00353
00354 protected abstract void disconnect();
00355
00356 protected abstract SynchroConflicResolver getSynchroConflicResolver();
00357
00361 protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00362
00363 protected abstract void updateTargetWithSynchroObject(Object syncObject) throws OpenMSPException;
00364
00365 protected abstract void deleteTargetForSynchroObjectId(String uid) throws OpenMSPException;
00366
00367 protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00368
00376 protected abstract OpenMISFile[] getDatabaseImportFiles() throws OpenMSPException;
00377
00378 }