Main Page | Packages | Class Hierarchy | Class List | Directories | File List | Class Members | Related Pages

OpenMSPDBSynchroTargetListener.java

00001 /*
00002  * OpenMobileIS - a free Java(TM) Framework for mobile applications Java(TM)
00003  * Copyright (C) 2004-2006 Philippe Delrieu
00004  * All rights reserved.
00005  * Contact: pdelrieu@openmobileis.org
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or any later version.
00011  *
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00020  * USA
00021  *
00022  *  Author : Philippe Delrieu
00023  * 
00024  */
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026 
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031 
00032 import org.openmobileis.common.user.UserNotFoundException;
00033 import org.openmobileis.common.util.codec.GeneralCoder;
00034 import org.openmobileis.common.util.collection.Array;
00035 import org.openmobileis.common.util.exception.ServiceException;
00036 import org.openmobileis.common.util.log.LogManager;
00037 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00038 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00039 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00040 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00041 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00042 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00043 import org.openmobileis.synchro.openmsp.OpenMSPException;
00044 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00045 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00046 import org.openmobileis.synchro.openmsp.protocol.Command;
00047 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00048 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00049 import org.openmobileis.synchro.openmsp.protocol.Element;
00050 import org.openmobileis.synchro.openmsp.protocol.Result;
00051 import org.openmobileis.synchro.openmsp.protocol.Status;
00052 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00053 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00054 import org.openmobileis.synchro.openmsp.server.util.ZipEntryMemoryFile;
00055 import org.openmobileis.synchro.security.auth.Credential;
00056 
00065 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00066 
00070   public OpenMSPDBSynchroTargetListener() {
00071     super();
00072   }
00073 
00074   public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00075     AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00076     int commandType = syncCommand.getElementType();
00077     if (commandType != Element.SYNC) {
00078       SyncTargetAnswer answer = new SyncTargetAnswer();
00079       Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00080       status.setCmdRef(syncCommand.getCmdId());
00081       ContainerMessage[] returnContainer = new ContainerMessage[1];
00082       returnContainer[0] = new ContainerMessage(status);
00083       answer.containerMessage = returnContainer;
00084       return answer;
00085     }
00086 
00087     // manage server connection
00088     try {
00089       this.connect(cred);
00090       try {
00091 
00092         Array returnList = new Array(5);
00093         boolean doClientUpdate = true; // if an error occurs during server update stop client update.
00094         // process incomming data
00095         Command sync = (Command) containerMessage.getElement();
00096         this.processSyncActionMetaData(sync.getMetaInformation());
00097         long sessionID = sync.getSourceSessionID();
00098         Array clientsaoList = new Array(10);
00099         Element getCommand = null;
00100         while (containerMessage.hasMoreMessage()) {
00101           ContainerMessage commandContainer = containerMessage.nextMessage();
00102           Element command = commandContainer.getElement();
00103           try {
00104             short actiontype = -1;
00105             DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00106             String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00107             String dataUID = "";
00108             if (meta.length() >0) dataUID =  meta.substring(6, meta.length());
00109             Object data = null;
00110 
00111             if (command.getElementType() == Element.ADD) {
00112               actiontype = SynchroAtomicObject.ADD;
00113               // data return [0] objID, [1] object serialized.
00114               data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00115             } else if (command.getElementType() == Element.DELETE) {
00116               actiontype = SynchroAtomicObject.DELETE;
00117               data = dataUID;
00118             } else if (command.getElementType() == Element.REPLACE) {
00119               // data return [0] objID, [1] object serialized.
00120               actiontype = SynchroAtomicObject.REPLACE;
00121               data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00122             } else if (command.getElementType() == Element.GET) {
00123               getCommand = command;
00124             }
00125             if (actiontype != -1) {
00126               SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00127               SAO.setModificationType(actiontype);
00128               SAO.setModifSyncNumber(sessionID);
00129               SAO.setSyncdata(data);
00130               SAO.setOpenMSPcmdId(command.getCmdId());
00131               clientsaoList.add(SAO);
00132             }
00133           } catch (Throwable ex) {
00134             LogManager.traceError(0, ex);
00135             Status status = new Status(command.getCmdId(), Status.STATUS_FAILED);
00136             status.setCmdRef(command.getCmdId());
00137             ContainerMessage statusContainer = new ContainerMessage(status);
00138             returnList.add(statusContainer);
00139             doClientUpdate = false;
00140           }
00141         }
00142 
00143         // manage synchro algo.
00144         AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00145         long newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00146         SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00147         SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00148         clientsaoList.toArray(servermodificationsoa);
00149         ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00150 
00151         // update client.
00152         ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00153         int clsize = serverSideUpdates.length;
00154         for (int i = 0; i < clsize; i++) {
00155           SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00156           try {
00157             if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00158               String[] updata = (String[]) syncObject.getSyncdata();
00159               //decode encoded uid. uid is encoded to avoid problem with special caractere in UID and sync data encoding (' or ; caractere).
00160               updata[0] = new String(GeneralCoder.decodeBase64(updata[0].getBytes()));
00161               Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00162               this.updateTargetWithSynchroObject(obj);
00163             } else {
00164               this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata());
00165             }
00166             Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00167             status.setCmdRef(syncObject.getOpenMSPcmdId());
00168             ContainerMessage statusContainer = new ContainerMessage(status);
00169             returnList.add(statusContainer);
00170           } catch (Throwable ex) {
00171             LogManager.traceError(0, ex);
00172             Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00173             status.setCmdRef(syncObject.getOpenMSPcmdId());
00174             ContainerMessage statusContainer = new ContainerMessage(status);
00175             returnList.add(statusContainer);
00176             doClientUpdate = false;
00177           }
00178         }
00179         
00180         if (sessionID == 0) doClientUpdate = true; //force synchro complete.
00181 
00182         SyncTargetAnswer answer = new SyncTargetAnswer();
00183         // manager return sync. (answer to get commande)
00184         if ((getCommand != null) && (doClientUpdate)) {
00185           try {
00186             Array addFileRepository = new Array(15);
00187             int maxRow = this.getUpdateMaxNbRow();
00188             ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00189             if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) { // incremental
00190               // synchro
00191               // construct the result command with data file informations.
00192               // process PDA update
00193               byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00194 
00195               String syncFileName = "/importdb/"+this.getTargetName();
00196               OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00197               addFileRepository.add(file);
00198               DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00199 
00200               Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00201               resultCommand.setMetaInformation(Long.toString(newSessionId));
00202               ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00203               resultContainer.add(newItem);
00204               returnList.add(resultContainer);
00205             } else { // complete
00206               // construct the result command with db file informations.
00207               OpenMISFile[] files = this.getDatabaseImportFiles();
00208               if ((files != null) && (files.length > 0)) {
00209                 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00210                 resultCommand.setMetaInformation(Long.toString(newSessionId));
00211                 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00212                 DataItem newItem = null;
00213                 for (int i = 0; i < files.length; i++) {
00214                   addFileRepository.add(files[i]);
00215                   String filename =  files[i].getFileCompleteName();
00216                   if (files[i] instanceof ZipEntryMemoryFile) {
00217                     filename = ((ZipEntryMemoryFile)files[i]).getZipFileName();
00218                   }
00219                   newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00220                   resultContainer.add(newItem);
00221                 }
00222                 returnList.add(resultContainer);
00223               }
00224 
00225             }
00226             if (addFileRepository.size() > 0) {
00227               OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00228               addFileRepository.toArray(addCyberFiles);
00229               answer.answerfiles = addCyberFiles;
00230             }
00231           } catch (Throwable ex) {
00232             LogManager.traceError(0, ex);
00233             Status status = new Status(getCommand.getCmdId(), Status.STATUS_FAILED);
00234             status.setCmdRef(getCommand.getCmdId());
00235             ContainerMessage statusContainer = new ContainerMessage(status);
00236             returnList.add(statusContainer);
00237           }
00238         }
00239 
00240         if (returnList.size() != 0) {
00241           ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00242           returnList.toArray(returnContainer);
00243           answer.containerMessage = returnContainer;
00244         }
00245         return answer;
00246       } finally {
00247         this.disconnect();
00248       }
00249     } catch (Throwable ex) {
00250       throw new OpenMSPException(ex);
00251     }
00252   }
00253 
00260   protected int getUpdateMaxNbRow() {
00261     return -1;
00262   }
00263 
00268   private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00269     int size = cserverUpdates.length;
00270     ByteArrayOutputStream stream = new ByteArrayOutputStream();
00271     Writer writer = new OutputStreamWriter(stream);
00272     try {
00273       for (int i = 0; i < size; i++) {
00274         char[] encoded = null;
00275         if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE))  {
00276           Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID()); 
00277           if (updateobject != null) {
00278             String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00279             if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00280               encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00281             } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00282               encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00283             }
00284           }
00285         } else  {
00286           String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00287           encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00288         }
00289         if (encoded != null) {
00290           writer.write(encoded);
00291         }
00292       }
00293     } finally {
00294       writer.flush();
00295       writer.close();
00296     }
00297     return stream.toByteArray();
00298   }
00299 
00304   protected void processSyncActionMetaData(String metadata) throws ServiceException {
00305 
00306   }
00307 
00311   public abstract String getTargetName();
00312 
00316   protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00317 
00318   protected abstract void disconnect();
00319 
00320   protected abstract SynchroConflicResolver getSynchroConflicResolver();
00321 
00325   protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00326 
00327   protected abstract void updateTargetWithSynchroObject(Object syncObject) throws OpenMSPException;
00328 
00329   protected abstract void deleteTargetForSynchroObjectId(String uid) throws OpenMSPException;
00330 
00331   protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00332 
00340   protected abstract OpenMISFile[] getDatabaseImportFiles() throws ServiceException;
00341 
00342 }

Generated on Mon Jul 10 10:29:31 2006 for OpenMobileIS by  doxygen 1.4.4