00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031
00032 import org.openmobileis.common.user.UserNotFoundException;
00033 import org.openmobileis.common.util.codec.GeneralCoder;
00034 import org.openmobileis.common.util.collection.Array;
00035 import org.openmobileis.common.util.exception.ServiceException;
00036 import org.openmobileis.common.util.log.LogManager;
00037 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00038 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00039 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00040 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00041 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00042 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00043 import org.openmobileis.synchro.openmsp.OpenMSPException;
00044 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00045 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00046 import org.openmobileis.synchro.openmsp.protocol.Command;
00047 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00048 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00049 import org.openmobileis.synchro.openmsp.protocol.Element;
00050 import org.openmobileis.synchro.openmsp.protocol.Result;
00051 import org.openmobileis.synchro.openmsp.protocol.Status;
00052 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00053 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00054 import org.openmobileis.synchro.openmsp.server.util.ZipEntryMemoryFile;
00055 import org.openmobileis.synchro.security.auth.Credential;
00056
00065 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00066
00070 public OpenMSPDBSynchroTargetListener() {
00071 super();
00072 }
00073
00074 public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00075 AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00076 int commandType = syncCommand.getElementType();
00077 if (commandType != Element.SYNC) {
00078 SyncTargetAnswer answer = new SyncTargetAnswer();
00079 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00080 status.setCmdRef(syncCommand.getCmdId());
00081 ContainerMessage[] returnContainer = new ContainerMessage[1];
00082 returnContainer[0] = new ContainerMessage(status);
00083 answer.containerMessage = returnContainer;
00084 return answer;
00085 }
00086
00087
00088 try {
00089 this.connect(cred);
00090 try {
00091
00092 Array returnList = new Array(5);
00093 boolean doClientUpdate = true;
00094
00095 Command sync = (Command) containerMessage.getElement();
00096 this.processSyncActionMetaData(sync.getMetaInformation());
00097 long sessionID = sync.getSourceSessionID();
00098 Array clientsaoList = new Array(10);
00099 Element getCommand = null;
00100 while (containerMessage.hasMoreMessage()) {
00101 ContainerMessage commandContainer = containerMessage.nextMessage();
00102 Element command = commandContainer.getElement();
00103 try {
00104 short actiontype = -1;
00105 DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00106 String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00107 String dataUID = "";
00108 if (meta.length() >0) dataUID = meta.substring(6, meta.length());
00109 Object data = null;
00110
00111 if (command.getElementType() == Element.ADD) {
00112 actiontype = SynchroAtomicObject.ADD;
00113
00114 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00115 } else if (command.getElementType() == Element.DELETE) {
00116 actiontype = SynchroAtomicObject.DELETE;
00117 data = dataUID;
00118 } else if (command.getElementType() == Element.REPLACE) {
00119
00120 actiontype = SynchroAtomicObject.REPLACE;
00121 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00122 } else if (command.getElementType() == Element.GET) {
00123 getCommand = command;
00124 }
00125 if (actiontype != -1) {
00126 SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00127 SAO.setModificationType(actiontype);
00128 SAO.setModifSyncNumber(sessionID);
00129 SAO.setSyncdata(data);
00130 SAO.setOpenMSPcmdId(command.getCmdId());
00131 clientsaoList.add(SAO);
00132 }
00133 } catch (Throwable ex) {
00134 LogManager.traceError(0, ex);
00135 Status status = new Status(command.getCmdId(), Status.STATUS_FAILED);
00136 status.setCmdRef(command.getCmdId());
00137 ContainerMessage statusContainer = new ContainerMessage(status);
00138 returnList.add(statusContainer);
00139 doClientUpdate = false;
00140 }
00141 }
00142
00143
00144 AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00145 long newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00146 SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00147 SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00148 clientsaoList.toArray(servermodificationsoa);
00149 ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00150
00151
00152 ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00153 int clsize = serverSideUpdates.length;
00154 for (int i = 0; i < clsize; i++) {
00155 SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00156 try {
00157 if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00158 String[] updata = (String[]) syncObject.getSyncdata();
00159 Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00160 this.updateTargetWithSynchroObject(obj);
00161 } else {
00162 this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata());
00163 }
00164 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00165 status.setCmdRef(syncObject.getOpenMSPcmdId());
00166 ContainerMessage statusContainer = new ContainerMessage(status);
00167 returnList.add(statusContainer);
00168 } catch (Throwable ex) {
00169 LogManager.traceError(0, ex);
00170 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00171 status.setCmdRef(syncObject.getOpenMSPcmdId());
00172 ContainerMessage statusContainer = new ContainerMessage(status);
00173 returnList.add(statusContainer);
00174 doClientUpdate = false;
00175 }
00176 }
00177
00178 if (sessionID == 0) doClientUpdate = true;
00179
00180 SyncTargetAnswer answer = new SyncTargetAnswer();
00181
00182 if ((getCommand != null) && (doClientUpdate)) {
00183 try {
00184 Array addFileRepository = new Array(15);
00185 int maxRow = this.getUpdateMaxNbRow();
00186 ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00187 if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) {
00188
00189
00190
00191 byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00192
00193 String syncFileName = "/importdb/"+this.getTargetName();
00194 OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00195 addFileRepository.add(file);
00196 DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00197
00198 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00199 resultCommand.setMetaInformation(Long.toString(newSessionId));
00200 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00201 resultContainer.add(newItem);
00202 returnList.add(resultContainer);
00203 } else {
00204
00205 OpenMISFile[] files = this.getDatabaseImportFiles();
00206 if ((files != null) && (files.length > 0)) {
00207 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00208 resultCommand.setMetaInformation(Long.toString(newSessionId));
00209 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00210 DataItem newItem = null;
00211 for (int i = 0; i < files.length; i++) {
00212 addFileRepository.add(files[i]);
00213 String filename = files[i].getFileCompleteName();
00214 if (files[i] instanceof ZipEntryMemoryFile) {
00215 filename = ((ZipEntryMemoryFile)files[i]).getZipFileName();
00216 }
00217 newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00218 resultContainer.add(newItem);
00219 }
00220 returnList.add(resultContainer);
00221 }
00222
00223 }
00224 if (addFileRepository.size() > 0) {
00225 OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00226 addFileRepository.toArray(addCyberFiles);
00227 answer.answerfiles = addCyberFiles;
00228 }
00229 } catch (Throwable ex) {
00230 LogManager.traceError(0, ex);
00231 Status status = new Status(getCommand.getCmdId(), Status.STATUS_FAILED);
00232 status.setCmdRef(getCommand.getCmdId());
00233 ContainerMessage statusContainer = new ContainerMessage(status);
00234 returnList.add(statusContainer);
00235 }
00236 }
00237
00238 if (returnList.size() != 0) {
00239 ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00240 returnList.toArray(returnContainer);
00241 answer.containerMessage = returnContainer;
00242 }
00243 return answer;
00244 } finally {
00245 this.disconnect();
00246 }
00247 } catch (Throwable ex) {
00248 throw new OpenMSPException(ex);
00249 }
00250 }
00251
00258 protected int getUpdateMaxNbRow() {
00259 return -1;
00260 }
00261
00266 private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00267 int size = cserverUpdates.length;
00268 ByteArrayOutputStream stream = new ByteArrayOutputStream();
00269 Writer writer = new OutputStreamWriter(stream);
00270 try {
00271 for (int i = 0; i < size; i++) {
00272 char[] encoded = null;
00273 if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00274 Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID());
00275 if (updateobject != null) {
00276 String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00277 if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00278 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00279 } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00280 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00281 }
00282 }
00283 } else {
00284 String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00285 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00286 }
00287 if (encoded != null) {
00288 writer.write(encoded);
00289 }
00290 }
00291 } finally {
00292 writer.flush();
00293 writer.close();
00294 }
00295 return stream.toByteArray();
00296 }
00297
00302 protected void processSyncActionMetaData(String metadata) throws ServiceException {
00303
00304 }
00305
00309 public abstract String getTargetName();
00310
00314 protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00315
00316 protected abstract void disconnect();
00317
00318 protected abstract SynchroConflicResolver getSynchroConflicResolver();
00319
00323 protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00324
00325 protected abstract void updateTargetWithSynchroObject(Object syncObject) throws OpenMSPException;
00326
00327 protected abstract void deleteTargetForSynchroObjectId(String uid) throws OpenMSPException;
00328
00329 protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00330
00338 protected abstract OpenMISFile[] getDatabaseImportFiles() throws ServiceException;
00339
00340 }