Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/WaitFreeQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: WaitFreeQueue.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 16Jun1998 dl Create public version | |
12 5Aug1998 dl replaced int counters with longs | |
13 17nov2001 dl Simplify given Bill Pugh's observation | |
14 that counted pointers are unnecessary. | |
15 */ | |
16 | |
17 package EDU.oswego.cs.dl.util.concurrent; | |
18 | |
19 /** | |
20 * A wait-free linked list based queue implementation. | |
21 * <p> | |
22 * | |
23 * While this class conforms to the full Channel interface, only the | |
24 * <code>put</code> and <code>poll</code> methods are useful in most | |
25 * applications. Because the queue does not support blocking | |
26 * operations, <code>take</code> relies on spin-loops, which can be | |
27 * extremely wasteful. <p> | |
28 * | |
29 * This class is adapted from the algorithm described in <a | |
30 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple, | |
31 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue | |
32 * Algorithms</a> by Maged M. Michael and Michael L. Scott. This | |
33 * implementation is not strictly wait-free since it relies on locking | |
34 * for basic atomicity and visibility requirements. Locks can impose | |
35 * unbounded waits, although this should not be a major practical | |
36 * concern here since each lock is held for the duration of only a few | |
37 * statements. (However, the overhead of using so many locks can make | |
38 * it less attractive than other Channel implementations on JVMs where | |
39 * locking operations are very slow.) <p> | |
40 * | |
41 * @see BoundedLinkedQueue | |
42 * @see LinkedQueue | |
43 * | |
44 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
45 | |
46 **/ | |
47 | |
48 public class WaitFreeQueue implements Channel { | |
49 | |
50 /* | |
51 This is a straightforward adaptation of Michael & Scott | |
52 algorithm, with CAS's simulated via per-field locks, | |
53 and without version numbers for pointers since, under | |
54 Java Garbage Collection, you can never see the "wrong" | |
55 node with the same address as the one you think you have. | |
56 */ | |
57 | |
58 /** List nodes for Queue **/ | |
59 protected final static class Node { | |
60 protected final Object value; | |
61 protected volatile Node next; | |
62 | |
63 /** Make a new node with indicated item, and null link **/ | |
64 protected Node(Object x) { value = x; } | |
65 | |
66 /** Simulate a CAS operation for 'next' field **/ | |
67 protected synchronized boolean CASNext(Node oldNext, Node newNext) { | |
68 if (next == oldNext) { | |
69 next = newNext; | |
70 return true; | |
71 } | |
72 else | |
73 return false; | |
74 } | |
75 } | |
76 | |
77 /** Head of list is always a dummy node **/ | |
78 protected volatile Node head = new Node(null); | |
79 /** Pointer to last node on list **/ | |
80 protected volatile Node tail = head; | |
81 | |
82 /** Lock for simulating CAS for tail field **/ | |
83 protected final Object tailLock = new Object(); | |
84 | |
85 /** Simulate CAS for head field, using 'this' lock **/ | |
86 protected synchronized boolean CASHead(Node oldHead, Node newHead) { | |
87 if (head == oldHead) { | |
88 head = newHead; | |
89 return true; | |
90 } | |
91 else | |
92 return false; | |
93 } | |
94 | |
95 /** Simulate CAS for tail field **/ | |
96 protected boolean CASTail(Node oldTail, Node newTail) { | |
97 synchronized(tailLock) { | |
98 if (tail == oldTail) { | |
99 tail = newTail; | |
100 return true; | |
101 } | |
102 else | |
103 return false; | |
104 } | |
105 } | |
106 | |
107 public void put(Object x) throws InterruptedException { | |
108 if (x == null) throw new IllegalArgumentException(); | |
109 if (Thread.interrupted()) throw new InterruptedException(); | |
110 Node n = new Node(x); | |
111 | |
112 for(;;) { | |
113 Node t = tail; | |
114 // Try to link new node to end of list. | |
115 if (t.CASNext(null, n)) { | |
116 // Must now change tail field. | |
117 // This CAS might fail, but if so, it will be fixed by others. | |
118 CASTail(t, n); | |
119 return; | |
120 } | |
121 | |
122 // If cannot link, help out a previous failed attempt to move tail | |
123 CASTail(t, t.next); | |
124 } | |
125 } | |
126 | |
127 public boolean offer(Object x, long msecs) throws InterruptedException { | |
128 put(x); | |
129 return true; | |
130 } | |
131 | |
132 /** Main dequeue algorithm, called by poll, take. **/ | |
133 protected Object extract() throws InterruptedException { | |
134 for (;;) { | |
135 Node h = head; | |
136 Node first = h.next; | |
137 | |
138 if (first == null) | |
139 return null; | |
140 | |
141 Object result = first.value; | |
142 if (CASHead(h, first)) | |
143 return result; | |
144 } | |
145 } | |
146 | |
147 public Object peek() { | |
148 Node first = head.next; | |
149 | |
150 if (first == null) | |
151 return null; | |
152 | |
153 // Note: This synch unnecessary after JSR-133. | |
154 // It exists only to guarantee visibility of returned object, | |
155 // No other synch is needed, but "old" memory model requires one. | |
156 synchronized(this) { | |
157 return first.value; | |
158 } | |
159 } | |
160 | |
161 /** | |
162 * Spin until poll returns a non-null value. | |
163 * You probably don't want to call this method. | |
164 * A Thread.sleep(0) is performed on each iteration | |
165 * as a heuristic to reduce contention. If you would | |
166 * rather use, for example, an exponential backoff, | |
167 * you could manually set this up using poll. | |
168 **/ | |
169 public Object take() throws InterruptedException { | |
170 if (Thread.interrupted()) throw new InterruptedException(); | |
171 for(;;) { | |
172 Object x = extract(); | |
173 if (x != null) | |
174 return x; | |
175 else | |
176 Thread.sleep(0); | |
177 } | |
178 } | |
179 | |
180 /** | |
181 * Spin until poll returns a non-null value or time elapses. | |
182 * if msecs is positive, a Thread.sleep(0) is performed on each iteration | |
183 * as a heuristic to reduce contention. | |
184 **/ | |
185 public Object poll(long msecs) throws InterruptedException { | |
186 if (Thread.interrupted()) throw new InterruptedException(); | |
187 if (msecs <= 0) | |
188 return extract(); | |
189 | |
190 long startTime = System.currentTimeMillis(); | |
191 for(;;) { | |
192 Object x = extract(); | |
193 if (x != null) | |
194 return x; | |
195 else if (System.currentTimeMillis() - startTime >= msecs) | |
196 return null; | |
197 else | |
198 Thread.sleep(0); | |
199 } | |
200 | |
201 } | |
202 } |