Main Page | Packages | Class Hierarchy | Class List | Directories | File List | Class Members | Related Pages

OpenMSPDBSynchroTargetListener.java

00001 /*
00002  * OpenMobileIS - a free Java(TM) Framework for mobile applications Java(TM)
00003  * Copyright (C) 2004-2005 Philippe Delrieu
00004  * All rights reserved.
00005  * Contact: openmobileis@e-care.fr
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or any later version.
00011  *
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00020  * USA
00021  *
00022  *  Author : Philippe Delrieu
00023  * 
00024  */
00025 package org.openmobileis.synchro.openmsp.server.synctarget;
00026 
00027 import java.io.ByteArrayOutputStream;
00028 import java.io.IOException;
00029 import java.io.OutputStreamWriter;
00030 import java.io.Writer;
00031 
00032 import org.openmobileis.common.user.UserNotFoundException;
00033 import org.openmobileis.common.util.codec.GeneralCoder;
00034 import org.openmobileis.common.util.collection.Array;
00035 import org.openmobileis.common.util.exception.ServiceException;
00036 import org.openmobileis.common.util.log.LogManager;
00037 import org.openmobileis.synchro.algo.replication.AtomicObjectArrayReplication;
00038 import org.openmobileis.synchro.algo.replication.ReplicationReturnData;
00039 import org.openmobileis.synchro.algo.replication.ReplicationSAOEvent;
00040 import org.openmobileis.synchro.algo.replication.SynchroAtomicObject;
00041 import org.openmobileis.synchro.algo.replication.SynchroConflicResolver;
00042 import org.openmobileis.synchro.algo.syncnumber.SyncNumberManager;
00043 import org.openmobileis.synchro.openmsp.OpenMSPException;
00044 import org.openmobileis.synchro.openmsp.client.db.DBImportFileCoder;
00045 import org.openmobileis.synchro.openmsp.protocol.AbstractCommand;
00046 import org.openmobileis.synchro.openmsp.protocol.Command;
00047 import org.openmobileis.synchro.openmsp.protocol.ContainerMessage;
00048 import org.openmobileis.synchro.openmsp.protocol.DataItem;
00049 import org.openmobileis.synchro.openmsp.protocol.Element;
00050 import org.openmobileis.synchro.openmsp.protocol.Result;
00051 import org.openmobileis.synchro.openmsp.protocol.Status;
00052 import org.openmobileis.synchro.openmsp.server.util.MemoryFile;
00053 import org.openmobileis.synchro.openmsp.server.util.OpenMISFile;
00054 import org.openmobileis.synchro.openmsp.server.util.ZipEntryMemoryFile;
00055 import org.openmobileis.synchro.security.auth.Credential;
00056 
00065 public abstract class OpenMSPDBSynchroTargetListener implements OpenMSPSynchroTargetListener {
00066 
00070   public OpenMSPDBSynchroTargetListener() {
00071     super();
00072   }
00073 
00074   public SyncTargetAnswer processCommand(Credential cred, ContainerMessage containerMessage) throws OpenMSPException {
00075     AbstractCommand syncCommand = (AbstractCommand) containerMessage.getElement();
00076     int commandType = syncCommand.getElementType();
00077     if (commandType != Element.SYNC) {
00078       SyncTargetAnswer answer = new SyncTargetAnswer();
00079       Status status = new Status(syncCommand.getCmdId(), Status.STATUS_WRONG_FORMAT);
00080       status.setCmdRef(syncCommand.getCmdId());
00081       ContainerMessage[] returnContainer = new ContainerMessage[1];
00082       returnContainer[0] = new ContainerMessage(status);
00083       answer.containerMessage = returnContainer;
00084       return answer;
00085     }
00086 
00087     // manage server connection
00088     try {
00089       this.connect(cred);
00090       try {
00091 
00092         Array returnList = new Array(5);
00093         boolean doClientUpdate = true; // if an error occurs during server update stop client update.
00094         // process incomming data
00095         Command sync = (Command) containerMessage.getElement();
00096         this.processSyncActionMetaData(sync.getMetaInformation());
00097         long sessionID = sync.getSourceSessionID();
00098         Array clientsaoList = new Array(10);
00099         Element getCommand = null;
00100         while (containerMessage.hasMoreMessage()) {
00101           ContainerMessage commandContainer = containerMessage.nextMessage();
00102           Element command = commandContainer.getElement();
00103           try {
00104             short actiontype = -1;
00105             DataItem item = (DataItem) commandContainer.nextMessage().getElement();
00106             String meta = new String(GeneralCoder.decodeBase64(item.getMetaInformation().getBytes()));
00107             String dataUID = "";
00108             if (meta.length() >0) dataUID =  meta.substring(6, meta.length());
00109             Object data = null;
00110 
00111             if (command.getElementType() == Element.ADD) {
00112               actiontype = SynchroAtomicObject.ADD;
00113               // data return [0] objID, [1] object serialized.
00114               data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00115             } else if (command.getElementType() == Element.DELETE) {
00116               actiontype = SynchroAtomicObject.DELETE;
00117               data = dataUID;
00118             } else if (command.getElementType() == Element.REPLACE) {
00119               // data return [0] objID, [1] object serialized.
00120               actiontype = SynchroAtomicObject.REPLACE;
00121               data = DBImportFileCoder.getCoder().decodeData(new String(GeneralCoder.decodeBase64(item.getData().getBytes())));
00122             } else if (command.getElementType() == Element.GET) {
00123               getCommand = command;
00124             }
00125             if (actiontype != -1) {
00126               SyncDBSynchroAtomicObject SAO = new SyncDBSynchroAtomicObject(dataUID, "");
00127               SAO.setModificationType(actiontype);
00128               SAO.setModifSyncNumber(sessionID);
00129               SAO.setSyncdata(data);
00130               SAO.setOpenMSPcmdId(command.getCmdId());
00131               clientsaoList.add(SAO);
00132             }
00133           } catch (Throwable ex) {
00134             LogManager.traceError(0, ex);
00135             Status status = new Status(command.getCmdId(), Status.STATUS_FAILED);
00136             status.setCmdRef(command.getCmdId());
00137             ContainerMessage statusContainer = new ContainerMessage(status);
00138             returnList.add(statusContainer);
00139             doClientUpdate = false;
00140           }
00141         }
00142 
00143         // manage synchro algo.
00144         AtomicObjectArrayReplication replication = new AtomicObjectArrayReplication(this.getSynchroConflicResolver());
00145         long newSessionId = SyncNumberManager.getManager().getNextSynchroNumber().getSynchroNumber();
00146         SynchroAtomicObject[] clientmodificationsoa = this.getAllModifiedAtomicObjectSince(sessionID);
00147         SynchroAtomicObject[] servermodificationsoa = new SynchroAtomicObject[clientsaoList.size()];
00148         clientsaoList.toArray(servermodificationsoa);
00149         ReplicationReturnData replicationData = replication.replicates(clientmodificationsoa, servermodificationsoa);
00150 
00151         // update client.
00152         ReplicationSAOEvent[] serverSideUpdates = replicationData.getServerSideModificationList();
00153         int clsize = serverSideUpdates.length;
00154         for (int i = 0; i < clsize; i++) {
00155           SyncDBSynchroAtomicObject syncObject = (SyncDBSynchroAtomicObject) serverSideUpdates[i].getAtomicObject();
00156           try {
00157             if ((serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) || (serverSideUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE)) {
00158               String[] updata = (String[]) syncObject.getSyncdata();
00159               Object obj = DBImportFileCoder.getCoder().unserializeDBObject(updata);
00160               this.updateTargetWithSynchroObject(obj);
00161             } else {
00162               this.deleteTargetForSynchroObjectId((String) syncObject.getSyncdata());
00163             }
00164             Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_OK);
00165             status.setCmdRef(syncObject.getOpenMSPcmdId());
00166             ContainerMessage statusContainer = new ContainerMessage(status);
00167             returnList.add(statusContainer);
00168           } catch (Throwable ex) {
00169             LogManager.traceError(0, ex);
00170             Status status = new Status(syncObject.getOpenMSPcmdId(), Status.STATUS_FAILED);
00171             status.setCmdRef(syncObject.getOpenMSPcmdId());
00172             ContainerMessage statusContainer = new ContainerMessage(status);
00173             returnList.add(statusContainer);
00174             doClientUpdate = false;
00175           }
00176         }
00177         
00178         if (sessionID == 0) doClientUpdate = true; //force synchro complete.
00179 
00180         SyncTargetAnswer answer = new SyncTargetAnswer();
00181         // manager return sync. (answer to get commande)
00182         if ((getCommand != null) && (doClientUpdate)) {
00183           try {
00184             Array addFileRepository = new Array(15);
00185             int maxRow = this.getUpdateMaxNbRow();
00186             ReplicationSAOEvent[] clientSideUpdates = replicationData.getClientSideModificationList();
00187             if ((maxRow == -1) || (maxRow >= clientSideUpdates.length)) { // incremental
00188               // synchro
00189               // construct the result command with data file informations.
00190               // process PDA update
00191               byte[] encodedData = this.encodeSynchroReturnDataList(clientSideUpdates);
00192 
00193               String syncFileName = "/importdb/"+this.getTargetName();
00194               OpenMISFile file = new MemoryFile(syncFileName, encodedData);
00195               addFileRepository.add(file);
00196               DataItem newItem = new DataItem(Element.ITEM, "ImportDataFile", syncFileName, null, null);
00197 
00198               Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00199               resultCommand.setMetaInformation(Long.toString(newSessionId));
00200               ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00201               resultContainer.add(newItem);
00202               returnList.add(resultContainer);
00203             } else { // complete
00204               // construct the result command with db file informations.
00205               OpenMISFile[] files = this.getDatabaseImportFiles();
00206               if ((files != null) && (files.length > 0)) {
00207                 Result resultCommand = new Result(getCommand.getCmdId(), this.getTargetName(), sync.getSource());
00208                 resultCommand.setMetaInformation(Long.toString(newSessionId));
00209                 ContainerMessage resultContainer = new ContainerMessage(resultCommand);
00210                 DataItem newItem = null;
00211                 for (int i = 0; i < files.length; i++) {
00212                   addFileRepository.add(files[i]);
00213                   String filename =  files[i].getFileCompleteName();
00214                   if (files[i] instanceof ZipEntryMemoryFile) {
00215                     filename = ((ZipEntryMemoryFile)files[i]).getZipFileName();
00216                   }
00217                   newItem = new DataItem(Element.ITEM, "DatabaseFile",filename, null, null);
00218                   resultContainer.add(newItem);
00219                 }
00220                 returnList.add(resultContainer);
00221               }
00222 
00223             }
00224             if (addFileRepository.size() > 0) {
00225               OpenMISFile[] addCyberFiles = new OpenMISFile[addFileRepository.size()];
00226               addFileRepository.toArray(addCyberFiles);
00227               answer.answerfiles = addCyberFiles;
00228             }
00229           } catch (Throwable ex) {
00230             LogManager.traceError(0, ex);
00231             Status status = new Status(getCommand.getCmdId(), Status.STATUS_FAILED);
00232             status.setCmdRef(getCommand.getCmdId());
00233             ContainerMessage statusContainer = new ContainerMessage(status);
00234             returnList.add(statusContainer);
00235           }
00236         }
00237 
00238         if (returnList.size() != 0) {
00239           ContainerMessage[] returnContainer = new ContainerMessage[returnList.size()];
00240           returnList.toArray(returnContainer);
00241           answer.containerMessage = returnContainer;
00242         }
00243         return answer;
00244       } finally {
00245         this.disconnect();
00246       }
00247     } catch (Throwable ex) {
00248       throw new OpenMSPException(ex);
00249     }
00250   }
00251 
00258   protected int getUpdateMaxNbRow() {
00259     return -1;
00260   }
00261 
00266   private byte[] encodeSynchroReturnDataList(ReplicationSAOEvent[] cserverUpdates) throws OpenMSPException, IOException {
00267     int size = cserverUpdates.length;
00268     ByteArrayOutputStream stream = new ByteArrayOutputStream();
00269     Writer writer = new OutputStreamWriter(stream);
00270     try {
00271       for (int i = 0; i < size; i++) {
00272         char[] encoded = null;
00273         if ((cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) ||(cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE))  {
00274           Object updateobject = this.getTargetObjectWithId(cserverUpdates[i].getAtomicObjectID()); 
00275           if (updateobject != null) {
00276             String[] toencode = DBImportFileCoder.getCoder().serializeDBObject(cserverUpdates[i].getAtomicObjectID(), updateobject);
00277             if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.CREATE) {
00278               encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.ADDAction, toencode);
00279             } else if (cserverUpdates[i].getTypeModif() == ReplicationSAOEvent.UPDATE) {
00280               encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.REPLACEAction, toencode);
00281             }
00282           }
00283         } else  {
00284           String[] datadel = {cserverUpdates[i].getAtomicObjectID()};
00285           encoded = DBImportFileCoder.getCoder().encodeData(DBImportFileCoder.DELETEAction, datadel);
00286         }
00287         if (encoded != null) {
00288           writer.write(encoded);
00289         }
00290       }
00291     } finally {
00292       writer.flush();
00293       writer.close();
00294     }
00295     return stream.toByteArray();
00296   }
00297 
00302   protected void processSyncActionMetaData(String metadata) throws ServiceException {
00303 
00304   }
00305 
00309   public abstract String getTargetName();
00310 
00314   protected abstract void connect(Credential cred) throws UserNotFoundException, ServiceException;
00315 
00316   protected abstract void disconnect();
00317 
00318   protected abstract SynchroConflicResolver getSynchroConflicResolver();
00319 
00323   protected abstract SynchroAtomicObject[] getAllModifiedAtomicObjectSince(long syncNumber) throws OpenMSPException;
00324 
00325   protected abstract void updateTargetWithSynchroObject(Object syncObject) throws OpenMSPException;
00326 
00327   protected abstract void deleteTargetForSynchroObjectId(String uid) throws OpenMSPException;
00328 
00329   protected abstract Object getTargetObjectWithId(String uid) throws OpenMSPException;
00330 
00338   protected abstract OpenMISFile[] getDatabaseImportFiles() throws ServiceException;
00339 
00340 }

Generated on Wed Dec 14 21:05:34 2005 for OpenMobileIS by  doxygen 1.4.4