00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031
00032 import org.openmobileis.common.context.SessionContext;
00033 import org.openmobileis.common.context.SessionContextManager;
00034 import org.openmobileis.common.user.UserNotFoundException;
00035 import org.openmobileis.common.util.codec.GeneralCoder;
00036 import org.openmobileis.common.util.collection.Array;
00037 import org.openmobileis.common.util.exception.DatabaseException;
00038 import org.openmobileis.common.util.exception.ServiceException;
00039 import org.openmobileis.common.util.exception.SynchroException;
00040 import org.openmobileis.common.util.log.LogManager;
00041 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00042 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00043 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00044 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00045 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00046 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00047 import org.openmobileis.synchro.openmsp.OpenMSPException;
00048 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00049 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00050 import org.openmobileis.synchro.openmsp.protocol.Command;
00051 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00052 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00053 import org.openmobileis.synchro.openmsp.protocol.Element;
00054 import org.openmobileis.synchro.openmsp.protocol.Result;
00055 import org.openmobileis.synchro.openmsp.protocol.Status;
00056 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00057 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00058 import org.openmobileis.synchro.openmsp.server.util.ZipEntryFile;
00059 import org.openmobileis.synchro.security.auth.Credential;
00060
00069 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00070
00074 public OpenMSPDBSynchroTargetListener() {
00075 super();
00076 }
00077
00078 public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00079 SessionContext context = SessionContextManager.getManager().getSessionContext();
00080 AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00081 int commandType = syncCommand.getElementType();
00082 if (commandType == Element.GET) {
00083
00084
00085
00086
00087 context.setAttribute("openMSP getCommand %%", syncCommand);
00088 SyncTargetAnswer answer = new SyncTargetAnswer();
00089 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_NO_CONTENT);
00090 status.setCmdRef(syncCommand.getCmdId());
00091 ContainerMessage[] returnContainer = new ContainerMessage[1];
00092 returnContainer[0] = new ContainerMessage(status);
00093 answer.containerMessage = returnContainer;
00094 return answer;
00095 } else if (commandType != Element.SYNC) {
00096 SyncTargetAnswer answer = new SyncTargetAnswer();
00097 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00098 status.setCmdRef(syncCommand.getCmdId());
00099 ContainerMessage[] returnContainer = new ContainerMessage[1];
00100 returnContainer[0] = new ContainerMessage(status);
00101 answer.containerMessage = returnContainer;
00102 return answer;
00103 }
00104
00105
00106 try {
00107 this.connect(cred);
00108 try {
00109
00110 Array returnList = new Array(5);
00111 boolean doClientUpdate = true;
00112
00113 Command sync = (Command) containerMessage.getElement();
00114 this.processSyncActionMetaData(sync.getMetaInformation());
00115 long sessionID = sync.getSourceSessionID();
00116 Array clientsaoList = new Array(10);
00117 AbstractCommand getCommand = (AbstractCommand)context.getAttribute("openMSP getCommand %%");
00118 context.removeAttribute("openMSP getCommand %%");
00119
00120 while (containerMessage.hasMoreMessage()) {
00121 ContainerMessage commandContainer = containerMessage.nextMessage();
00122 Element command = commandContainer.getElement();
00123 try {
00124 short actiontype = -1;
00125 DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00126 String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00127 String dataUID = "";
00128 if (meta.length() >0) dataUID = meta.substring(6, meta.length());
00129 Object data = null;
00130
00131 if (command.getElementType() == Element.ADD) {
00132 actiontype = SynchroAtomicObject.ADD;
00133
00134 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00135 } else if (command.getElementType() == Element.DELETE) {
00136 actiontype = SynchroAtomicObject.DELETE;
00137 data = dataUID;
00138 } else if (command.getElementType() == Element.REPLACE) {
00139
00140 actiontype = SynchroAtomicObject.REPLACE;
00141 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00142 }
00143 if (actiontype != -1) {
00144 SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00145 SAO.setModificationType(actiontype);
00146 SAO.setModifSyncNumber(sessionID);
00147 SAO.setSyncdata(data);
00148 SAO.setOpenMSPcmdId(command.getCmdId());
00149 clientsaoList.add(SAO);
00150 }
00151 } catch (Throwable ex) {
00152 LogManager.traceError(0, ex);
00153 int expstatus = Status.STATUS_FAILED;
00154 if (ex instanceof OpenMSPException) {
00155 OpenMSPException omspex = (OpenMSPException) ex;
00156 if (omspex.getSyncStatus() != OpenMSPException.NO_STATUS_DEFINED) expstatus = omspex.getSyncStatus();
00157 }
00158 Status status = new Status(command.getCmdId(), expstatus);
00159 status.setCmdRef(command.getCmdId());
00160 ContainerMessage statusContainer = new ContainerMessage(status);
00161 returnList.add(statusContainer);
00162 doClientUpdate = false;
00163 }
00164 }
00165
00166
00167 AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00168 long updateSessionId = 0;
00169 long newSessionId = 0;
00170 synchronized (OpenMSPDBSynchroTargetListener.class) {
00171 updateSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00172 newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00173 }
00174 SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00175 SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00176 clientsaoList.toArray(servermodificationsoa);
00177 ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00178
00179
00180 ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00181 int clsize = serverSideUpdates.length;
00182 for (int i = 0; i < clsize; i++) {
00183 SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00184 try {
00185 if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00186 String[] updata = (String[]) syncObject.getSyncdata();
00187
00188 updata[0] = new String(GeneralCoder.decodeBase64(updata[0].getBytes()));
00189 Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00190 this.updateTargetWithSynchroObject(obj, updateSessionId);
00191 } else {
00192 this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata(), updateSessionId);
00193 }
00194 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00195 status.setCmdRef(syncObject.getOpenMSPcmdId());
00196 ContainerMessage statusContainer = new ContainerMessage(status);
00197 returnList.add(statusContainer);
00198 } catch (Throwable ex) {
00199 LogManager.traceError(0, ex);
00200 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00201 status.setCmdRef(syncObject.getOpenMSPcmdId());
00202 ContainerMessage statusContainer = new ContainerMessage(status);
00203 returnList.add(statusContainer);
00204 doClientUpdate = false;
00205 }
00206 }
00207
00208 if (sessionID == 0) doClientUpdate = true;
00209
00210 SyncTargetAnswer answer = new SyncTargetAnswer();
00211
00212 if ((getCommand != null) && (doClientUpdate)) {
00213 try {
00214 Array addFileRepository = new Array(15);
00215 int maxRow = this.getUpdateMaxNbRow();
00216 ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00217 if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) {
00218
00219
00220
00221 byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00222
00223 String syncFileName = "/importdb/"+this.getTargetName();
00224 OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00225 addFileRepository.add(file);
00226 DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00227
00228 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00229 resultCommand.setMetaInformation(Long.toString(newSessionId));
00230 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00231 resultContainer.add(newItem);
00232 returnList.add(resultContainer);
00233 } else {
00234
00235 OpenMISFile[] files = this.getDatabaseImportFiles();
00236 if ((files != null) && (files.length > 0)) {
00237 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00238 resultCommand.setMetaInformation(Long.toString(newSessionId));
00239 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00240 DataItem newItem = null;
00241 for (int i = 0; i < files.length; i++) {
00242 addFileRepository.add(files[i]);
00243 String filename = files[i].getFileCompleteName();
00244 if (files[i] instanceof ZipEntryFile) {
00245 filename = ((ZipEntryFile)files[i]).getZipFileName();
00246 }
00247 newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00248 resultContainer.add(newItem);
00249 }
00250 returnList.add(resultContainer);
00251 }
00252
00253 }
00254 if (addFileRepository.size() > 0) {
00255 OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00256 addFileRepository.toArray(addCyberFiles);
00257 answer.answerfiles = addCyberFiles;
00258 }
00259 } catch (Throwable ex) {
00260 LogManager.traceError(0, ex);
00261 int expstatus = Status.STATUS_FAILED;
00262 if (ex instanceof OpenMSPException) {
00263 OpenMSPException omspex = (OpenMSPException) ex;
00264 if (omspex.getSyncStatus() != OpenMSPException.NO_STATUS_DEFINED) expstatus = omspex.getSyncStatus();
00265 }
00266 Status status = new Status(getCommand.getCmdId(), expstatus);
00267 status.setCmdRef(getCommand.getCmdId());
00268 ContainerMessage statusContainer = new ContainerMessage(status);
00269 returnList.add(statusContainer);
00270 }
00271 }
00272
00273 if (returnList.size() != 0) {
00274 ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00275 returnList.toArray(returnContainer);
00276 answer.containerMessage = returnContainer;
00277 }
00278 return answer;
00279 } finally {
00280 this.disconnect();
00281 }
00282 } catch (UserNotFoundException ex) {
00283 OpenMSPException nex = new OpenMSPException(ex);
00284 nex.setSyncStatus(Status.STATUS_UNAUTHORIZED);
00285 throw new OpenMSPException(ex);
00286 } catch (ServiceException ex) {
00287 throw new OpenMSPException(ex);
00288 } catch (DatabaseException ex) {
00289 throw new OpenMSPException(ex);
00290 } catch (SynchroException ex) {
00291 throw new OpenMSPException(ex);
00292 }
00293 }
00294
00301 protected int getUpdateMaxNbRow() {
00302 return -1;
00303 }
00304
00309 private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00310 int size = cserverUpdates.length;
00311 ByteArrayOutputStream stream = new ByteArrayOutputStream();
00312 Writer writer = new OutputStreamWriter(stream);
00313 try {
00314 for (int i = 0; i < size; i++) {
00315 char[] encoded = null;
00316 if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00317 Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID());
00318 if (updateobject != null) {
00319 String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00320 if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00321 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00322 } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00323 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00324 }
00325 }
00326 } else {
00327 String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00328 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00329 }
00330 if (encoded != null) {
00331 writer.write(encoded);
00332 }
00333 }
00334 } finally {
00335 writer.flush();
00336 writer.close();
00337 }
00338 return stream.toByteArray();
00339 }
00340
00345 protected void processSyncActionMetaData(String metadata) throws OpenMSPException {
00346
00347 }
00348
00352 public abstract String getTargetName();
00353
00357 protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00358
00359 protected abstract void disconnect();
00360
00361 protected abstract SynchroConflicResolver getSynchroConflicResolver();
00362
00366 protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00367
00375 protected abstract void updateTargetWithSynchroObject(Object syncObject, long updateSynchroNumber) throws OpenMSPException;
00376
00384 protected abstract void deleteTargetForSynchroObjectId(String uid, long updateSynchroNumber) throws OpenMSPException;
00385
00386 protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00387
00395 protected abstract OpenMISFile[] getDatabaseImportFiles() throws OpenMSPException;
00396
00397 }