Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/remote/nio/MessageDecoder.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.remote.nio; | |
2 | |
3 import org.dancres.blitz.SpaceImpl; | |
4 import org.dancres.blitz.WriteTicket; | |
5 import org.dancres.blitz.remote.LeaseImpl; | |
6 | |
7 import java.io.ByteArrayOutputStream; | |
8 import java.io.ObjectOutputStream; | |
9 import java.nio.ByteBuffer; | |
10 import java.rmi.RemoteException; | |
11 | |
12 /** | |
13 * Responsible for decoding a serialized space operation as provided by | |
14 * Invoker. It then dispatches the relevant operation and marshals up the | |
15 * return (or exception) befor epassing it back to the ControlBlock for | |
16 * transmission. | |
17 */ | |
18 public class MessageDecoder implements Runnable { | |
19 private static CommandFactory _commandFactory = new CommandFactory(); | |
20 | |
21 private int _reqId; | |
22 private byte[] _operation; | |
23 private ControlBlock _block; | |
24 private SpaceImpl _space; | |
25 | |
26 MessageDecoder(int aReqId,SpaceImpl aSpace, ControlBlock aBlock, | |
27 byte[] anOpBlock) { | |
28 _reqId = aReqId; | |
29 _operation = anOpBlock; | |
30 _block = aBlock; | |
31 _space = aSpace; | |
32 } | |
33 | |
34 /** | |
35 * @todo Put the execution code into the op itself which would save | |
36 * the switch etc and allow us to drop the test for GenericSpaceOp etc. | |
37 * It'll also be easier to add say TransactionOp so we can extend the | |
38 * interface to TransactionParticipant | |
39 */ | |
40 public void run() { | |
41 Object myResult = null; | |
42 try { | |
43 // System.err.println("Received op: " + _reqId + ", " + _operation.length); | |
44 Operation myOp = _commandFactory.unpack(_operation); | |
45 | |
46 if (myOp instanceof GenericSpaceOp) { | |
47 GenericSpaceOp mySpaceOp = (GenericSpaceOp) myOp; | |
48 | |
49 switch (mySpaceOp.getOperation()) { | |
50 case GenericSpaceOp.WRITE : { | |
51 WriteTicket myTicket = | |
52 _space.write(mySpaceOp.getEntry(), | |
53 mySpaceOp.getTxn(), | |
54 mySpaceOp.getLease()); | |
55 | |
56 myResult = new LeaseImpl(null, null, myTicket.getUID(), | |
57 myTicket.getExpirationTime()); | |
58 break; | |
59 } | |
60 | |
61 case GenericSpaceOp.TAKE : { | |
62 myResult = | |
63 _space.take(mySpaceOp.getEntry(), | |
64 mySpaceOp.getTxn(), | |
65 mySpaceOp.getLease()); | |
66 break; | |
67 } | |
68 | |
69 case GenericSpaceOp.TAKE_EXISTS : { | |
70 myResult = | |
71 _space.takeIfExists(mySpaceOp.getEntry(), | |
72 mySpaceOp.getTxn(), | |
73 mySpaceOp.getLease()); | |
74 break; | |
75 } | |
76 | |
77 case GenericSpaceOp.READ : { | |
78 myResult = | |
79 _space.read(mySpaceOp.getEntry(), | |
80 mySpaceOp.getTxn(), | |
81 mySpaceOp.getLease()); | |
82 break; | |
83 } | |
84 | |
85 case GenericSpaceOp.READ_EXISTS : { | |
86 myResult = | |
87 _space.readIfExists(mySpaceOp.getEntry(), | |
88 mySpaceOp.getTxn(), | |
89 mySpaceOp.getLease()); | |
90 break; | |
91 } | |
92 | |
93 default : | |
94 myResult = new RemoteException("Unrecognised space op"); | |
95 } | |
96 } else if (myOp instanceof TransactionOp) { | |
97 TransactionOp myTxnOp = (TransactionOp) myOp; | |
98 | |
99 switch (myTxnOp.getOperation()) { | |
100 case TransactionOp.PREPARE : { | |
101 int myCode = | |
102 _space.getTxnControl().prepare(myTxnOp.getMgr(), | |
103 myTxnOp.getId()); | |
104 | |
105 myResult = new Integer(myCode); | |
106 break; | |
107 } | |
108 | |
109 case TransactionOp.COMMIT : { | |
110 _space.getTxnControl().commit(myTxnOp.getMgr(), | |
111 myTxnOp.getId()); | |
112 break; | |
113 } | |
114 | |
115 case TransactionOp.ABORT : { | |
116 _space.getTxnControl().abort(myTxnOp.getMgr(), | |
117 myTxnOp.getId()); | |
118 break; | |
119 } | |
120 | |
121 case TransactionOp.PREPARE_COMMIT : { | |
122 int myCode = | |
123 _space.getTxnControl().prepareAndCommit(myTxnOp.getMgr(), | |
124 myTxnOp.getId()); | |
125 | |
126 myResult = new Integer(myCode); | |
127 break; | |
128 } | |
129 | |
130 default : | |
131 myResult = new RemoteException("Unrecognised txn op"); | |
132 } | |
133 } | |
134 } catch (Throwable aT) { | |
135 System.err.println("Failed to dispatch request"); | |
136 aT.printStackTrace(System.err); | |
137 | |
138 myResult = aT; | |
139 } | |
140 | |
141 ByteArrayOutputStream myBAOS = new ByteArrayOutputStream(); | |
142 | |
143 try { | |
144 ObjectOutputStream myOOS = new ObjectOutputStream(myBAOS); | |
145 myOOS.writeObject(myResult); | |
146 myOOS.close(); | |
147 } catch (Exception anE) { | |
148 System.err.println("Failed to marshall response"); | |
149 anE.printStackTrace(System.err); | |
150 } | |
151 | |
152 ByteBuffer myBuffer = ByteBuffer.allocate(myBAOS.size() + 8); | |
153 | |
154 // System.err.println("Sending: " + _reqId + ", " + myBAOS.size()); | |
155 myBuffer.putInt(_reqId); | |
156 myBuffer.putInt(myBAOS.size()); | |
157 myBuffer.put(myBAOS.toByteArray()); | |
158 | |
159 myBuffer.flip(); | |
160 _block.send(new ByteBuffer[] {myBuffer}); | |
161 } | |
162 } |