diff src/org/dancres/blitz/remote/nio/MessageDecoder.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/remote/nio/MessageDecoder.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,162 @@
+package org.dancres.blitz.remote.nio;
+
+import org.dancres.blitz.SpaceImpl;
+import org.dancres.blitz.WriteTicket;
+import org.dancres.blitz.remote.LeaseImpl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.rmi.RemoteException;
+
+/**
+ * Responsible for decoding a serialized space operation as provided by
+ * Invoker.  It then dispatches the relevant operation and marshals up the
+ * return (or exception) befor epassing it back to the ControlBlock for
+ * transmission.
+ */
+public class MessageDecoder implements Runnable {
+    private static CommandFactory _commandFactory = new CommandFactory();
+
+    private int _reqId;
+    private byte[] _operation;
+    private ControlBlock _block;
+    private SpaceImpl _space;
+
+    MessageDecoder(int aReqId,SpaceImpl aSpace, ControlBlock aBlock,
+                   byte[] anOpBlock) {
+        _reqId = aReqId;
+        _operation = anOpBlock;
+        _block = aBlock;
+        _space = aSpace;
+    }
+
+    /**
+     * @todo Put the execution code into the op itself which would save
+     * the switch etc and allow us to drop the test for GenericSpaceOp etc.
+     * It'll also be easier to add say TransactionOp so we can extend the
+     * interface to TransactionParticipant
+     */
+    public void run() {
+        Object myResult = null;
+        try {
+            // System.err.println("Received op: " + _reqId + ", " + _operation.length);
+            Operation myOp = _commandFactory.unpack(_operation);
+
+            if (myOp instanceof GenericSpaceOp) {
+                GenericSpaceOp mySpaceOp = (GenericSpaceOp) myOp;
+
+                switch (mySpaceOp.getOperation()) {
+                    case GenericSpaceOp.WRITE : {
+                        WriteTicket myTicket =
+                                _space.write(mySpaceOp.getEntry(),
+                                        mySpaceOp.getTxn(),
+                                        mySpaceOp.getLease());
+
+                        myResult = new LeaseImpl(null, null, myTicket.getUID(),
+                                myTicket.getExpirationTime());
+                        break;
+                    }
+
+                    case GenericSpaceOp.TAKE : {
+                        myResult =
+                                _space.take(mySpaceOp.getEntry(),
+                                        mySpaceOp.getTxn(),
+                                        mySpaceOp.getLease());
+                        break;
+                    }
+
+                    case GenericSpaceOp.TAKE_EXISTS : {
+                        myResult =
+                                _space.takeIfExists(mySpaceOp.getEntry(),
+                                        mySpaceOp.getTxn(),
+                                        mySpaceOp.getLease());
+                        break;
+                    }
+
+                    case GenericSpaceOp.READ : {
+                        myResult =
+                                _space.read(mySpaceOp.getEntry(),
+                                        mySpaceOp.getTxn(),
+                                        mySpaceOp.getLease());
+                        break;
+                    }
+
+                    case GenericSpaceOp.READ_EXISTS : {
+                        myResult =
+                                _space.readIfExists(mySpaceOp.getEntry(),
+                                        mySpaceOp.getTxn(),
+                                        mySpaceOp.getLease());
+                        break;
+                    }
+
+                    default :
+                        myResult = new RemoteException("Unrecognised space op");
+                }
+            } else if (myOp instanceof TransactionOp) {
+                TransactionOp myTxnOp = (TransactionOp) myOp;
+
+                switch (myTxnOp.getOperation()) {
+                    case TransactionOp.PREPARE : {
+                        int myCode =
+                            _space.getTxnControl().prepare(myTxnOp.getMgr(),
+                                myTxnOp.getId());
+
+                        myResult = new Integer(myCode);
+                        break;
+                    }
+
+                    case TransactionOp.COMMIT : {
+                        _space.getTxnControl().commit(myTxnOp.getMgr(),
+                            myTxnOp.getId());
+                        break;
+                    }
+
+                    case TransactionOp.ABORT : {
+                        _space.getTxnControl().abort(myTxnOp.getMgr(),
+                            myTxnOp.getId());
+                        break;
+                    }
+
+                    case TransactionOp.PREPARE_COMMIT : {
+                        int myCode =
+                            _space.getTxnControl().prepareAndCommit(myTxnOp.getMgr(),
+                                myTxnOp.getId());
+
+                        myResult = new Integer(myCode);
+                        break;
+                    }
+
+                    default :
+                        myResult = new RemoteException("Unrecognised txn op");
+                }
+            }
+        } catch (Throwable aT) {
+            System.err.println("Failed to dispatch request");
+            aT.printStackTrace(System.err);
+
+            myResult = aT;
+        }
+
+        ByteArrayOutputStream myBAOS = new ByteArrayOutputStream();
+
+        try {
+            ObjectOutputStream myOOS = new ObjectOutputStream(myBAOS);
+            myOOS.writeObject(myResult);
+            myOOS.close();
+        } catch (Exception anE) {
+            System.err.println("Failed to marshall response");
+            anE.printStackTrace(System.err);
+        }
+
+        ByteBuffer myBuffer = ByteBuffer.allocate(myBAOS.size() + 8);
+
+        // System.err.println("Sending: " + _reqId + ", " + myBAOS.size());
+        myBuffer.putInt(_reqId);
+        myBuffer.putInt(myBAOS.size());
+        myBuffer.put(myBAOS.toByteArray());
+
+        myBuffer.flip();
+        _block.send(new ByteBuffer[] {myBuffer});
+    }
+}