comparison src/org/dancres/blitz/remote/nio/Invoker.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.remote.nio;
2
3 import net.jini.core.lease.Lease;
4 import net.jini.core.transaction.Transaction;
5 import net.jini.core.transaction.TransactionException;
6 import net.jini.core.transaction.UnknownTransactionException;
7 import net.jini.core.transaction.server.TransactionManager;
8 import net.jini.core.event.EventRegistration;
9 import net.jini.core.event.RemoteEventListener;
10
11 import java.rmi.RemoteException;
12 import java.rmi.MarshalledObject;
13 import java.net.Socket;
14 import java.net.InetSocketAddress;
15 import java.io.*;
16
17 import org.dancres.blitz.mangler.MangledEntry;
18 import org.dancres.blitz.remote.LeaseImpl;
19
20 /**
21 * Encapsulates all the logic for invoking a space method - one of take, read and write.
22 * Manages the socket and wraps it in an asynchronous dispatch framework to allow queueing
23 * etc for max throughput.
24 */
25 public class Invoker implements FastSpace, Serializable, TransportListener {
26 private transient Socket _socket;
27 private transient Rxer _rxer;
28 private transient Txer _txer;
29 private transient CommandFactory _commandFactory;
30
31 private transient int _nextRequestId;
32
33 private transient boolean isDown;
34
35 private InetSocketAddress _addr;
36
37 public Invoker(InetSocketAddress anAddr) {
38 _addr = anAddr;
39 }
40
41 public Invoker(InetSocketAddress anAddr, boolean doOpen) throws IOException {
42 _addr = anAddr;
43
44 if (doOpen)
45 init();
46 }
47
48 public void dead() {
49 synchronized(this) {
50 isDown = true;
51 }
52
53 // Transport is down
54 _txer.halt();
55
56 try {
57 _socket.close();
58 } catch (IOException anIOE) {
59 // Nothing to do
60 }
61 }
62
63 public void init() throws IOException {
64 /*
65 System.err.println("Connecting to: " + _addr.getAddress() +
66 ", " + _addr.getPort());
67 */
68
69 _socket = new Socket(_addr.getAddress(), _addr.getPort());
70 _socket.setTcpNoDelay(true);
71 _socket.setReuseAddress(true);
72
73 /*
74 System.err.println("Connect");
75 */
76 _rxer = new Rxer(_socket.getInputStream(), this);
77 _txer = new Txer(_socket.getOutputStream());
78
79 /*
80 System.err.println("Socket in buffer: " +
81 _socket.getReceiveBufferSize());
82 System.err.println("Socket out buffer: " +
83 _socket.getSendBufferSize());
84 */
85
86 _commandFactory = new CommandFactory();
87 }
88
89 public boolean isInited() {
90 return (_socket != null);
91 }
92
93 private int getNextRequestId() {
94 synchronized(this) {
95 return _nextRequestId++;
96 }
97 }
98
99 private void downBarrier() throws RemoteException {
100 synchronized(this) {
101 if (isDown)
102 throw new RemoteException("Connection is closed");
103 }
104 }
105
106 public LeaseImpl write(MangledEntry anEntry, Transaction aTxn, long aLeaseTime)
107 throws RemoteException, TransactionException {
108
109 /*
110 long myStart = System.currentTimeMillis();
111 */
112
113 downBarrier();
114
115 int myReqId = getNextRequestId();
116
117 Operation myOp = _commandFactory.newWrite(anEntry, aTxn, aLeaseTime);
118
119 ResultReceiver myReceiver = new ResultReceiver();
120 _rxer.waitFor(myReqId, myReceiver);
121
122 try {
123 byte[] myFlattenedOp = _commandFactory.pack(myOp);
124 _txer.send(myReqId, myFlattenedOp);
125 } catch (Exception anE) {
126 _rxer.cancel(myReqId);
127 throw new RemoteException("Failed to send request", anE);
128 }
129
130 byte[] myBuffer = myReceiver.getPayload();
131
132 try {
133 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
134 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
135
136 Object myResult = myOIS.readObject();
137
138 if (myResult instanceof TransactionException) {
139 throw (TransactionException) myResult;
140 } else if (myResult instanceof RemoteException) {
141 throw (RemoteException) myResult;
142 } else if (myResult instanceof Throwable) {
143 throw new RemoteException("Invocation failed",
144 (Throwable) myResult);
145 } else {
146 /*
147 System.err.println("Wrote: " + (System.currentTimeMillis() -
148 myStart));
149 */
150
151 return (LeaseImpl) myResult;
152 }
153
154 } catch (IOException anIOE) {
155 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
156 } catch (ClassNotFoundException aCNFE) {
157 throw new RemoteException("Failed to unmarshal response",
158 aCNFE);
159 }
160 }
161
162 public MangledEntry takeIfExists(MangledEntry anEntry, Transaction aTxn,
163 long aWaitTime)
164 throws RemoteException, TransactionException {
165
166 downBarrier();
167
168 int myReqId = getNextRequestId();
169
170 Operation myOp =
171 _commandFactory.newTakeExists(anEntry, aTxn, aWaitTime);
172
173 ResultReceiver myReceiver = new ResultReceiver();
174 _rxer.waitFor(myReqId, myReceiver);
175
176 try {
177 byte[] myFlattenedOp = _commandFactory.pack(myOp);
178 _txer.send(myReqId, myFlattenedOp);
179 } catch (Exception anE) {
180 _rxer.cancel(myReqId);
181 throw new RemoteException("Failed to send request", anE);
182 }
183
184 byte[] myBuffer = myReceiver.getPayload();
185
186 try {
187 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
188 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
189
190 Object myResult = myOIS.readObject();
191
192 if (myResult == null)
193 return null;
194
195 if (myResult instanceof TransactionException) {
196 throw (TransactionException) myResult;
197 } else if (myResult instanceof RemoteException) {
198 throw (RemoteException) myResult;
199 } else if (myResult instanceof Throwable) {
200 throw new RemoteException("Invocation failed",
201 (Throwable) myResult);
202 } else {
203 return (MangledEntry) myResult;
204 }
205
206 } catch (IOException anIOE) {
207 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
208 } catch (ClassNotFoundException aCNFE) {
209 throw new RemoteException("Failed to unmarshal response",
210 aCNFE);
211 }
212 }
213
214 public MangledEntry readIfExists(MangledEntry anEntry, Transaction aTxn, long aWaitTime)
215 throws RemoteException, TransactionException {
216
217 downBarrier();
218
219 int myReqId = getNextRequestId();
220
221 Operation myOp =
222 _commandFactory.newReadExists(anEntry, aTxn, aWaitTime);
223
224 ResultReceiver myReceiver = new ResultReceiver();
225 _rxer.waitFor(myReqId, myReceiver);
226
227 try {
228 byte[] myFlattenedOp = _commandFactory.pack(myOp);
229
230 _txer.send(myReqId, myFlattenedOp);
231 } catch (Exception anE) {
232 _rxer.cancel(myReqId);
233 throw new RemoteException("Failed to send request", anE);
234 }
235
236 byte[] myBuffer = myReceiver.getPayload();
237
238 try {
239 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
240 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
241
242 Object myResult = myOIS.readObject();
243
244 if (myResult == null)
245 return null;
246
247 if (myResult instanceof TransactionException) {
248 throw (TransactionException) myResult;
249 } else if (myResult instanceof RemoteException) {
250 throw (RemoteException) myResult;
251 } else if (myResult instanceof Throwable) {
252 throw new RemoteException("Invocation failed",
253 (Throwable) myResult);
254 } else {
255 return (MangledEntry) myResult;
256 }
257
258 } catch (IOException anIOE) {
259 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
260 } catch (ClassNotFoundException aCNFE) {
261 throw new RemoteException("Failed to unmarshal response",
262 aCNFE);
263 }
264 }
265
266 public MangledEntry take(MangledEntry anEntry, Transaction aTxn,
267 long aWaitTime)
268 throws RemoteException, TransactionException {
269
270 /*
271 long myStart = System.currentTimeMillis();
272 */
273
274 downBarrier();
275
276 int myReqId = getNextRequestId();
277
278 Operation myOp = _commandFactory.newTake(anEntry, aTxn, aWaitTime);
279
280 ResultReceiver myReceiver = new ResultReceiver();
281 _rxer.waitFor(myReqId, myReceiver);
282
283 try {
284 byte[] myFlattenedOp = _commandFactory.pack(myOp);
285
286 _txer.send(myReqId, myFlattenedOp);
287 } catch (Exception anE) {
288 _rxer.cancel(myReqId);
289 throw new RemoteException("Failed to send request", anE);
290 }
291
292 byte[] myBuffer = myReceiver.getPayload();
293
294 try {
295 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
296 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
297
298 Object myResult = myOIS.readObject();
299
300 if (myResult == null) {
301 /*
302 System.err.println("Took: " + (System.currentTimeMillis() -
303 myStart));
304 */
305 return null;
306 }
307
308 if (myResult instanceof TransactionException) {
309 throw (TransactionException) myResult;
310 } else if (myResult instanceof RemoteException) {
311 throw (RemoteException) myResult;
312 } else if (myResult instanceof Throwable) {
313 throw new RemoteException("Invocation failed",
314 (Throwable) myResult);
315 } else {
316 /*
317 System.err.println("Took: " + (System.currentTimeMillis() -
318 myStart));
319 */
320
321 return (MangledEntry) myResult;
322 }
323
324 } catch (IOException anIOE) {
325 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
326 } catch (ClassNotFoundException aCNFE) {
327 throw new RemoteException("Failed to unmarshal response",
328 aCNFE);
329 }
330 }
331
332 public MangledEntry read(MangledEntry anEntry, Transaction aTxn,
333 long aWaitTime)
334 throws RemoteException, TransactionException {
335
336 /*
337 long myStart = System.currentTimeMillis();
338 */
339 downBarrier();
340
341 int myReqId = getNextRequestId();
342
343 Operation myOp = _commandFactory.newRead(anEntry, aTxn, aWaitTime);
344
345 ResultReceiver myReceiver = new ResultReceiver();
346 _rxer.waitFor(myReqId, myReceiver);
347
348 try {
349 byte[] myFlattenedOp = _commandFactory.pack(myOp);
350
351 _txer.send(myReqId, myFlattenedOp);
352 } catch (Exception anE) {
353 _rxer.cancel(myReqId);
354 throw new RemoteException("Failed to send request", anE);
355 }
356
357 byte[] myBuffer = myReceiver.getPayload();
358
359 try {
360 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
361 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
362
363 Object myResult = myOIS.readObject();
364
365 if (myResult == null) {
366 /*
367 System.err.println("Read: " + (System.currentTimeMillis() -
368 myStart));
369 */
370 return null;
371 }
372
373 if (myResult instanceof TransactionException) {
374 throw (TransactionException) myResult;
375 } else if (myResult instanceof RemoteException) {
376 throw (RemoteException) myResult;
377 } else if (myResult instanceof Throwable) {
378 throw new RemoteException("Invocation failed",
379 (Throwable) myResult);
380 } else {
381 /*
382 System.err.println("Read: " + (System.currentTimeMillis() -
383 myStart));
384 */
385 return (MangledEntry) myResult;
386 }
387
388 } catch (IOException anIOE) {
389 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
390 } catch (ClassNotFoundException aCNFE) {
391 throw new RemoteException("Failed to unmarshal response",
392 aCNFE);
393 }
394 }
395
396 public EventRegistration notify(MangledEntry anEntry, Transaction aTxn,
397 RemoteEventListener aListener,
398 long aLeaseTime,
399 MarshalledObject aHandback)
400 throws RemoteException, TransactionException {
401
402 throw new UnsupportedOperationException();
403 }
404
405 public Object getAdmin() throws RemoteException {
406 throw new UnsupportedOperationException();
407 }
408
409 public int prepare(TransactionManager aTxnMgr, long anId)
410 throws UnknownTransactionException, RemoteException {
411
412 downBarrier();
413
414 int myReqId = getNextRequestId();
415
416 Operation myOp = _commandFactory.newPrepare(aTxnMgr, anId);
417
418 ResultReceiver myReceiver = new ResultReceiver();
419 _rxer.waitFor(myReqId, myReceiver);
420
421 try {
422 byte[] myFlattenedOp = _commandFactory.pack(myOp);
423
424 _txer.send(myReqId, myFlattenedOp);
425 } catch (Exception anE) {
426 _rxer.cancel(myReqId);
427 throw new RemoteException("Failed to send request", anE);
428 }
429
430 byte[] myBuffer = myReceiver.getPayload();
431
432 try {
433 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
434 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
435
436 Object myResult = myOIS.readObject();
437
438 if (myResult instanceof RemoteException) {
439 throw (RemoteException) myResult;
440 } else if (myResult instanceof UnknownTransactionException) {
441 throw (UnknownTransactionException) myResult;
442 } else if (myResult instanceof Throwable) {
443 throw new RemoteException("Invocation failed",
444 (Throwable) myResult);
445 } else {
446 return ((Integer) myResult).intValue();
447 }
448
449 } catch (IOException anIOE) {
450 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
451 } catch (ClassNotFoundException aCNFE) {
452 throw new RemoteException("Failed to unmarshal response",
453 aCNFE);
454 }
455 }
456
457 public void commit(TransactionManager aTxnMgr, long anId)
458 throws UnknownTransactionException, RemoteException {
459
460 downBarrier();
461
462 int myReqId = getNextRequestId();
463
464 Operation myOp = _commandFactory.newCommit(aTxnMgr, anId);
465
466 ResultReceiver myReceiver = new ResultReceiver();
467 _rxer.waitFor(myReqId, myReceiver);
468
469 try {
470 byte[] myFlattenedOp = _commandFactory.pack(myOp);
471
472 _txer.send(myReqId, myFlattenedOp);
473 } catch (Exception anE) {
474 _rxer.cancel(myReqId);
475 throw new RemoteException("Failed to send request", anE);
476 }
477
478 byte[] myBuffer = myReceiver.getPayload();
479
480 try {
481 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
482 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
483
484 Object myResult = myOIS.readObject();
485
486 if (myResult instanceof RemoteException) {
487 throw (RemoteException) myResult;
488 } else if (myResult instanceof UnknownTransactionException) {
489 throw (UnknownTransactionException) myResult;
490 } else if (myResult instanceof Throwable) {
491 throw new RemoteException("Invocation failed",
492 (Throwable) myResult);
493 }
494 } catch (IOException anIOE) {
495 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
496 } catch (ClassNotFoundException aCNFE) {
497 throw new RemoteException("Failed to unmarshal response",
498 aCNFE);
499 }
500 }
501
502 public void abort(TransactionManager aTxnMgr, long anId)
503 throws UnknownTransactionException, RemoteException {
504
505 downBarrier();
506
507 int myReqId = getNextRequestId();
508
509 Operation myOp = _commandFactory.newAbort(aTxnMgr, anId);
510
511 ResultReceiver myReceiver = new ResultReceiver();
512 _rxer.waitFor(myReqId, myReceiver);
513
514 try {
515 byte[] myFlattenedOp = _commandFactory.pack(myOp);
516
517 _txer.send(myReqId, myFlattenedOp);
518 } catch (Exception anE) {
519 _rxer.cancel(myReqId);
520 throw new RemoteException("Failed to send request", anE);
521 }
522
523 byte[] myBuffer = myReceiver.getPayload();
524
525 try {
526 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
527 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
528
529 Object myResult = myOIS.readObject();
530
531 if (myResult instanceof RemoteException) {
532 throw (RemoteException) myResult;
533 } else if (myResult instanceof UnknownTransactionException) {
534 throw (UnknownTransactionException) myResult;
535 } else if (myResult instanceof Throwable) {
536 throw new RemoteException("Invocation failed",
537 (Throwable) myResult);
538 }
539 } catch (IOException anIOE) {
540 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
541 } catch (ClassNotFoundException aCNFE) {
542 throw new RemoteException("Failed to unmarshal response",
543 aCNFE);
544 }
545 }
546
547 public int prepareAndCommit(TransactionManager aTxnMgr, long anId)
548 throws UnknownTransactionException, RemoteException {
549
550 downBarrier();
551
552 int myReqId = getNextRequestId();
553
554 Operation myOp = _commandFactory.newPrepareCommit(aTxnMgr, anId);
555
556 ResultReceiver myReceiver = new ResultReceiver();
557 _rxer.waitFor(myReqId, myReceiver);
558
559 try {
560 byte[] myFlattenedOp = _commandFactory.pack(myOp);
561
562 _txer.send(myReqId, myFlattenedOp);
563 } catch (Exception anE) {
564 _rxer.cancel(myReqId);
565 throw new RemoteException("Failed to send request", anE);
566 }
567
568 byte[] myBuffer = myReceiver.getPayload();
569
570 try {
571 ByteArrayInputStream myBAIS = new ByteArrayInputStream(myBuffer);
572 ObjectInputStream myOIS = new ObjectInputStream(myBAIS);
573
574 Object myResult = myOIS.readObject();
575
576 if (myResult instanceof RemoteException) {
577 throw (RemoteException) myResult;
578 } else if (myResult instanceof UnknownTransactionException) {
579 throw (UnknownTransactionException) myResult;
580 } else if (myResult instanceof Throwable) {
581 throw new RemoteException("Invocation failed",
582 (Throwable) myResult);
583 } else {
584 return ((Integer) myResult).intValue();
585 }
586
587 } catch (IOException anIOE) {
588 throw new RemoteException("Failed to unmarshal response: " + myReqId, anIOE);
589 } catch (ClassNotFoundException aCNFE) {
590 throw new RemoteException("Failed to unmarshal response",
591 aCNFE);
592 }
593 }
594
595 protected void finalize() throws Throwable {
596 dead();
597
598 super.finalize();
599 }
600 }