Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/SynchronousChannel.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/SynchronousChannel.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,375 @@ +/* + File: SynchronousChannel.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 11Jun1998 dl Create public version + 17Jul1998 dl Disabled direct semaphore permit check + 31Jul1998 dl Replaced main algorithm with one with + better scaling and fairness properties. + 25aug1998 dl added peek + 24Nov2001 dl Replaced main algorithm with faster one. +*/ + +package EDU.oswego.cs.dl.util.concurrent; + +/** + * A rendezvous channel, similar to those used in CSP and Ada. Each + * put must wait for a take, and vice versa. Synchronous channels + * are well suited for handoff designs, in which an object running in + * one thread must synch up with an object running in another thread + * in order to hand it some information, event, or task. + * <p> If you only need threads to synch up without + * exchanging information, consider using a Barrier. If you need + * bidirectional exchanges, consider using a Rendezvous. <p> + * + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + * @see CyclicBarrier + * @see Rendezvous +**/ + +public class SynchronousChannel implements BoundedChannel { + + /* + This implementation divides actions into two cases for puts: + + * An arriving putter that does not already have a waiting taker + creates a node holding item, and then waits for a taker to take it. + * An arriving putter that does already have a waiting taker fills + the slot node created by the taker, and notifies it to continue. + + And symmetrically, two for takes: + + * An arriving taker that does not already have a waiting putter + creates an empty slot node, and then waits for a putter to fill it. + * An arriving taker that does already have a waiting putter takes + item from the node created by the putter, and notifies it to continue. + + This requires keeping two simple queues: waitingPuts and waitingTakes. + + When a put or take waiting for the actions of its counterpart + aborts due to interruption or timeout, it marks the node + it created as "CANCELLED", which causes its counterpart to retry + the entire put or take sequence. + */ + + /** + * Special marker used in queue nodes to indicate that + * the thread waiting for a change in the node has timed out + * or been interrupted. + **/ + protected static final Object CANCELLED = new Object(); + + /** + * Simple FIFO queue class to hold waiting puts/takes. + **/ + protected static class Queue { + protected LinkedNode head; + protected LinkedNode last; + + protected void enq(LinkedNode p) { + if (last == null) + last = head = p; + else + last = last.next = p; + } + + protected LinkedNode deq() { + LinkedNode p = head; + if (p != null && (head = p.next) == null) + last = null; + return p; + } + } + + protected final Queue waitingPuts = new Queue(); + protected final Queue waitingTakes = new Queue(); + + /** + * @return zero -- + * Synchronous channels have no internal capacity. + **/ + public int capacity() { return 0; } + + /** + * @return null -- + * Synchronous channels do not hold contents unless actively taken + **/ + public Object peek() { return null; } + + + public void put(Object x) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + + // This code is conceptually straightforward, but messy + // because we need to intertwine handling of put-arrives first + // vs take-arrives first cases. + + // Outer loop is to handle retry due to cancelled waiting taker + for (;;) { + + // Get out now if we are interrupted + if (Thread.interrupted()) throw new InterruptedException(); + + // Exactly one of item or slot will be nonnull at end of + // synchronized block, depending on whether a put or a take + // arrived first. + LinkedNode slot; + LinkedNode item = null; + + synchronized(this) { + // Try to match up with a waiting taker; fill and signal it below + slot = waitingTakes.deq(); + + // If no takers yet, create a node and wait below + if (slot == null) + waitingPuts.enq(item = new LinkedNode(x)); + } + + if (slot != null) { // There is a waiting taker. + // Fill in the slot created by the taker and signal taker to + // continue. + synchronized(slot) { + if (slot.value != CANCELLED) { + slot.value = x; + slot.notify(); + return; + } + // else the taker has cancelled, so retry outer loop + } + } + + else { + // Wait for a taker to arrive and take the item. + synchronized(item) { + try { + while (item.value != null) + item.wait(); + return; + } + catch (InterruptedException ie) { + // If item was taken, return normally but set interrupt status + if (item.value == null) { + Thread.currentThread().interrupt(); + return; + } + else { + item.value = CANCELLED; + throw ie; + } + } + } + } + } + } + + public Object take() throws InterruptedException { + // Entirely symmetric to put() + + for (;;) { + if (Thread.interrupted()) throw new InterruptedException(); + + LinkedNode item; + LinkedNode slot = null; + + synchronized(this) { + item = waitingPuts.deq(); + if (item == null) + waitingTakes.enq(slot = new LinkedNode()); + } + + if (item != null) { + synchronized(item) { + Object x = item.value; + if (x != CANCELLED) { + item.value = null; + item.next = null; + item.notify(); + return x; + } + } + } + + else { + synchronized(slot) { + try { + for (;;) { + Object x = slot.value; + if (x != null) { + slot.value = null; + slot.next = null; + return x; + } + else + slot.wait(); + } + } + catch(InterruptedException ie) { + Object x = slot.value; + if (x != null) { + slot.value = null; + slot.next = null; + Thread.currentThread().interrupt(); + return x; + } + else { + slot.value = CANCELLED; + throw ie; + } + } + } + } + } + } + + /* + Offer and poll are just like put and take, except even messier. + */ + + + public boolean offer(Object x, long msecs) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + long waitTime = msecs; + long startTime = 0; // lazily initialize below if needed + + for (;;) { + if (Thread.interrupted()) throw new InterruptedException(); + + LinkedNode slot; + LinkedNode item = null; + + synchronized(this) { + slot = waitingTakes.deq(); + if (slot == null) { + if (waitTime <= 0) + return false; + else + waitingPuts.enq(item = new LinkedNode(x)); + } + } + + if (slot != null) { + synchronized(slot) { + if (slot.value != CANCELLED) { + slot.value = x; + slot.notify(); + return true; + } + } + } + + long now = System.currentTimeMillis(); + if (startTime == 0) + startTime = now; + else + waitTime = msecs - (now - startTime); + + if (item != null) { + synchronized(item) { + try { + for (;;) { + if (item.value == null) + return true; + if (waitTime <= 0) { + item.value = CANCELLED; + return false; + } + item.wait(waitTime); + waitTime = msecs - (System.currentTimeMillis() - startTime); + } + } + catch (InterruptedException ie) { + if (item.value == null) { + Thread.currentThread().interrupt(); + return true; + } + else { + item.value = CANCELLED; + throw ie; + } + } + } + } + } + } + + public Object poll(long msecs) throws InterruptedException { + long waitTime = msecs; + long startTime = 0; + + for (;;) { + if (Thread.interrupted()) throw new InterruptedException(); + + LinkedNode item; + LinkedNode slot = null; + + synchronized(this) { + item = waitingPuts.deq(); + if (item == null) { + if (waitTime <= 0) + return null; + else + waitingTakes.enq(slot = new LinkedNode()); + } + } + + if (item != null) { + synchronized(item) { + Object x = item.value; + if (x != CANCELLED) { + item.value = null; + item.next = null; + item.notify(); + return x; + } + } + } + + long now = System.currentTimeMillis(); + if (startTime == 0) + startTime = now; + else + waitTime = msecs - (now - startTime); + + if (slot != null) { + synchronized(slot) { + try { + for (;;) { + Object x = slot.value; + if (x != null) { + slot.value = null; + slot.next = null; + return x; + } + if (waitTime <= 0) { + slot.value = CANCELLED; + return null; + } + slot.wait(waitTime); + waitTime = msecs - (System.currentTimeMillis() - startTime); + } + } + catch(InterruptedException ie) { + Object x = slot.value; + if (x != null) { + slot.value = null; + slot.next = null; + Thread.currentThread().interrupt(); + return x; + } + else { + slot.value = CANCELLED; + throw ie; + } + } + } + } + } + } + +}