Mercurial > hg > blitz_stable
diff src/org/dancres/blitz/remote/nio/Rxer.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/remote/nio/Rxer.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,101 @@ +package org.dancres.blitz.remote.nio; + +import com.go.trove.util.IntHashMap; + +import java.io.InputStream; +import java.io.DataInputStream; +import java.util.Collection; +import java.util.Iterator; +import java.rmi.RemoteException; +import java.lang.ref.WeakReference; + +/** + * Txer's twin - waits for a response id and a payload which is then passed + * to the ResultProcessor registered against the id. Obviously the + * ResultProcessor should be registered against the id prior to transmission + * to avoid race conditions. + * + * @todo That means if there's an exception during transmission, the + * ResultProcessor should be removed (might be a good plan to do this all with + * weak references automatically). + */ +public class Rxer extends Thread { + private DataInputStream _socketRx; + private final IntHashMap _listeners = new IntHashMap(); + private WeakReference _listener; + + Rxer(InputStream anInput, TransportListener aListener) { + _socketRx = new DataInputStream(anInput); + _listener = new WeakReference(aListener); + setDaemon(true); + start(); + } + + void waitFor(int anId, ResultProcessor aProcessor) { + synchronized(_listeners) { + _listeners.put(anId, aProcessor); + } + } + + void cancel(int anId) { + synchronized(_listeners) { + _listeners.remove(anId); + } + } + + public void run() { + while (true) { + try { + int myReqId = _socketRx.readInt(); + + // System.err.println("Receiving: " + myReqId); + + int myResponseSize = _socketRx.readInt(); + + byte[] myBuffer = new byte[myResponseSize]; + + _socketRx.readFully(myBuffer); + + ResultProcessor myProcessor; + + synchronized(_listeners) { + myProcessor = (ResultProcessor) _listeners.remove(myReqId); + } + + if (myProcessor != null) + myProcessor.deliver(myBuffer); + + } catch (Exception anE) { + // System.err.println("Error receiving response"); + // anE.printStackTrace(System.err); + + // Tell all listeners to abort + Collection myListeners; + + synchronized(_listeners) { + myListeners = _listeners.values(); + } + + Iterator myRecipients = myListeners.iterator(); + + while (myRecipients.hasNext()) { + ResultProcessor myProcessor = + (ResultProcessor) myRecipients.next(); + + myProcessor.deliver(new RemoteException("Connection problem", anE)); + } + + pingListener(); + + return; + } + } + } + + private void pingListener() { + TransportListener myListener = (TransportListener) _listener.get(); + + if (myListener != null) + myListener.dead(); + } +}