comparison src/org/dancres/blitz/remote/nio/MessageDecoder.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.remote.nio;
2
3 import org.dancres.blitz.SpaceImpl;
4 import org.dancres.blitz.WriteTicket;
5 import org.dancres.blitz.remote.LeaseImpl;
6
7 import java.io.ByteArrayOutputStream;
8 import java.io.ObjectOutputStream;
9 import java.nio.ByteBuffer;
10 import java.rmi.RemoteException;
11
12 /**
13 * Responsible for decoding a serialized space operation as provided by
14 * Invoker. It then dispatches the relevant operation and marshals up the
15 * return (or exception) befor epassing it back to the ControlBlock for
16 * transmission.
17 */
18 public class MessageDecoder implements Runnable {
19 private static CommandFactory _commandFactory = new CommandFactory();
20
21 private int _reqId;
22 private byte[] _operation;
23 private ControlBlock _block;
24 private SpaceImpl _space;
25
26 MessageDecoder(int aReqId,SpaceImpl aSpace, ControlBlock aBlock,
27 byte[] anOpBlock) {
28 _reqId = aReqId;
29 _operation = anOpBlock;
30 _block = aBlock;
31 _space = aSpace;
32 }
33
34 /**
35 * @todo Put the execution code into the op itself which would save
36 * the switch etc and allow us to drop the test for GenericSpaceOp etc.
37 * It'll also be easier to add say TransactionOp so we can extend the
38 * interface to TransactionParticipant
39 */
40 public void run() {
41 Object myResult = null;
42 try {
43 // System.err.println("Received op: " + _reqId + ", " + _operation.length);
44 Operation myOp = _commandFactory.unpack(_operation);
45
46 if (myOp instanceof GenericSpaceOp) {
47 GenericSpaceOp mySpaceOp = (GenericSpaceOp) myOp;
48
49 switch (mySpaceOp.getOperation()) {
50 case GenericSpaceOp.WRITE : {
51 WriteTicket myTicket =
52 _space.write(mySpaceOp.getEntry(),
53 mySpaceOp.getTxn(),
54 mySpaceOp.getLease());
55
56 myResult = new LeaseImpl(null, null, myTicket.getUID(),
57 myTicket.getExpirationTime());
58 break;
59 }
60
61 case GenericSpaceOp.TAKE : {
62 myResult =
63 _space.take(mySpaceOp.getEntry(),
64 mySpaceOp.getTxn(),
65 mySpaceOp.getLease());
66 break;
67 }
68
69 case GenericSpaceOp.TAKE_EXISTS : {
70 myResult =
71 _space.takeIfExists(mySpaceOp.getEntry(),
72 mySpaceOp.getTxn(),
73 mySpaceOp.getLease());
74 break;
75 }
76
77 case GenericSpaceOp.READ : {
78 myResult =
79 _space.read(mySpaceOp.getEntry(),
80 mySpaceOp.getTxn(),
81 mySpaceOp.getLease());
82 break;
83 }
84
85 case GenericSpaceOp.READ_EXISTS : {
86 myResult =
87 _space.readIfExists(mySpaceOp.getEntry(),
88 mySpaceOp.getTxn(),
89 mySpaceOp.getLease());
90 break;
91 }
92
93 default :
94 myResult = new RemoteException("Unrecognised space op");
95 }
96 } else if (myOp instanceof TransactionOp) {
97 TransactionOp myTxnOp = (TransactionOp) myOp;
98
99 switch (myTxnOp.getOperation()) {
100 case TransactionOp.PREPARE : {
101 int myCode =
102 _space.getTxnControl().prepare(myTxnOp.getMgr(),
103 myTxnOp.getId());
104
105 myResult = new Integer(myCode);
106 break;
107 }
108
109 case TransactionOp.COMMIT : {
110 _space.getTxnControl().commit(myTxnOp.getMgr(),
111 myTxnOp.getId());
112 break;
113 }
114
115 case TransactionOp.ABORT : {
116 _space.getTxnControl().abort(myTxnOp.getMgr(),
117 myTxnOp.getId());
118 break;
119 }
120
121 case TransactionOp.PREPARE_COMMIT : {
122 int myCode =
123 _space.getTxnControl().prepareAndCommit(myTxnOp.getMgr(),
124 myTxnOp.getId());
125
126 myResult = new Integer(myCode);
127 break;
128 }
129
130 default :
131 myResult = new RemoteException("Unrecognised txn op");
132 }
133 }
134 } catch (Throwable aT) {
135 System.err.println("Failed to dispatch request");
136 aT.printStackTrace(System.err);
137
138 myResult = aT;
139 }
140
141 ByteArrayOutputStream myBAOS = new ByteArrayOutputStream();
142
143 try {
144 ObjectOutputStream myOOS = new ObjectOutputStream(myBAOS);
145 myOOS.writeObject(myResult);
146 myOOS.close();
147 } catch (Exception anE) {
148 System.err.println("Failed to marshall response");
149 anE.printStackTrace(System.err);
150 }
151
152 ByteBuffer myBuffer = ByteBuffer.allocate(myBAOS.size() + 8);
153
154 // System.err.println("Sending: " + _reqId + ", " + myBAOS.size());
155 myBuffer.putInt(_reqId);
156 myBuffer.putInt(myBAOS.size());
157 myBuffer.put(myBAOS.toByteArray());
158
159 myBuffer.flip();
160 _block.send(new ByteBuffer[] {myBuffer});
161 }
162 }