Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/SynchronousChannel.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: SynchronousChannel.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jun1998 dl Create public version | |
12 17Jul1998 dl Disabled direct semaphore permit check | |
13 31Jul1998 dl Replaced main algorithm with one with | |
14 better scaling and fairness properties. | |
15 25aug1998 dl added peek | |
16 24Nov2001 dl Replaced main algorithm with faster one. | |
17 */ | |
18 | |
19 package EDU.oswego.cs.dl.util.concurrent; | |
20 | |
21 /** | |
22 * A rendezvous channel, similar to those used in CSP and Ada. Each | |
23 * put must wait for a take, and vice versa. Synchronous channels | |
24 * are well suited for handoff designs, in which an object running in | |
25 * one thread must synch up with an object running in another thread | |
26 * in order to hand it some information, event, or task. | |
27 * <p> If you only need threads to synch up without | |
28 * exchanging information, consider using a Barrier. If you need | |
29 * bidirectional exchanges, consider using a Rendezvous. <p> | |
30 * | |
31 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
32 * @see CyclicBarrier | |
33 * @see Rendezvous | |
34 **/ | |
35 | |
36 public class SynchronousChannel implements BoundedChannel { | |
37 | |
38 /* | |
39 This implementation divides actions into two cases for puts: | |
40 | |
41 * An arriving putter that does not already have a waiting taker | |
42 creates a node holding item, and then waits for a taker to take it. | |
43 * An arriving putter that does already have a waiting taker fills | |
44 the slot node created by the taker, and notifies it to continue. | |
45 | |
46 And symmetrically, two for takes: | |
47 | |
48 * An arriving taker that does not already have a waiting putter | |
49 creates an empty slot node, and then waits for a putter to fill it. | |
50 * An arriving taker that does already have a waiting putter takes | |
51 item from the node created by the putter, and notifies it to continue. | |
52 | |
53 This requires keeping two simple queues: waitingPuts and waitingTakes. | |
54 | |
55 When a put or take waiting for the actions of its counterpart | |
56 aborts due to interruption or timeout, it marks the node | |
57 it created as "CANCELLED", which causes its counterpart to retry | |
58 the entire put or take sequence. | |
59 */ | |
60 | |
61 /** | |
62 * Special marker used in queue nodes to indicate that | |
63 * the thread waiting for a change in the node has timed out | |
64 * or been interrupted. | |
65 **/ | |
66 protected static final Object CANCELLED = new Object(); | |
67 | |
68 /** | |
69 * Simple FIFO queue class to hold waiting puts/takes. | |
70 **/ | |
71 protected static class Queue { | |
72 protected LinkedNode head; | |
73 protected LinkedNode last; | |
74 | |
75 protected void enq(LinkedNode p) { | |
76 if (last == null) | |
77 last = head = p; | |
78 else | |
79 last = last.next = p; | |
80 } | |
81 | |
82 protected LinkedNode deq() { | |
83 LinkedNode p = head; | |
84 if (p != null && (head = p.next) == null) | |
85 last = null; | |
86 return p; | |
87 } | |
88 } | |
89 | |
90 protected final Queue waitingPuts = new Queue(); | |
91 protected final Queue waitingTakes = new Queue(); | |
92 | |
93 /** | |
94 * @return zero -- | |
95 * Synchronous channels have no internal capacity. | |
96 **/ | |
97 public int capacity() { return 0; } | |
98 | |
99 /** | |
100 * @return null -- | |
101 * Synchronous channels do not hold contents unless actively taken | |
102 **/ | |
103 public Object peek() { return null; } | |
104 | |
105 | |
106 public void put(Object x) throws InterruptedException { | |
107 if (x == null) throw new IllegalArgumentException(); | |
108 | |
109 // This code is conceptually straightforward, but messy | |
110 // because we need to intertwine handling of put-arrives first | |
111 // vs take-arrives first cases. | |
112 | |
113 // Outer loop is to handle retry due to cancelled waiting taker | |
114 for (;;) { | |
115 | |
116 // Get out now if we are interrupted | |
117 if (Thread.interrupted()) throw new InterruptedException(); | |
118 | |
119 // Exactly one of item or slot will be nonnull at end of | |
120 // synchronized block, depending on whether a put or a take | |
121 // arrived first. | |
122 LinkedNode slot; | |
123 LinkedNode item = null; | |
124 | |
125 synchronized(this) { | |
126 // Try to match up with a waiting taker; fill and signal it below | |
127 slot = waitingTakes.deq(); | |
128 | |
129 // If no takers yet, create a node and wait below | |
130 if (slot == null) | |
131 waitingPuts.enq(item = new LinkedNode(x)); | |
132 } | |
133 | |
134 if (slot != null) { // There is a waiting taker. | |
135 // Fill in the slot created by the taker and signal taker to | |
136 // continue. | |
137 synchronized(slot) { | |
138 if (slot.value != CANCELLED) { | |
139 slot.value = x; | |
140 slot.notify(); | |
141 return; | |
142 } | |
143 // else the taker has cancelled, so retry outer loop | |
144 } | |
145 } | |
146 | |
147 else { | |
148 // Wait for a taker to arrive and take the item. | |
149 synchronized(item) { | |
150 try { | |
151 while (item.value != null) | |
152 item.wait(); | |
153 return; | |
154 } | |
155 catch (InterruptedException ie) { | |
156 // If item was taken, return normally but set interrupt status | |
157 if (item.value == null) { | |
158 Thread.currentThread().interrupt(); | |
159 return; | |
160 } | |
161 else { | |
162 item.value = CANCELLED; | |
163 throw ie; | |
164 } | |
165 } | |
166 } | |
167 } | |
168 } | |
169 } | |
170 | |
171 public Object take() throws InterruptedException { | |
172 // Entirely symmetric to put() | |
173 | |
174 for (;;) { | |
175 if (Thread.interrupted()) throw new InterruptedException(); | |
176 | |
177 LinkedNode item; | |
178 LinkedNode slot = null; | |
179 | |
180 synchronized(this) { | |
181 item = waitingPuts.deq(); | |
182 if (item == null) | |
183 waitingTakes.enq(slot = new LinkedNode()); | |
184 } | |
185 | |
186 if (item != null) { | |
187 synchronized(item) { | |
188 Object x = item.value; | |
189 if (x != CANCELLED) { | |
190 item.value = null; | |
191 item.next = null; | |
192 item.notify(); | |
193 return x; | |
194 } | |
195 } | |
196 } | |
197 | |
198 else { | |
199 synchronized(slot) { | |
200 try { | |
201 for (;;) { | |
202 Object x = slot.value; | |
203 if (x != null) { | |
204 slot.value = null; | |
205 slot.next = null; | |
206 return x; | |
207 } | |
208 else | |
209 slot.wait(); | |
210 } | |
211 } | |
212 catch(InterruptedException ie) { | |
213 Object x = slot.value; | |
214 if (x != null) { | |
215 slot.value = null; | |
216 slot.next = null; | |
217 Thread.currentThread().interrupt(); | |
218 return x; | |
219 } | |
220 else { | |
221 slot.value = CANCELLED; | |
222 throw ie; | |
223 } | |
224 } | |
225 } | |
226 } | |
227 } | |
228 } | |
229 | |
230 /* | |
231 Offer and poll are just like put and take, except even messier. | |
232 */ | |
233 | |
234 | |
235 public boolean offer(Object x, long msecs) throws InterruptedException { | |
236 if (x == null) throw new IllegalArgumentException(); | |
237 long waitTime = msecs; | |
238 long startTime = 0; // lazily initialize below if needed | |
239 | |
240 for (;;) { | |
241 if (Thread.interrupted()) throw new InterruptedException(); | |
242 | |
243 LinkedNode slot; | |
244 LinkedNode item = null; | |
245 | |
246 synchronized(this) { | |
247 slot = waitingTakes.deq(); | |
248 if (slot == null) { | |
249 if (waitTime <= 0) | |
250 return false; | |
251 else | |
252 waitingPuts.enq(item = new LinkedNode(x)); | |
253 } | |
254 } | |
255 | |
256 if (slot != null) { | |
257 synchronized(slot) { | |
258 if (slot.value != CANCELLED) { | |
259 slot.value = x; | |
260 slot.notify(); | |
261 return true; | |
262 } | |
263 } | |
264 } | |
265 | |
266 long now = System.currentTimeMillis(); | |
267 if (startTime == 0) | |
268 startTime = now; | |
269 else | |
270 waitTime = msecs - (now - startTime); | |
271 | |
272 if (item != null) { | |
273 synchronized(item) { | |
274 try { | |
275 for (;;) { | |
276 if (item.value == null) | |
277 return true; | |
278 if (waitTime <= 0) { | |
279 item.value = CANCELLED; | |
280 return false; | |
281 } | |
282 item.wait(waitTime); | |
283 waitTime = msecs - (System.currentTimeMillis() - startTime); | |
284 } | |
285 } | |
286 catch (InterruptedException ie) { | |
287 if (item.value == null) { | |
288 Thread.currentThread().interrupt(); | |
289 return true; | |
290 } | |
291 else { | |
292 item.value = CANCELLED; | |
293 throw ie; | |
294 } | |
295 } | |
296 } | |
297 } | |
298 } | |
299 } | |
300 | |
301 public Object poll(long msecs) throws InterruptedException { | |
302 long waitTime = msecs; | |
303 long startTime = 0; | |
304 | |
305 for (;;) { | |
306 if (Thread.interrupted()) throw new InterruptedException(); | |
307 | |
308 LinkedNode item; | |
309 LinkedNode slot = null; | |
310 | |
311 synchronized(this) { | |
312 item = waitingPuts.deq(); | |
313 if (item == null) { | |
314 if (waitTime <= 0) | |
315 return null; | |
316 else | |
317 waitingTakes.enq(slot = new LinkedNode()); | |
318 } | |
319 } | |
320 | |
321 if (item != null) { | |
322 synchronized(item) { | |
323 Object x = item.value; | |
324 if (x != CANCELLED) { | |
325 item.value = null; | |
326 item.next = null; | |
327 item.notify(); | |
328 return x; | |
329 } | |
330 } | |
331 } | |
332 | |
333 long now = System.currentTimeMillis(); | |
334 if (startTime == 0) | |
335 startTime = now; | |
336 else | |
337 waitTime = msecs - (now - startTime); | |
338 | |
339 if (slot != null) { | |
340 synchronized(slot) { | |
341 try { | |
342 for (;;) { | |
343 Object x = slot.value; | |
344 if (x != null) { | |
345 slot.value = null; | |
346 slot.next = null; | |
347 return x; | |
348 } | |
349 if (waitTime <= 0) { | |
350 slot.value = CANCELLED; | |
351 return null; | |
352 } | |
353 slot.wait(waitTime); | |
354 waitTime = msecs - (System.currentTimeMillis() - startTime); | |
355 } | |
356 } | |
357 catch(InterruptedException ie) { | |
358 Object x = slot.value; | |
359 if (x != null) { | |
360 slot.value = null; | |
361 slot.next = null; | |
362 Thread.currentThread().interrupt(); | |
363 return x; | |
364 } | |
365 else { | |
366 slot.value = CANCELLED; | |
367 throw ie; | |
368 } | |
369 } | |
370 } | |
371 } | |
372 } | |
373 } | |
374 | |
375 } |