Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/Rendezvous.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: Rendezvous.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jun1998 dl Create public version | |
12 30Jul1998 dl Minor code simplifications | |
13 */ | |
14 | |
15 package EDU.oswego.cs.dl.util.concurrent; | |
16 | |
17 /** | |
18 * A rendezvous is a barrier that: | |
19 * <ul> | |
20 * <li> Unlike a CyclicBarrier, is not restricted to use | |
21 * with fixed-sized groups of threads. | |
22 * Any number of threads can attempt to enter a rendezvous, | |
23 * but only the predetermined number of parties enter | |
24 * and later become released from the rendezvous at any given time. | |
25 * <li> Enables each participating thread to exchange information | |
26 * with others at the rendezvous point. Each entering thread | |
27 * presents some object on entry to the rendezvous, and | |
28 * returns some object on release. The object returned is | |
29 * the result of a RendezvousFunction that is run once per | |
30 * rendezvous, (it is run by the last-entering thread). By | |
31 * default, the function applied is a rotation, so each | |
32 * thread returns the object given by the next (modulo parties) | |
33 * entering thread. This default function facilitates simple | |
34 * application of a common use of rendezvous, as exchangers. | |
35 * </ul> | |
36 * <p> | |
37 * Rendezvous use an all-or-none breakage model | |
38 * for failed synchronization attempts: If threads | |
39 * leave a rendezvous point prematurely because of timeout | |
40 * or interruption, others will also leave abnormally | |
41 * (via BrokenBarrierException), until | |
42 * the rendezvous is <code>restart</code>ed. This is usually | |
43 * the simplest and best strategy for sharing knowledge | |
44 * about failures among cooperating threads in the most | |
45 * common usage contexts of Rendezvous. | |
46 * <p> | |
47 * While any positive number (including 1) of parties can | |
48 * be handled, the most common case is to have two parties. | |
49 * <p> | |
50 * <b>Sample Usage</b><p> | |
51 * Here are the highlights of a class that uses a Rendezvous to | |
52 * swap buffers between threads so that the thread filling the | |
53 * buffer gets a freshly | |
54 * emptied one when it needs it, handing off the filled one to | |
55 * the thread emptying the buffer. | |
56 * <pre> | |
57 * class FillAndEmpty { | |
58 * Rendezvous exchanger = new Rendezvous(2); | |
59 * Buffer initialEmptyBuffer = ... a made-up type | |
60 * Buffer initialFullBuffer = ... | |
61 * | |
62 * class FillingLoop implements Runnable { | |
63 * public void run() { | |
64 * Buffer currentBuffer = initialEmptyBuffer; | |
65 * try { | |
66 * while (currentBuffer != null) { | |
67 * addToBuffer(currentBuffer); | |
68 * if (currentBuffer.full()) | |
69 * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer)); | |
70 * } | |
71 * } | |
72 * catch (BrokenBarrierException ex) { | |
73 * return; | |
74 * } | |
75 * catch (InterruptedException ex) { | |
76 * Thread.currentThread().interrupt(); | |
77 * } | |
78 * } | |
79 * } | |
80 * | |
81 * class EmptyingLoop implements Runnable { | |
82 * public void run() { | |
83 * Buffer currentBuffer = initialFullBuffer; | |
84 * try { | |
85 * while (currentBuffer != null) { | |
86 * takeFromBuffer(currentBuffer); | |
87 * if (currentBuffer.empty()) | |
88 * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer)); | |
89 * } | |
90 * } | |
91 * catch (BrokenBarrierException ex) { | |
92 * return; | |
93 * } | |
94 * catch (InterruptedException ex) { | |
95 * Thread.currentThread().interrupt(); | |
96 * } | |
97 * } | |
98 * } | |
99 * | |
100 * void start() { | |
101 * new Thread(new FillingLoop()).start(); | |
102 * new Thread(new EmptyingLoop()).start(); | |
103 * } | |
104 * } | |
105 * </pre> | |
106 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
107 | |
108 **/ | |
109 | |
110 public class Rendezvous implements Barrier { | |
111 | |
112 /** | |
113 * Interface for functions run at rendezvous points | |
114 **/ | |
115 public interface RendezvousFunction { | |
116 /** | |
117 * Perform some function on the objects presented at | |
118 * a rendezvous. The objects array holds all presented | |
119 * items; one per thread. Its length is the number of parties. | |
120 * The array is ordered by arrival into the rendezvous. | |
121 * So, the last element (at objects[objects.length-1]) | |
122 * is guaranteed to have been presented by the thread performing | |
123 * this function. No identifying information is | |
124 * otherwise kept about which thread presented which item. | |
125 * If you need to | |
126 * trace origins, you will need to use an item type for rendezvous | |
127 * that includes identifying information. After return of this | |
128 * function, other threads are released, and each returns with | |
129 * the item with the same index as the one it presented. | |
130 **/ | |
131 public void rendezvousFunction(Object[] objects); | |
132 } | |
133 | |
134 /** | |
135 * The default rendezvous function. Rotates the array | |
136 * so that each thread returns an item presented by some | |
137 * other thread (or itself, if parties is 1). | |
138 **/ | |
139 public static class Rotator implements RendezvousFunction { | |
140 /** Rotate the array **/ | |
141 public void rendezvousFunction(Object[] objects) { | |
142 int lastIdx = objects.length - 1; | |
143 Object first = objects[0]; | |
144 for (int i = 0; i < lastIdx; ++i) objects[i] = objects[i+1]; | |
145 objects[lastIdx] = first; | |
146 } | |
147 } | |
148 | |
149 | |
150 protected final int parties_; | |
151 | |
152 | |
153 protected boolean broken_ = false; | |
154 | |
155 /** | |
156 * Number of threads that have entered rendezvous | |
157 **/ | |
158 protected int entries_ = 0; | |
159 | |
160 /** | |
161 * Number of threads that are permitted to depart rendezvous | |
162 **/ | |
163 protected long departures_ = 0; | |
164 | |
165 /** | |
166 * Incoming threads pile up on entry until last set done. | |
167 **/ | |
168 protected final Semaphore entryGate_; | |
169 | |
170 /** | |
171 * Temporary holder for items in exchange | |
172 **/ | |
173 protected final Object[] slots_; | |
174 | |
175 /** | |
176 * The function to run at rendezvous point | |
177 **/ | |
178 | |
179 protected RendezvousFunction rendezvousFunction_; | |
180 | |
181 /** | |
182 * Create a Barrier for the indicated number of parties, | |
183 * and the default Rotator function to run at each barrier point. | |
184 * @exception IllegalArgumentException if parties less than or equal to zero. | |
185 **/ | |
186 | |
187 public Rendezvous(int parties) { | |
188 this(parties, new Rotator()); | |
189 } | |
190 | |
191 /** | |
192 * Create a Barrier for the indicated number of parties, | |
193 * and the given function to run at each barrier point. | |
194 * @exception IllegalArgumentException if parties less than or equal to zero. | |
195 **/ | |
196 | |
197 public Rendezvous(int parties, RendezvousFunction function) { | |
198 if (parties <= 0) throw new IllegalArgumentException(); | |
199 parties_ = parties; | |
200 rendezvousFunction_ = function; | |
201 entryGate_ = new WaiterPreferenceSemaphore(parties); | |
202 slots_ = new Object[parties]; | |
203 } | |
204 | |
205 /** | |
206 * Set the function to call at the point at which all threads reach the | |
207 * rendezvous. This function is run exactly once, by the thread | |
208 * that trips the barrier. The function is not run if the barrier is | |
209 * broken. | |
210 * @param function the function to run. If null, no function is run. | |
211 * @return the previous function | |
212 **/ | |
213 | |
214 | |
215 public synchronized RendezvousFunction setRendezvousFunction(RendezvousFunction function) { | |
216 RendezvousFunction old = rendezvousFunction_; | |
217 rendezvousFunction_ = function; | |
218 return old; | |
219 } | |
220 | |
221 public int parties() { return parties_; } | |
222 | |
223 public synchronized boolean broken() { return broken_; } | |
224 | |
225 /** | |
226 * Reset to initial state. Clears both the broken status | |
227 * and any record of waiting threads, and releases all | |
228 * currently waiting threads with indeterminate return status. | |
229 * This method is intended only for use in recovery actions | |
230 * in which it is somehow known | |
231 * that no thread could possibly be relying on the | |
232 * synchronization properties of this barrier. | |
233 **/ | |
234 | |
235 public void restart() { | |
236 // This is not very good, but probably the best that can be done | |
237 for (;;) { | |
238 synchronized(this) { | |
239 if (entries_ != 0) { | |
240 notifyAll(); | |
241 } | |
242 else { | |
243 broken_ = false; | |
244 return; | |
245 } | |
246 } | |
247 Thread.yield(); | |
248 } | |
249 } | |
250 | |
251 | |
252 /** | |
253 * Enter a rendezvous; returning after all other parties arrive. | |
254 * @param x the item to present at rendezvous point. | |
255 * By default, this item is exchanged with another. | |
256 * @return an item x given by some thread, and/or processed | |
257 * by the rendezvousFunction. | |
258 * @exception BrokenBarrierException | |
259 * if any other thread | |
260 * in any previous or current barrier | |
261 * since either creation or the last <code>restart</code> | |
262 * operation left the barrier | |
263 * prematurely due to interruption or time-out. (If so, | |
264 * the <code>broken</code> status is also set.) | |
265 * Also returns as | |
266 * broken if the RendezvousFunction encountered a run-time exception. | |
267 * Threads that are noticed to have been | |
268 * interrupted <em>after</em> being released are not considered | |
269 * to have broken the barrier. | |
270 * In all cases, the interruption | |
271 * status of the current thread is preserved, so can be tested | |
272 * by checking <code>Thread.interrupted</code>. | |
273 * @exception InterruptedException if this thread was interrupted | |
274 * during the exchange. If so, <code>broken</code> status is also set. | |
275 **/ | |
276 | |
277 | |
278 public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException { | |
279 return doRendezvous(x, false, 0); | |
280 } | |
281 | |
282 /** | |
283 * Wait msecs to complete a rendezvous. | |
284 * @param x the item to present at rendezvous point. | |
285 * By default, this item is exchanged with another. | |
286 * @param msecs The maximum time to wait. | |
287 * @return an item x given by some thread, and/or processed | |
288 * by the rendezvousFunction. | |
289 * @exception BrokenBarrierException | |
290 * if any other thread | |
291 * in any previous or current barrier | |
292 * since either creation or the last <code>restart</code> | |
293 * operation left the barrier | |
294 * prematurely due to interruption or time-out. (If so, | |
295 * the <code>broken</code> status is also set.) | |
296 * Also returns as | |
297 * broken if the RendezvousFunction encountered a run-time exception. | |
298 * Threads that are noticed to have been | |
299 * interrupted <em>after</em> being released are not considered | |
300 * to have broken the barrier. | |
301 * In all cases, the interruption | |
302 * status of the current thread is preserved, so can be tested | |
303 * by checking <code>Thread.interrupted</code>. | |
304 * @exception InterruptedException if this thread was interrupted | |
305 * during the exchange. If so, <code>broken</code> status is also set. | |
306 * @exception TimeoutException if this thread timed out waiting for | |
307 * the exchange. If the timeout occurred while already in the | |
308 * exchange, <code>broken</code> status is also set. | |
309 **/ | |
310 | |
311 | |
312 public Object attemptRendezvous(Object x, long msecs) | |
313 throws InterruptedException, TimeoutException, BrokenBarrierException { | |
314 return doRendezvous(x, true, msecs); | |
315 } | |
316 | |
317 protected Object doRendezvous(Object x, boolean timed, long msecs) | |
318 throws InterruptedException, TimeoutException, BrokenBarrierException { | |
319 | |
320 // rely on semaphore to throw interrupt on entry | |
321 | |
322 long startTime; | |
323 | |
324 if (timed) { | |
325 startTime = System.currentTimeMillis(); | |
326 if (!entryGate_.attempt(msecs)) { | |
327 throw new TimeoutException(msecs); | |
328 } | |
329 } | |
330 else { | |
331 startTime = 0; | |
332 entryGate_.acquire(); | |
333 } | |
334 | |
335 synchronized(this) { | |
336 | |
337 Object y = null; | |
338 | |
339 int index = entries_++; | |
340 slots_[index] = x; | |
341 | |
342 try { | |
343 // last one in runs function and releases | |
344 if (entries_ == parties_) { | |
345 | |
346 departures_ = entries_; | |
347 notifyAll(); | |
348 | |
349 try { | |
350 if (!broken_ && rendezvousFunction_ != null) | |
351 rendezvousFunction_.rendezvousFunction(slots_); | |
352 } | |
353 catch (RuntimeException ex) { | |
354 broken_ = true; | |
355 } | |
356 | |
357 } | |
358 | |
359 else { | |
360 | |
361 while (!broken_ && departures_ < 1) { | |
362 long timeLeft = 0; | |
363 if (timed) { | |
364 timeLeft = msecs - (System.currentTimeMillis() - startTime); | |
365 if (timeLeft <= 0) { | |
366 broken_ = true; | |
367 departures_ = entries_; | |
368 notifyAll(); | |
369 throw new TimeoutException(msecs); | |
370 } | |
371 } | |
372 | |
373 try { | |
374 wait(timeLeft); | |
375 } | |
376 catch (InterruptedException ex) { | |
377 if (broken_ || departures_ > 0) { // interrupted after release | |
378 Thread.currentThread().interrupt(); | |
379 break; | |
380 } | |
381 else { | |
382 broken_ = true; | |
383 departures_ = entries_; | |
384 notifyAll(); | |
385 throw ex; | |
386 } | |
387 } | |
388 } | |
389 } | |
390 | |
391 } | |
392 | |
393 finally { | |
394 | |
395 y = slots_[index]; | |
396 | |
397 // Last one out cleans up and allows next set of threads in | |
398 if (--departures_ <= 0) { | |
399 for (int i = 0; i < slots_.length; ++i) slots_[i] = null; | |
400 entryGate_.release(entries_); | |
401 entries_ = 0; | |
402 } | |
403 } | |
404 | |
405 // continue if no IE/TO throw | |
406 if (broken_) | |
407 throw new BrokenBarrierException(index); | |
408 else | |
409 return y; | |
410 } | |
411 } | |
412 | |
413 } | |
414 | |
415 |