comparison src/EDU/oswego/cs/dl/util/concurrent/SynchronousChannel.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: SynchronousChannel.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 17Jul1998 dl Disabled direct semaphore permit check
13 31Jul1998 dl Replaced main algorithm with one with
14 better scaling and fairness properties.
15 25aug1998 dl added peek
16 24Nov2001 dl Replaced main algorithm with faster one.
17 */
18
19 package EDU.oswego.cs.dl.util.concurrent;
20
21 /**
22 * A rendezvous channel, similar to those used in CSP and Ada. Each
23 * put must wait for a take, and vice versa. Synchronous channels
24 * are well suited for handoff designs, in which an object running in
25 * one thread must synch up with an object running in another thread
26 * in order to hand it some information, event, or task.
27 * <p> If you only need threads to synch up without
28 * exchanging information, consider using a Barrier. If you need
29 * bidirectional exchanges, consider using a Rendezvous. <p>
30 *
31 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
32 * @see CyclicBarrier
33 * @see Rendezvous
34 **/
35
36 public class SynchronousChannel implements BoundedChannel {
37
38 /*
39 This implementation divides actions into two cases for puts:
40
41 * An arriving putter that does not already have a waiting taker
42 creates a node holding item, and then waits for a taker to take it.
43 * An arriving putter that does already have a waiting taker fills
44 the slot node created by the taker, and notifies it to continue.
45
46 And symmetrically, two for takes:
47
48 * An arriving taker that does not already have a waiting putter
49 creates an empty slot node, and then waits for a putter to fill it.
50 * An arriving taker that does already have a waiting putter takes
51 item from the node created by the putter, and notifies it to continue.
52
53 This requires keeping two simple queues: waitingPuts and waitingTakes.
54
55 When a put or take waiting for the actions of its counterpart
56 aborts due to interruption or timeout, it marks the node
57 it created as "CANCELLED", which causes its counterpart to retry
58 the entire put or take sequence.
59 */
60
61 /**
62 * Special marker used in queue nodes to indicate that
63 * the thread waiting for a change in the node has timed out
64 * or been interrupted.
65 **/
66 protected static final Object CANCELLED = new Object();
67
68 /**
69 * Simple FIFO queue class to hold waiting puts/takes.
70 **/
71 protected static class Queue {
72 protected LinkedNode head;
73 protected LinkedNode last;
74
75 protected void enq(LinkedNode p) {
76 if (last == null)
77 last = head = p;
78 else
79 last = last.next = p;
80 }
81
82 protected LinkedNode deq() {
83 LinkedNode p = head;
84 if (p != null && (head = p.next) == null)
85 last = null;
86 return p;
87 }
88 }
89
90 protected final Queue waitingPuts = new Queue();
91 protected final Queue waitingTakes = new Queue();
92
93 /**
94 * @return zero --
95 * Synchronous channels have no internal capacity.
96 **/
97 public int capacity() { return 0; }
98
99 /**
100 * @return null --
101 * Synchronous channels do not hold contents unless actively taken
102 **/
103 public Object peek() { return null; }
104
105
106 public void put(Object x) throws InterruptedException {
107 if (x == null) throw new IllegalArgumentException();
108
109 // This code is conceptually straightforward, but messy
110 // because we need to intertwine handling of put-arrives first
111 // vs take-arrives first cases.
112
113 // Outer loop is to handle retry due to cancelled waiting taker
114 for (;;) {
115
116 // Get out now if we are interrupted
117 if (Thread.interrupted()) throw new InterruptedException();
118
119 // Exactly one of item or slot will be nonnull at end of
120 // synchronized block, depending on whether a put or a take
121 // arrived first.
122 LinkedNode slot;
123 LinkedNode item = null;
124
125 synchronized(this) {
126 // Try to match up with a waiting taker; fill and signal it below
127 slot = waitingTakes.deq();
128
129 // If no takers yet, create a node and wait below
130 if (slot == null)
131 waitingPuts.enq(item = new LinkedNode(x));
132 }
133
134 if (slot != null) { // There is a waiting taker.
135 // Fill in the slot created by the taker and signal taker to
136 // continue.
137 synchronized(slot) {
138 if (slot.value != CANCELLED) {
139 slot.value = x;
140 slot.notify();
141 return;
142 }
143 // else the taker has cancelled, so retry outer loop
144 }
145 }
146
147 else {
148 // Wait for a taker to arrive and take the item.
149 synchronized(item) {
150 try {
151 while (item.value != null)
152 item.wait();
153 return;
154 }
155 catch (InterruptedException ie) {
156 // If item was taken, return normally but set interrupt status
157 if (item.value == null) {
158 Thread.currentThread().interrupt();
159 return;
160 }
161 else {
162 item.value = CANCELLED;
163 throw ie;
164 }
165 }
166 }
167 }
168 }
169 }
170
171 public Object take() throws InterruptedException {
172 // Entirely symmetric to put()
173
174 for (;;) {
175 if (Thread.interrupted()) throw new InterruptedException();
176
177 LinkedNode item;
178 LinkedNode slot = null;
179
180 synchronized(this) {
181 item = waitingPuts.deq();
182 if (item == null)
183 waitingTakes.enq(slot = new LinkedNode());
184 }
185
186 if (item != null) {
187 synchronized(item) {
188 Object x = item.value;
189 if (x != CANCELLED) {
190 item.value = null;
191 item.next = null;
192 item.notify();
193 return x;
194 }
195 }
196 }
197
198 else {
199 synchronized(slot) {
200 try {
201 for (;;) {
202 Object x = slot.value;
203 if (x != null) {
204 slot.value = null;
205 slot.next = null;
206 return x;
207 }
208 else
209 slot.wait();
210 }
211 }
212 catch(InterruptedException ie) {
213 Object x = slot.value;
214 if (x != null) {
215 slot.value = null;
216 slot.next = null;
217 Thread.currentThread().interrupt();
218 return x;
219 }
220 else {
221 slot.value = CANCELLED;
222 throw ie;
223 }
224 }
225 }
226 }
227 }
228 }
229
230 /*
231 Offer and poll are just like put and take, except even messier.
232 */
233
234
235 public boolean offer(Object x, long msecs) throws InterruptedException {
236 if (x == null) throw new IllegalArgumentException();
237 long waitTime = msecs;
238 long startTime = 0; // lazily initialize below if needed
239
240 for (;;) {
241 if (Thread.interrupted()) throw new InterruptedException();
242
243 LinkedNode slot;
244 LinkedNode item = null;
245
246 synchronized(this) {
247 slot = waitingTakes.deq();
248 if (slot == null) {
249 if (waitTime <= 0)
250 return false;
251 else
252 waitingPuts.enq(item = new LinkedNode(x));
253 }
254 }
255
256 if (slot != null) {
257 synchronized(slot) {
258 if (slot.value != CANCELLED) {
259 slot.value = x;
260 slot.notify();
261 return true;
262 }
263 }
264 }
265
266 long now = System.currentTimeMillis();
267 if (startTime == 0)
268 startTime = now;
269 else
270 waitTime = msecs - (now - startTime);
271
272 if (item != null) {
273 synchronized(item) {
274 try {
275 for (;;) {
276 if (item.value == null)
277 return true;
278 if (waitTime <= 0) {
279 item.value = CANCELLED;
280 return false;
281 }
282 item.wait(waitTime);
283 waitTime = msecs - (System.currentTimeMillis() - startTime);
284 }
285 }
286 catch (InterruptedException ie) {
287 if (item.value == null) {
288 Thread.currentThread().interrupt();
289 return true;
290 }
291 else {
292 item.value = CANCELLED;
293 throw ie;
294 }
295 }
296 }
297 }
298 }
299 }
300
301 public Object poll(long msecs) throws InterruptedException {
302 long waitTime = msecs;
303 long startTime = 0;
304
305 for (;;) {
306 if (Thread.interrupted()) throw new InterruptedException();
307
308 LinkedNode item;
309 LinkedNode slot = null;
310
311 synchronized(this) {
312 item = waitingPuts.deq();
313 if (item == null) {
314 if (waitTime <= 0)
315 return null;
316 else
317 waitingTakes.enq(slot = new LinkedNode());
318 }
319 }
320
321 if (item != null) {
322 synchronized(item) {
323 Object x = item.value;
324 if (x != CANCELLED) {
325 item.value = null;
326 item.next = null;
327 item.notify();
328 return x;
329 }
330 }
331 }
332
333 long now = System.currentTimeMillis();
334 if (startTime == 0)
335 startTime = now;
336 else
337 waitTime = msecs - (now - startTime);
338
339 if (slot != null) {
340 synchronized(slot) {
341 try {
342 for (;;) {
343 Object x = slot.value;
344 if (x != null) {
345 slot.value = null;
346 slot.next = null;
347 return x;
348 }
349 if (waitTime <= 0) {
350 slot.value = CANCELLED;
351 return null;
352 }
353 slot.wait(waitTime);
354 waitTime = msecs - (System.currentTimeMillis() - startTime);
355 }
356 }
357 catch(InterruptedException ie) {
358 Object x = slot.value;
359 if (x != null) {
360 slot.value = null;
361 slot.next = null;
362 Thread.currentThread().interrupt();
363 return x;
364 }
365 else {
366 slot.value = CANCELLED;
367 throw ie;
368 }
369 }
370 }
371 }
372 }
373 }
374
375 }