Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/LinkedQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/LinkedQueue.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,185 @@ +/* + File: LinkedQueue.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 11Jun1998 dl Create public version + 25aug1998 dl added peek + 10dec1998 dl added isEmpty + 10oct1999 dl lock on node object to ensure visibility +*/ + +package EDU.oswego.cs.dl.util.concurrent; + +/** + * A linked list based channel implementation. + * The algorithm avoids contention between puts + * and takes when the queue is not empty. + * Normally a put and a take can proceed simultaneously. + * (Although it does not allow multiple concurrent puts or takes.) + * This class tends to perform more efficently than + * other Channel implementations in producer/consumer + * applications. + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + **/ + +public class LinkedQueue implements Channel { + + + /** + * Dummy header node of list. The first actual node, if it exists, is always + * at head_.next. After each take, the old first node becomes the head. + **/ + protected LinkedNode head_; + + /** + * Helper monitor for managing access to last node. + **/ + protected final Object putLock_ = new Object(); + + /** + * The last node of list. Put() appends to list, so modifies last_ + **/ + protected LinkedNode last_; + + /** + * The number of threads waiting for a take. + * Notifications are provided in put only if greater than zero. + * The bookkeeping is worth it here since in reasonably balanced + * usages, the notifications will hardly ever be necessary, so + * the call overhead to notify can be eliminated. + **/ + protected int waitingForTake_ = 0; + + public LinkedQueue() { + head_ = new LinkedNode(null); + last_ = head_; + } + + /** Main mechanics for put/offer **/ + protected void insert(Object x) { + synchronized(putLock_) { + LinkedNode p = new LinkedNode(x); + synchronized(last_) { + last_.next = p; + last_ = p; + } + if (waitingForTake_ > 0) + putLock_.notify(); + } + } + + /** Main mechanics for take/poll **/ + protected synchronized Object extract() { + synchronized(head_) { + Object x = null; + LinkedNode first = head_.next; + if (first != null) { + x = first.value; + first.value = null; + head_ = first; + } + return x; + } + } + + + public void put(Object x) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + if (Thread.interrupted()) throw new InterruptedException(); + insert(x); + } + + public boolean offer(Object x, long msecs) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + if (Thread.interrupted()) throw new InterruptedException(); + insert(x); + return true; + } + + public Object take() throws InterruptedException { + if (Thread.interrupted()) throw new InterruptedException(); + // try to extract. If fail, then enter wait-based retry loop + Object x = extract(); + if (x != null) + return x; + else { + synchronized(putLock_) { + try { + ++waitingForTake_; + for (;;) { + x = extract(); + if (x != null) { + --waitingForTake_; + return x; + } + else { + putLock_.wait(); + } + } + } + catch(InterruptedException ex) { + --waitingForTake_; + putLock_.notify(); + throw ex; + } + } + } + } + + public Object peek() { + synchronized(head_) { + LinkedNode first = head_.next; + if (first != null) + return first.value; + else + return null; + } + } + + + public boolean isEmpty() { + synchronized(head_) { + return head_.next == null; + } + } + + public Object poll(long msecs) throws InterruptedException { + if (Thread.interrupted()) throw new InterruptedException(); + Object x = extract(); + if (x != null) + return x; + else { + synchronized(putLock_) { + try { + long waitTime = msecs; + long start = (msecs <= 0)? 0 : System.currentTimeMillis(); + ++waitingForTake_; + for (;;) { + x = extract(); + if (x != null || waitTime <= 0) { + --waitingForTake_; + return x; + } + else { + putLock_.wait(waitTime); + waitTime = msecs - (System.currentTimeMillis() - start); + } + } + } + catch(InterruptedException ex) { + --waitingForTake_; + putLock_.notify(); + throw ex; + } + } + } + } +} + +