Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/remote/nio/Invoker.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.remote.nio; | |
2 | |
3 import net.jini.core.lease.Lease; | |
4 import net.jini.core.transaction.Transaction; | |
5 import net.jini.core.transaction.TransactionException; | |
6 import net.jini.core.transaction.UnknownTransactionException; | |
7 import net.jini.core.transaction.server.TransactionManager; | |
8 import net.jini.core.event.EventRegistration; | |
9 import net.jini.core.event.RemoteEventListener; | |
10 | |
11 import java.rmi.RemoteException; | |
12 import java.rmi.MarshalledObject; | |
13 import java.net.Socket; | |
14 import java.net.InetSocketAddress; | |
15 import java.io.*; | |
16 | |
17 import org.dancres.blitz.mangler.MangledEntry; | |
18 import org.dancres.blitz.remote.LeaseImpl; | |
19 | |
20 /** | |
21 * Encapsulates all the logic for invoking a space method - one of take, read and write. | |
22 * Manages the socket and wraps it in an asynchronous dispatch framework to allow queueing | |
23 * etc for max throughput. | |
24 */ | |
25 public class Invoker implements FastSpace, Serializable, TransportListener { | |
26 private transient Socket _socket; | |
27 private transient Rxer _rxer; | |
28 private transient Txer _txer; | |
29 private transient CommandFactory _commandFactory; | |
30 | |
31 private transient int _nextRequestId; | |
32 | |
33 private transient boolean isDown; | |
34 | |
35 private InetSocketAddress _addr; | |
36 | |
37 public Invoker(InetSocketAddress anAddr) { | |
38 _addr = anAddr; | |
39 } | |
40 | |
41 public Invoker(InetSocketAddress anAddr, boolean doOpen) throws IOException { | |
42 _addr = anAddr; | |
43 | |
44 if (doOpen) | |
45 init(); | |
46 } | |
47 | |
48 public void dead() { | |
49 synchronized(this) { | |
50 isDown = true; | |
51 } | |
52 | |
53 // Transport is down | |
54 _txer.halt(); | |
55 | |
56 try { | |
57 _socket.close(); | |
58 } catch (IOException anIOE) { | |
59 // Nothing to do | |
60 } | |
61 } | |
62 | |
63 public void init() throws IOException { | |
64 /* | |
65 System.err.println("Connecting to: " + _addr.getAddress() + | |
66 ", " + _addr.getPort()); | |
67 */ | |
68 | |
69 _socket = new Socket(_addr.getAddress(), _addr.getPort()); | |
70 _socket.setTcpNoDelay(true); | |
71 _socket.setReuseAddress(true); | |
72 | |
73 /* | |
74 System.err.println("Connect"); | |
75 */ | |
76 _rxer = new Rxer(_socket.getInputStream(), this); | |
77 _txer = new Txer(_socket.getOutputStream()); | |
78 | |
79 /* | |
80 System.err.println("Socket in buffer: " + | |
81 _socket.getReceiveBufferSize()); | |
82 System.err.println("Socket out buffer: " + | |
83 _socket.getSendBufferSize()); | |
84 */ | |
85 | |
86 _commandFactory = new CommandFactory(); | |
87 } | |
88 | |
89 public boolean isInited() { | |
90 return (_socket != null); | |
91 } | |
92 | |
93 private int getNextRequestId() { | |
94 synchronized(this) { | |
95 return _nextRequestId++; | |
96 } | |
97 } | |
98 | |
99 private void downBarrier() throws RemoteException { | |
100 synchronized(this) { | |
101 if (isDown) | |
102 throw new RemoteException("Connection is closed"); | |
103 } | |
104 } | |
105 | |
106 public LeaseImpl write(MangledEntry anEntry, Transaction aTxn, long aLeaseTime) | |
107 throws RemoteException, TransactionException { | |
108 | |
109 /* | |
110 long myStart = System.currentTimeMillis(); | |
111 */ | |
112 | |
113 downBarrier(); | |
114 | |
115 int myReqId = getNextRequestId(); | |
116 | |
117 Operation myOp = _commandFactory.newWrite(anEntry, aTxn, aLeaseTime); | |
118 | |
119 ResultReceiver myReceiver = new ResultReceiver(); | |
120 _rxer.waitFor(myReqId, myReceiver); | |
121 | |
122 try { | |
123 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
124 _txer.send(myReqId, myFlattenedOp); | |
125 } catch (Exception anE) { | |
126 _rxer.cancel(myReqId); | |
127 throw new RemoteException("Failed to send request", anE); | |
128 } | |
129 | |
130 byte[] myBuffer = myReceiver.getPayload(); | |
131 | |
132 try { | |
133 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
134 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
135 | |
136 Object myResult = myOIS.readObject(); | |
137 | |
138 if (myResult instanceof TransactionException) { | |
139 throw (TransactionException) myResult; | |
140 } else if (myResult instanceof RemoteException) { | |
141 throw (RemoteException) myResult; | |
142 } else if (myResult instanceof Throwable) { | |
143 throw new RemoteException("Invocation failed", | |
144 (Throwable) myResult); | |
145 } else { | |
146 /* | |
147 System.err.println("Wrote: " + (System.currentTimeMillis() - | |
148 myStart)); | |
149 */ | |
150 | |
151 return (LeaseImpl) myResult; | |
152 } | |
153 | |
154 } catch (IOException anIOE) { | |
155 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
156 } catch (ClassNotFoundException aCNFE) { | |
157 throw new RemoteException("Failed to unmarshal response", | |
158 aCNFE); | |
159 } | |
160 } | |
161 | |
162 public MangledEntry takeIfExists(MangledEntry anEntry, Transaction aTxn, | |
163 long aWaitTime) | |
164 throws RemoteException, TransactionException { | |
165 | |
166 downBarrier(); | |
167 | |
168 int myReqId = getNextRequestId(); | |
169 | |
170 Operation myOp = | |
171 _commandFactory.newTakeExists(anEntry, aTxn, aWaitTime); | |
172 | |
173 ResultReceiver myReceiver = new ResultReceiver(); | |
174 _rxer.waitFor(myReqId, myReceiver); | |
175 | |
176 try { | |
177 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
178 _txer.send(myReqId, myFlattenedOp); | |
179 } catch (Exception anE) { | |
180 _rxer.cancel(myReqId); | |
181 throw new RemoteException("Failed to send request", anE); | |
182 } | |
183 | |
184 byte[] myBuffer = myReceiver.getPayload(); | |
185 | |
186 try { | |
187 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
188 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
189 | |
190 Object myResult = myOIS.readObject(); | |
191 | |
192 if (myResult == null) | |
193 return null; | |
194 | |
195 if (myResult instanceof TransactionException) { | |
196 throw (TransactionException) myResult; | |
197 } else if (myResult instanceof RemoteException) { | |
198 throw (RemoteException) myResult; | |
199 } else if (myResult instanceof Throwable) { | |
200 throw new RemoteException("Invocation failed", | |
201 (Throwable) myResult); | |
202 } else { | |
203 return (MangledEntry) myResult; | |
204 } | |
205 | |
206 } catch (IOException anIOE) { | |
207 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
208 } catch (ClassNotFoundException aCNFE) { | |
209 throw new RemoteException("Failed to unmarshal response", | |
210 aCNFE); | |
211 } | |
212 } | |
213 | |
214 public MangledEntry readIfExists(MangledEntry anEntry, Transaction aTxn, long aWaitTime) | |
215 throws RemoteException, TransactionException { | |
216 | |
217 downBarrier(); | |
218 | |
219 int myReqId = getNextRequestId(); | |
220 | |
221 Operation myOp = | |
222 _commandFactory.newReadExists(anEntry, aTxn, aWaitTime); | |
223 | |
224 ResultReceiver myReceiver = new ResultReceiver(); | |
225 _rxer.waitFor(myReqId, myReceiver); | |
226 | |
227 try { | |
228 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
229 | |
230 _txer.send(myReqId, myFlattenedOp); | |
231 } catch (Exception anE) { | |
232 _rxer.cancel(myReqId); | |
233 throw new RemoteException("Failed to send request", anE); | |
234 } | |
235 | |
236 byte[] myBuffer = myReceiver.getPayload(); | |
237 | |
238 try { | |
239 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
240 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
241 | |
242 Object myResult = myOIS.readObject(); | |
243 | |
244 if (myResult == null) | |
245 return null; | |
246 | |
247 if (myResult instanceof TransactionException) { | |
248 throw (TransactionException) myResult; | |
249 } else if (myResult instanceof RemoteException) { | |
250 throw (RemoteException) myResult; | |
251 } else if (myResult instanceof Throwable) { | |
252 throw new RemoteException("Invocation failed", | |
253 (Throwable) myResult); | |
254 } else { | |
255 return (MangledEntry) myResult; | |
256 } | |
257 | |
258 } catch (IOException anIOE) { | |
259 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
260 } catch (ClassNotFoundException aCNFE) { | |
261 throw new RemoteException("Failed to unmarshal response", | |
262 aCNFE); | |
263 } | |
264 } | |
265 | |
266 public MangledEntry take(MangledEntry anEntry, Transaction aTxn, | |
267 long aWaitTime) | |
268 throws RemoteException, TransactionException { | |
269 | |
270 /* | |
271 long myStart = System.currentTimeMillis(); | |
272 */ | |
273 | |
274 downBarrier(); | |
275 | |
276 int myReqId = getNextRequestId(); | |
277 | |
278 Operation myOp = _commandFactory.newTake(anEntry, aTxn, aWaitTime); | |
279 | |
280 ResultReceiver myReceiver = new ResultReceiver(); | |
281 _rxer.waitFor(myReqId, myReceiver); | |
282 | |
283 try { | |
284 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
285 | |
286 _txer.send(myReqId, myFlattenedOp); | |
287 } catch (Exception anE) { | |
288 _rxer.cancel(myReqId); | |
289 throw new RemoteException("Failed to send request", anE); | |
290 } | |
291 | |
292 byte[] myBuffer = myReceiver.getPayload(); | |
293 | |
294 try { | |
295 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
296 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
297 | |
298 Object myResult = myOIS.readObject(); | |
299 | |
300 if (myResult == null) { | |
301 /* | |
302 System.err.println("Took: " + (System.currentTimeMillis() - | |
303 myStart)); | |
304 */ | |
305 return null; | |
306 } | |
307 | |
308 if (myResult instanceof TransactionException) { | |
309 throw (TransactionException) myResult; | |
310 } else if (myResult instanceof RemoteException) { | |
311 throw (RemoteException) myResult; | |
312 } else if (myResult instanceof Throwable) { | |
313 throw new RemoteException("Invocation failed", | |
314 (Throwable) myResult); | |
315 } else { | |
316 /* | |
317 System.err.println("Took: " + (System.currentTimeMillis() - | |
318 myStart)); | |
319 */ | |
320 | |
321 return (MangledEntry) myResult; | |
322 } | |
323 | |
324 } catch (IOException anIOE) { | |
325 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
326 } catch (ClassNotFoundException aCNFE) { | |
327 throw new RemoteException("Failed to unmarshal response", | |
328 aCNFE); | |
329 } | |
330 } | |
331 | |
332 public MangledEntry read(MangledEntry anEntry, Transaction aTxn, | |
333 long aWaitTime) | |
334 throws RemoteException, TransactionException { | |
335 | |
336 /* | |
337 long myStart = System.currentTimeMillis(); | |
338 */ | |
339 downBarrier(); | |
340 | |
341 int myReqId = getNextRequestId(); | |
342 | |
343 Operation myOp = _commandFactory.newRead(anEntry, aTxn, aWaitTime); | |
344 | |
345 ResultReceiver myReceiver = new ResultReceiver(); | |
346 _rxer.waitFor(myReqId, myReceiver); | |
347 | |
348 try { | |
349 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
350 | |
351 _txer.send(myReqId, myFlattenedOp); | |
352 } catch (Exception anE) { | |
353 _rxer.cancel(myReqId); | |
354 throw new RemoteException("Failed to send request", anE); | |
355 } | |
356 | |
357 byte[] myBuffer = myReceiver.getPayload(); | |
358 | |
359 try { | |
360 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
361 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
362 | |
363 Object myResult = myOIS.readObject(); | |
364 | |
365 if (myResult == null) { | |
366 /* | |
367 System.err.println("Read: " + (System.currentTimeMillis() - | |
368 myStart)); | |
369 */ | |
370 return null; | |
371 } | |
372 | |
373 if (myResult instanceof TransactionException) { | |
374 throw (TransactionException) myResult; | |
375 } else if (myResult instanceof RemoteException) { | |
376 throw (RemoteException) myResult; | |
377 } else if (myResult instanceof Throwable) { | |
378 throw new RemoteException("Invocation failed", | |
379 (Throwable) myResult); | |
380 } else { | |
381 /* | |
382 System.err.println("Read: " + (System.currentTimeMillis() - | |
383 myStart)); | |
384 */ | |
385 return (MangledEntry) myResult; | |
386 } | |
387 | |
388 } catch (IOException anIOE) { | |
389 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
390 } catch (ClassNotFoundException aCNFE) { | |
391 throw new RemoteException("Failed to unmarshal response", | |
392 aCNFE); | |
393 } | |
394 } | |
395 | |
396 public EventRegistration notify(MangledEntry anEntry, Transaction aTxn, | |
397 RemoteEventListener aListener, | |
398 long aLeaseTime, | |
399 MarshalledObject aHandback) | |
400 throws RemoteException, TransactionException { | |
401 | |
402 throw new UnsupportedOperationException(); | |
403 } | |
404 | |
405 public Object getAdmin() throws RemoteException { | |
406 throw new UnsupportedOperationException(); | |
407 } | |
408 | |
409 public int prepare(TransactionManager aTxnMgr, long anId) | |
410 throws UnknownTransactionException, RemoteException { | |
411 | |
412 downBarrier(); | |
413 | |
414 int myReqId = getNextRequestId(); | |
415 | |
416 Operation myOp = _commandFactory.newPrepare(aTxnMgr, anId); | |
417 | |
418 ResultReceiver myReceiver = new ResultReceiver(); | |
419 _rxer.waitFor(myReqId, myReceiver); | |
420 | |
421 try { | |
422 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
423 | |
424 _txer.send(myReqId, myFlattenedOp); | |
425 } catch (Exception anE) { | |
426 _rxer.cancel(myReqId); | |
427 throw new RemoteException("Failed to send request", anE); | |
428 } | |
429 | |
430 byte[] myBuffer = myReceiver.getPayload(); | |
431 | |
432 try { | |
433 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
434 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
435 | |
436 Object myResult = myOIS.readObject(); | |
437 | |
438 if (myResult instanceof RemoteException) { | |
439 throw (RemoteException) myResult; | |
440 } else if (myResult instanceof UnknownTransactionException) { | |
441 throw (UnknownTransactionException) myResult; | |
442 } else if (myResult instanceof Throwable) { | |
443 throw new RemoteException("Invocation failed", | |
444 (Throwable) myResult); | |
445 } else { | |
446 return ((Integer) myResult).intValue(); | |
447 } | |
448 | |
449 } catch (IOException anIOE) { | |
450 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
451 } catch (ClassNotFoundException aCNFE) { | |
452 throw new RemoteException("Failed to unmarshal response", | |
453 aCNFE); | |
454 } | |
455 } | |
456 | |
457 public void commit(TransactionManager aTxnMgr, long anId) | |
458 throws UnknownTransactionException, RemoteException { | |
459 | |
460 downBarrier(); | |
461 | |
462 int myReqId = getNextRequestId(); | |
463 | |
464 Operation myOp = _commandFactory.newCommit(aTxnMgr, anId); | |
465 | |
466 ResultReceiver myReceiver = new ResultReceiver(); | |
467 _rxer.waitFor(myReqId, myReceiver); | |
468 | |
469 try { | |
470 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
471 | |
472 _txer.send(myReqId, myFlattenedOp); | |
473 } catch (Exception anE) { | |
474 _rxer.cancel(myReqId); | |
475 throw new RemoteException("Failed to send request", anE); | |
476 } | |
477 | |
478 byte[] myBuffer = myReceiver.getPayload(); | |
479 | |
480 try { | |
481 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
482 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
483 | |
484 Object myResult = myOIS.readObject(); | |
485 | |
486 if (myResult instanceof RemoteException) { | |
487 throw (RemoteException) myResult; | |
488 } else if (myResult instanceof UnknownTransactionException) { | |
489 throw (UnknownTransactionException) myResult; | |
490 } else if (myResult instanceof Throwable) { | |
491 throw new RemoteException("Invocation failed", | |
492 (Throwable) myResult); | |
493 } | |
494 } catch (IOException anIOE) { | |
495 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
496 } catch (ClassNotFoundException aCNFE) { | |
497 throw new RemoteException("Failed to unmarshal response", | |
498 aCNFE); | |
499 } | |
500 } | |
501 | |
502 public void abort(TransactionManager aTxnMgr, long anId) | |
503 throws UnknownTransactionException, RemoteException { | |
504 | |
505 downBarrier(); | |
506 | |
507 int myReqId = getNextRequestId(); | |
508 | |
509 Operation myOp = _commandFactory.newAbort(aTxnMgr, anId); | |
510 | |
511 ResultReceiver myReceiver = new ResultReceiver(); | |
512 _rxer.waitFor(myReqId, myReceiver); | |
513 | |
514 try { | |
515 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
516 | |
517 _txer.send(myReqId, myFlattenedOp); | |
518 } catch (Exception anE) { | |
519 _rxer.cancel(myReqId); | |
520 throw new RemoteException("Failed to send request", anE); | |
521 } | |
522 | |
523 byte[] myBuffer = myReceiver.getPayload(); | |
524 | |
525 try { | |
526 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
527 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
528 | |
529 Object myResult = myOIS.readObject(); | |
530 | |
531 if (myResult instanceof RemoteException) { | |
532 throw (RemoteException) myResult; | |
533 } else if (myResult instanceof UnknownTransactionException) { | |
534 throw (UnknownTransactionException) myResult; | |
535 } else if (myResult instanceof Throwable) { | |
536 throw new RemoteException("Invocation failed", | |
537 (Throwable) myResult); | |
538 } | |
539 } catch (IOException anIOE) { | |
540 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
541 } catch (ClassNotFoundException aCNFE) { | |
542 throw new RemoteException("Failed to unmarshal response", | |
543 aCNFE); | |
544 } | |
545 } | |
546 | |
547 public int prepareAndCommit(TransactionManager aTxnMgr, long anId) | |
548 throws UnknownTransactionException, RemoteException { | |
549 | |
550 downBarrier(); | |
551 | |
552 int myReqId = getNextRequestId(); | |
553 | |
554 Operation myOp = _commandFactory.newPrepareCommit(aTxnMgr, anId); | |
555 | |
556 ResultReceiver myReceiver = new ResultReceiver(); | |
557 _rxer.waitFor(myReqId, myReceiver); | |
558 | |
559 try { | |
560 byte[] myFlattenedOp = _commandFactory.pack(myOp); | |
561 | |
562 _txer.send(myReqId, myFlattenedOp); | |
563 } catch (Exception anE) { | |
564 _rxer.cancel(myReqId); | |
565 throw new RemoteException("Failed to send request", anE); | |
566 } | |
567 | |
568 byte[] myBuffer = myReceiver.getPayload(); | |
569 | |
570 try { | |
571 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer); | |
572 ObjectInputStream myOIS = new ObjectInputStream(myBAIS); | |
573 | |
574 Object myResult = myOIS.readObject(); | |
575 | |
576 if (myResult instanceof RemoteException) { | |
577 throw (RemoteException) myResult; | |
578 } else if (myResult instanceof UnknownTransactionException) { | |
579 throw (UnknownTransactionException) myResult; | |
580 } else if (myResult instanceof Throwable) { | |
581 throw new RemoteException("Invocation failed", | |
582 (Throwable) myResult); | |
583 } else { | |
584 return ((Integer) myResult).intValue(); | |
585 } | |
586 | |
587 } catch (IOException anIOE) { | |
588 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE); | |
589 } catch (ClassNotFoundException aCNFE) { | |
590 throw new RemoteException("Failed to unmarshal response", | |
591 aCNFE); | |
592 } | |
593 } | |
594 | |
595 protected void finalize() throws Throwable { | |
596 dead(); | |
597 | |
598 super.finalize(); | |
599 } | |
600 } |