00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031
00032 import org.openmobileis.common.user.UserNotFoundException;
00033 import org.openmobileis.common.util.codec.GeneralCoder;
00034 import org.openmobileis.common.util.collection.Array;
00035 import org.openmobileis.common.util.exception.ServiceException;
00036 import org.openmobileis.common.util.log.LogManager;
00037 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00038 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00039 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00040 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00041 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00042 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00043 import org.openmobileis.synchro.openmsp.OpenMSPException;
00044 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00045 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00046 import org.openmobileis.synchro.openmsp.protocol.Command;
00047 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00048 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00049 import org.openmobileis.synchro.openmsp.protocol.Element;
00050 import org.openmobileis.synchro.openmsp.protocol.Result;
00051 import org.openmobileis.synchro.openmsp.protocol.Status;
00052 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00053 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00054 import org.openmobileis.synchro.openmsp.server.util.ZipEntryMemoryFile;
00055 import org.openmobileis.synchro.security.auth.Credential;
00056
00065 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00066
00070 public OpenMSPDBSynchroTargetListener() {
00071 super();
00072 }
00073
00074 public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00075 AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00076 int commandType = syncCommand.getElementType();
00077 if (commandType != Element.SYNC) {
00078 SyncTargetAnswer answer = new SyncTargetAnswer();
00079 Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00080 status.setCmdRef(syncCommand.getCmdId());
00081 ContainerMessage[] returnContainer = new ContainerMessage[1];
00082 returnContainer[0] = new ContainerMessage(status);
00083 answer.containerMessage = returnContainer;
00084 return answer;
00085 }
00086
00087
00088 try {
00089 this.connect(cred);
00090 try {
00091
00092 Array returnList = new Array(5);
00093 boolean doClientUpdate = true;
00094
00095 Command sync = (Command) containerMessage.getElement();
00096 this.processSyncActionMetaData(sync.getMetaInformation());
00097 long sessionID = sync.getSourceSessionID();
00098 Array clientsaoList = new Array(10);
00099 Element getCommand = null;
00100 while (containerMessage.hasMoreMessage()) {
00101 ContainerMessage commandContainer = containerMessage.nextMessage();
00102 Element command = commandContainer.getElement();
00103 try {
00104 short actiontype = -1;
00105 DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00106 String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00107 String dataUID = "";
00108 if (meta.length() >0) dataUID = meta.substring(6, meta.length());
00109 Object data = null;
00110
00111 if (command.getElementType() == Element.ADD) {
00112 actiontype = SynchroAtomicObject.ADD;
00113
00114 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00115 } else if (command.getElementType() == Element.DELETE) {
00116 actiontype = SynchroAtomicObject.DELETE;
00117 data = dataUID;
00118 } else if (command.getElementType() == Element.REPLACE) {
00119
00120 actiontype = SynchroAtomicObject.REPLACE;
00121 data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00122 } else if (command.getElementType() == Element.GET) {
00123 getCommand = command;
00124 }
00125 if (actiontype != -1) {
00126 SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00127 SAO.setModificationType(actiontype);
00128 SAO.setModifSyncNumber(sessionID);
00129 SAO.setSyncdata(data);
00130 SAO.setOpenMSPcmdId(command.getCmdId());
00131 clientsaoList.add(SAO);
00132 }
00133 } catch (Throwable ex) {
00134 LogManager.traceError(0, ex);
00135 Status status = new Status(command.getCmdId(), Status.STATUS_FAILED);
00136 status.setCmdRef(command.getCmdId());
00137 ContainerMessage statusContainer = new ContainerMessage(status);
00138 returnList.add(statusContainer);
00139 doClientUpdate = false;
00140 }
00141 }
00142
00143
00144 AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00145 long newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00146 SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00147 SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00148 clientsaoList.toArray(servermodificationsoa);
00149 ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00150
00151
00152 ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00153 int clsize = serverSideUpdates.length;
00154 for (int i = 0; i < clsize; i++) {
00155 SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00156 try {
00157 if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00158 String[] updata = (String[]) syncObject.getSyncdata();
00159
00160 updata[0] = new String(GeneralCoder.decodeBase64(updata[0].getBytes()));
00161 Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00162 this.updateTargetWithSynchroObject(obj);
00163 } else {
00164 this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata());
00165 }
00166 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00167 status.setCmdRef(syncObject.getOpenMSPcmdId());
00168 ContainerMessage statusContainer = new ContainerMessage(status);
00169 returnList.add(statusContainer);
00170 } catch (Throwable ex) {
00171 LogManager.traceError(0, ex);
00172 Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00173 status.setCmdRef(syncObject.getOpenMSPcmdId());
00174 ContainerMessage statusContainer = new ContainerMessage(status);
00175 returnList.add(statusContainer);
00176 doClientUpdate = false;
00177 }
00178 }
00179
00180 if (sessionID == 0) doClientUpdate = true;
00181
00182 SyncTargetAnswer answer = new SyncTargetAnswer();
00183
00184 if ((getCommand != null) && (doClientUpdate)) {
00185 try {
00186 Array addFileRepository = new Array(15);
00187 int maxRow = this.getUpdateMaxNbRow();
00188 ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00189 if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) {
00190
00191
00192
00193 byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00194
00195 String syncFileName = "/importdb/"+this.getTargetName();
00196 OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00197 addFileRepository.add(file);
00198 DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00199
00200 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00201 resultCommand.setMetaInformation(Long.toString(newSessionId));
00202 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00203 resultContainer.add(newItem);
00204 returnList.add(resultContainer);
00205 } else {
00206
00207 OpenMISFile[] files = this.getDatabaseImportFiles();
00208 if ((files != null) && (files.length > 0)) {
00209 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00210 resultCommand.setMetaInformation(Long.toString(newSessionId));
00211 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00212 DataItem newItem = null;
00213 for (int i = 0; i < files.length; i++) {
00214 addFileRepository.add(files[i]);
00215 String filename = files[i].getFileCompleteName();
00216 if (files[i] instanceof ZipEntryMemoryFile) {
00217 filename = ((ZipEntryMemoryFile)files[i]).getZipFileName();
00218 }
00219 newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00220 resultContainer.add(newItem);
00221 }
00222 returnList.add(resultContainer);
00223 }
00224
00225 }
00226 if (addFileRepository.size() > 0) {
00227 OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00228 addFileRepository.toArray(addCyberFiles);
00229 answer.answerfiles = addCyberFiles;
00230 }
00231 } catch (Throwable ex) {
00232 LogManager.traceError(0, ex);
00233 Status status = new Status(getCommand.getCmdId(), Status.STATUS_FAILED);
00234 status.setCmdRef(getCommand.getCmdId());
00235 ContainerMessage statusContainer = new ContainerMessage(status);
00236 returnList.add(statusContainer);
00237 }
00238 }
00239
00240 if (returnList.size() != 0) {
00241 ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00242 returnList.toArray(returnContainer);
00243 answer.containerMessage = returnContainer;
00244 }
00245 return answer;
00246 } finally {
00247 this.disconnect();
00248 }
00249 } catch (Throwable ex) {
00250 throw new OpenMSPException(ex);
00251 }
00252 }
00253
00260 protected int getUpdateMaxNbRow() {
00261 return -1;
00262 }
00263
00268 private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00269 int size = cserverUpdates.length;
00270 ByteArrayOutputStream stream = new ByteArrayOutputStream();
00271 Writer writer = new OutputStreamWriter(stream);
00272 try {
00273 for (int i = 0; i < size; i++) {
00274 char[] encoded = null;
00275 if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00276 Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID());
00277 if (updateobject != null) {
00278 String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00279 if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00280 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00281 } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00282 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00283 }
00284 }
00285 } else {
00286 String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00287 encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00288 }
00289 if (encoded != null) {
00290 writer.write(encoded);
00291 }
00292 }
00293 } finally {
00294 writer.flush();
00295 writer.close();
00296 }
00297 return stream.toByteArray();
00298 }
00299
00304 protected void processSyncActionMetaData(String metadata) throws ServiceException {
00305
00306 }
00307
00311 public abstract String getTargetName();
00312
00316 protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00317
00318 protected abstract void disconnect();
00319
00320 protected abstract SynchroConflicResolver getSynchroConflicResolver();
00321
00325 protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00326
00327 protected abstract void updateTargetWithSynchroObject(Object syncObject) throws OpenMSPException;
00328
00329 protected abstract void deleteTargetForSynchroObjectId(String uid) throws OpenMSPException;
00330
00331 protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00332
00340 protected abstract OpenMISFile[] getDatabaseImportFiles() throws ServiceException;
00341
00342 }