Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/WaitFreeQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/WaitFreeQueue.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,202 @@ +/* + File: WaitFreeQueue.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 16Jun1998 dl Create public version + 5Aug1998 dl replaced int counters with longs + 17nov2001 dl Simplify given Bill Pugh's observation + that counted pointers are unnecessary. +*/ + +package EDU.oswego.cs.dl.util.concurrent; + +/** + * A wait-free linked list based queue implementation. + * <p> + * + * While this class conforms to the full Channel interface, only the + * <code>put</code> and <code>poll</code> methods are useful in most + * applications. Because the queue does not support blocking + * operations, <code>take</code> relies on spin-loops, which can be + * extremely wasteful. <p> + * + * This class is adapted from the algorithm described in <a + * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple, + * Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * Algorithms</a> by Maged M. Michael and Michael L. Scott. This + * implementation is not strictly wait-free since it relies on locking + * for basic atomicity and visibility requirements. Locks can impose + * unbounded waits, although this should not be a major practical + * concern here since each lock is held for the duration of only a few + * statements. (However, the overhead of using so many locks can make + * it less attractive than other Channel implementations on JVMs where + * locking operations are very slow.) <p> + * + * @see BoundedLinkedQueue + * @see LinkedQueue + * + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + + **/ + +public class WaitFreeQueue implements Channel { + + /* + This is a straightforward adaptation of Michael & Scott + algorithm, with CAS's simulated via per-field locks, + and without version numbers for pointers since, under + Java Garbage Collection, you can never see the "wrong" + node with the same address as the one you think you have. + */ + + /** List nodes for Queue **/ + protected final static class Node { + protected final Object value; + protected volatile Node next; + + /** Make a new node with indicated item, and null link **/ + protected Node(Object x) { value = x; } + + /** Simulate a CAS operation for 'next' field **/ + protected synchronized boolean CASNext(Node oldNext, Node newNext) { + if (next == oldNext) { + next = newNext; + return true; + } + else + return false; + } + } + + /** Head of list is always a dummy node **/ + protected volatile Node head = new Node(null); + /** Pointer to last node on list **/ + protected volatile Node tail = head; + + /** Lock for simulating CAS for tail field **/ + protected final Object tailLock = new Object(); + + /** Simulate CAS for head field, using 'this' lock **/ + protected synchronized boolean CASHead(Node oldHead, Node newHead) { + if (head == oldHead) { + head = newHead; + return true; + } + else + return false; + } + + /** Simulate CAS for tail field **/ + protected boolean CASTail(Node oldTail, Node newTail) { + synchronized(tailLock) { + if (tail == oldTail) { + tail = newTail; + return true; + } + else + return false; + } + } + + public void put(Object x) throws InterruptedException { + if (x == null) throw new IllegalArgumentException(); + if (Thread.interrupted()) throw new InterruptedException(); + Node n = new Node(x); + + for(;;) { + Node t = tail; + // Try to link new node to end of list. + if (t.CASNext(null, n)) { + // Must now change tail field. + // This CAS might fail, but if so, it will be fixed by others. + CASTail(t, n); + return; + } + + // If cannot link, help out a previous failed attempt to move tail + CASTail(t, t.next); + } + } + + public boolean offer(Object x, long msecs) throws InterruptedException { + put(x); + return true; + } + + /** Main dequeue algorithm, called by poll, take. **/ + protected Object extract() throws InterruptedException { + for (;;) { + Node h = head; + Node first = h.next; + + if (first == null) + return null; + + Object result = first.value; + if (CASHead(h, first)) + return result; + } + } + + public Object peek() { + Node first = head.next; + + if (first == null) + return null; + + // Note: This synch unnecessary after JSR-133. + // It exists only to guarantee visibility of returned object, + // No other synch is needed, but "old" memory model requires one. + synchronized(this) { + return first.value; + } + } + + /** + * Spin until poll returns a non-null value. + * You probably don't want to call this method. + * A Thread.sleep(0) is performed on each iteration + * as a heuristic to reduce contention. If you would + * rather use, for example, an exponential backoff, + * you could manually set this up using poll. + **/ + public Object take() throws InterruptedException { + if (Thread.interrupted()) throw new InterruptedException(); + for(;;) { + Object x = extract(); + if (x != null) + return x; + else + Thread.sleep(0); + } + } + + /** + * Spin until poll returns a non-null value or time elapses. + * if msecs is positive, a Thread.sleep(0) is performed on each iteration + * as a heuristic to reduce contention. + **/ + public Object poll(long msecs) throws InterruptedException { + if (Thread.interrupted()) throw new InterruptedException(); + if (msecs <= 0) + return extract(); + + long startTime = System.currentTimeMillis(); + for(;;) { + Object x = extract(); + if (x != null) + return x; + else if (System.currentTimeMillis() - startTime >= msecs) + return null; + else + Thread.sleep(0); + } + + } +}