comparison src/EDU/oswego/cs/dl/util/concurrent/WaitFreeQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: WaitFreeQueue.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 16Jun1998 dl Create public version
12 5Aug1998 dl replaced int counters with longs
13 17nov2001 dl Simplify given Bill Pugh's observation
14 that counted pointers are unnecessary.
15 */
16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20 * A wait-free linked list based queue implementation.
21 * <p>
22 *
23 * While this class conforms to the full Channel interface, only the
24 * <code>put</code> and <code>poll</code> methods are useful in most
25 * applications. Because the queue does not support blocking
26 * operations, <code>take</code> relies on spin-loops, which can be
27 * extremely wasteful. <p>
28 *
29 * This class is adapted from the algorithm described in <a
30 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
31 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
32 * Algorithms</a> by Maged M. Michael and Michael L. Scott. This
33 * implementation is not strictly wait-free since it relies on locking
34 * for basic atomicity and visibility requirements. Locks can impose
35 * unbounded waits, although this should not be a major practical
36 * concern here since each lock is held for the duration of only a few
37 * statements. (However, the overhead of using so many locks can make
38 * it less attractive than other Channel implementations on JVMs where
39 * locking operations are very slow.) <p>
40 *
41 * @see BoundedLinkedQueue
42 * @see LinkedQueue
43 *
44 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
45
46 **/
47
48 public class WaitFreeQueue implements Channel {
49
50 /*
51 This is a straightforward adaptation of Michael & Scott
52 algorithm, with CAS's simulated via per-field locks,
53 and without version numbers for pointers since, under
54 Java Garbage Collection, you can never see the "wrong"
55 node with the same address as the one you think you have.
56 */
57
58 /** List nodes for Queue **/
59 protected final static class Node {
60 protected final Object value;
61 protected volatile Node next;
62
63 /** Make a new node with indicated item, and null link **/
64 protected Node(Object x) { value = x; }
65
66 /** Simulate a CAS operation for 'next' field **/
67 protected synchronized boolean CASNext(Node oldNext, Node newNext) {
68 if (next == oldNext) {
69 next = newNext;
70 return true;
71 }
72 else
73 return false;
74 }
75 }
76
77 /** Head of list is always a dummy node **/
78 protected volatile Node head = new Node(null);
79 /** Pointer to last node on list **/
80 protected volatile Node tail = head;
81
82 /** Lock for simulating CAS for tail field **/
83 protected final Object tailLock = new Object();
84
85 /** Simulate CAS for head field, using 'this' lock **/
86 protected synchronized boolean CASHead(Node oldHead, Node newHead) {
87 if (head == oldHead) {
88 head = newHead;
89 return true;
90 }
91 else
92 return false;
93 }
94
95 /** Simulate CAS for tail field **/
96 protected boolean CASTail(Node oldTail, Node newTail) {
97 synchronized(tailLock) {
98 if (tail == oldTail) {
99 tail = newTail;
100 return true;
101 }
102 else
103 return false;
104 }
105 }
106
107 public void put(Object x) throws InterruptedException {
108 if (x == null) throw new IllegalArgumentException();
109 if (Thread.interrupted()) throw new InterruptedException();
110 Node n = new Node(x);
111
112 for(;;) {
113 Node t = tail;
114 // Try to link new node to end of list.
115 if (t.CASNext(null, n)) {
116 // Must now change tail field.
117 // This CAS might fail, but if so, it will be fixed by others.
118 CASTail(t, n);
119 return;
120 }
121
122 // If cannot link, help out a previous failed attempt to move tail
123 CASTail(t, t.next);
124 }
125 }
126
127 public boolean offer(Object x, long msecs) throws InterruptedException {
128 put(x);
129 return true;
130 }
131
132 /** Main dequeue algorithm, called by poll, take. **/
133 protected Object extract() throws InterruptedException {
134 for (;;) {
135 Node h = head;
136 Node first = h.next;
137
138 if (first == null)
139 return null;
140
141 Object result = first.value;
142 if (CASHead(h, first))
143 return result;
144 }
145 }
146
147 public Object peek() {
148 Node first = head.next;
149
150 if (first == null)
151 return null;
152
153 // Note: This synch unnecessary after JSR-133.
154 // It exists only to guarantee visibility of returned object,
155 // No other synch is needed, but "old" memory model requires one.
156 synchronized(this) {
157 return first.value;
158 }
159 }
160
161 /**
162 * Spin until poll returns a non-null value.
163 * You probably don't want to call this method.
164 * A Thread.sleep(0) is performed on each iteration
165 * as a heuristic to reduce contention. If you would
166 * rather use, for example, an exponential backoff,
167 * you could manually set this up using poll.
168 **/
169 public Object take() throws InterruptedException {
170 if (Thread.interrupted()) throw new InterruptedException();
171 for(;;) {
172 Object x = extract();
173 if (x != null)
174 return x;
175 else
176 Thread.sleep(0);
177 }
178 }
179
180 /**
181 * Spin until poll returns a non-null value or time elapses.
182 * if msecs is positive, a Thread.sleep(0) is performed on each iteration
183 * as a heuristic to reduce contention.
184 **/
185 public Object poll(long msecs) throws InterruptedException {
186 if (Thread.interrupted()) throw new InterruptedException();
187 if (msecs <= 0)
188 return extract();
189
190 long startTime = System.currentTimeMillis();
191 for(;;) {
192 Object x = extract();
193 if (x != null)
194 return x;
195 else if (System.currentTimeMillis() - startTime >= msecs)
196 return null;
197 else
198 Thread.sleep(0);
199 }
200
201 }
202 }