comparison src/EDU/oswego/cs/dl/util/concurrent/Rendezvous.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: Rendezvous.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 30Jul1998 dl Minor code simplifications
13 */
14
15 package EDU.oswego.cs.dl.util.concurrent;
16
17 /**
18 * A rendezvous is a barrier that:
19 * <ul>
20 * <li> Unlike a CyclicBarrier, is not restricted to use
21 * with fixed-sized groups of threads.
22 * Any number of threads can attempt to enter a rendezvous,
23 * but only the predetermined number of parties enter
24 * and later become released from the rendezvous at any give time.
25 * <li> Enables each participating thread to exchange information
26 * with others at the rendezvous point. Each entering thread
27 * presents some object on entry to the rendezvous, and
28 * returns some object on release. The object returned is
29 * the result of a RendezvousFunction that is run once per
30 * rendezvous, (it is run by the last-entering thread). By
31 * default, the function applied is a rotation, so each
32 * thread returns the object given by the next (modulo parties)
33 * entering thread. This default function faciliates simple
34 * application of a common use of rendezvous, as exchangers.
35 * </ul>
36 * <p>
37 * Rendezvous use an all-or-none breakage model
38 * for failed synchronization attempts: If threads
39 * leave a rendezvous point prematurely because of timeout
40 * or interruption, others will also leave abnormally
41 * (via BrokenBarrierException), until
42 * the rendezvous is <code>restart</code>ed. This is usually
43 * the simplest and best strategy for sharing knowledge
44 * about failures among cooperating threads in the most
45 * common usages contexts of Rendezvous.
46 * <p>
47 * While any positive number (including 1) of parties can
48 * be handled, the most common case is to have two parties.
49 * <p>
50 * <b>Sample Usage</b><p>
51 * Here are the highlights of a class that uses a Rendezvous to
52 * swap buffers between threads so that the thread filling the
53 * buffer gets a freshly
54 * emptied one when it needs it, handing off the filled one to
55 * the thread emptying the buffer.
56 * <pre>
57 * class FillAndEmpty {
58 * Rendezvous exchanger = new Rendezvous(2);
59 * Buffer initialEmptyBuffer = ... a made-up type
60 * Buffer initialFullBuffer = ...
61 *
62 * class FillingLoop implements Runnable {
63 * public void run() {
64 * Buffer currentBuffer = initialEmptyBuffer;
65 * try {
66 * while (currentBuffer != null) {
67 * addToBuffer(currentBuffer);
68 * if (currentBuffer.full())
69 * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
70 * }
71 * }
72 * catch (BrokenBarrierException ex) {
73 * return;
74 * }
75 * catch (InterruptedException ex) {
76 * Thread.currentThread().interrupt();
77 * }
78 * }
79 * }
80 *
81 * class EmptyingLoop implements Runnable {
82 * public void run() {
83 * Buffer currentBuffer = initialFullBuffer;
84 * try {
85 * while (currentBuffer != null) {
86 * takeFromBuffer(currentBuffer);
87 * if (currentBuffer.empty())
88 * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
89 * }
90 * }
91 * catch (BrokenBarrierException ex) {
92 * return;
93 * }
94 * catch (InterruptedException ex) {
95 * Thread.currentThread().interrupt();
96 * }
97 * }
98 * }
99 *
100 * void start() {
101 * new Thread(new FillingLoop()).start();
102 * new Thread(new EmptyingLoop()).start();
103 * }
104 * }
105 * </pre>
106 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
107
108 **/
109
110 public class Rendezvous implements Barrier {
111
112 /**
113 * Interface for functions run at rendezvous points
114 **/
115 public interface RendezvousFunction {
116 /**
117 * Perform some function on the objects presented at
118 * a rendezvous. The objects array holds all presented
119 * items; one per thread. Its length is the number of parties.
120 * The array is ordered by arrival into the rendezvous.
121 * So, the last element (at objects[objects.length-1])
122 * is guaranteed to have been presented by the thread performing
123 * this function. No identifying information is
124 * otherwise kept about which thread presented which item.
125 * If you need to
126 * trace origins, you will need to use an item type for rendezvous
127 * that includes identifying information. After return of this
128 * function, other threads are released, and each returns with
129 * the item with the same index as the one it presented.
130 **/
131 public void rendezvousFunction(Object[] objects);
132 }
133
134 /**
135 * The default rendezvous function. Rotates the array
136 * so that each thread returns an item presented by some
137 * other thread (or itself, if parties is 1).
138 **/
139 public static class Rotator implements RendezvousFunction {
140 /** Rotate the array **/
141 public void rendezvousFunction(Object[] objects) {
142 int lastIdx = objects.length - 1;
143 Object first = objects[0];
144 for (int i = 0; i < lastIdx; ++i) objects[i] = objects[i+1];
145 objects[lastIdx] = first;
146 }
147 }
148
149
150 protected final int parties_;
151
152
153 protected boolean broken_ = false;
154
155 /**
156 * Number of threads that have entered rendezvous
157 **/
158 protected int entries_ = 0;
159
160 /**
161 * Number of threads that are permitted to depart rendezvous
162 **/
163 protected long departures_ = 0;
164
165 /**
166 * Incoming threads pile up on entry until last set done.
167 **/
168 protected final Semaphore entryGate_;
169
170 /**
171 * Temporary holder for items in exchange
172 **/
173 protected final Object[] slots_;
174
175 /**
176 * The function to run at rendezvous point
177 **/
178
179 protected RendezvousFunction rendezvousFunction_;
180
181 /**
182 * Create a Barrier for the indicated number of parties,
183 * and the default Rotator function to run at each barrier point.
184 * @exception IllegalArgumentException if parties less than or equal to zero.
185 **/
186
187 public Rendezvous(int parties) {
188 this(parties, new Rotator());
189 }
190
191 /**
192 * Create a Barrier for the indicated number of parties.
193 * and the given function to run at each barrier point.
194 * @exception IllegalArgumentException if parties less than or equal to zero.
195 **/
196
197 public Rendezvous(int parties, RendezvousFunction function) {
198 if (parties <= 0) throw new IllegalArgumentException();
199 parties_ = parties;
200 rendezvousFunction_ = function;
201 entryGate_ = new WaiterPreferenceSemaphore(parties);
202 slots_ = new Object[parties];
203 }
204
205 /**
206 * Set the function to call at the point at which all threads reach the
207 * rendezvous. This function is run exactly once, by the thread
208 * that trips the barrier. The function is not run if the barrier is
209 * broken.
210 * @param function the function to run. If null, no function is run.
211 * @return the previous function
212 **/
213
214
215 public synchronized RendezvousFunction setRendezvousFunction(RendezvousFunction function) {
216 RendezvousFunction old = rendezvousFunction_;
217 rendezvousFunction_ = function;
218 return old;
219 }
220
221 public int parties() { return parties_; }
222
223 public synchronized boolean broken() { return broken_; }
224
225 /**
226 * Reset to initial state. Clears both the broken status
227 * and any record of waiting threads, and releases all
228 * currently waiting threads with indeterminate return status.
229 * This method is intended only for use in recovery actions
230 * in which it is somehow known
231 * that no thread could possibly be relying on the
232 * the synchronization properties of this barrier.
233 **/
234
235 public void restart() {
236 // This is not very good, but probably the best that can be done
237 for (;;) {
238 synchronized(this) {
239 if (entries_ != 0) {
240 notifyAll();
241 }
242 else {
243 broken_ = false;
244 return;
245 }
246 }
247 Thread.yield();
248 }
249 }
250
251
252 /**
253 * Enter a rendezvous; returning after all other parties arrive.
254 * @param x the item to present at rendezvous point.
255 * By default, this item is exchanged with another.
256 * @return an item x given by some thread, and/or processed
257 * by the rendezvousFunction.
258 * @exception BrokenBarrierException
259 * if any other thread
260 * in any previous or current barrier
261 * since either creation or the last <code>restart</code>
262 * operation left the barrier
263 * prematurely due to interruption or time-out. (If so,
264 * the <code>broken</code> status is also set.)
265 * Also returns as
266 * broken if the RendezvousFunction encountered a run-time exception.
267 * Threads that are noticed to have been
268 * interrupted <em>after</em> being released are not considered
269 * to have broken the barrier.
270 * In all cases, the interruption
271 * status of the current thread is preserved, so can be tested
272 * by checking <code>Thread.interrupted</code>.
273 * @exception InterruptedException if this thread was interrupted
274 * during the exchange. If so, <code>broken</code> status is also set.
275 **/
276
277
278 public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {
279 return doRendezvous(x, false, 0);
280 }
281
282 /**
283 * Wait msecs to complete a rendezvous.
284 * @param x the item to present at rendezvous point.
285 * By default, this item is exchanged with another.
286 * @param msecs The maximum time to wait.
287 * @return an item x given by some thread, and/or processed
288 * by the rendezvousFunction.
289 * @exception BrokenBarrierException
290 * if any other thread
291 * in any previous or current barrier
292 * since either creation or the last <code>restart</code>
293 * operation left the barrier
294 * prematurely due to interruption or time-out. (If so,
295 * the <code>broken</code> status is also set.)
296 * Also returns as
297 * broken if the RendezvousFunction encountered a run-time exception.
298 * Threads that are noticed to have been
299 * interrupted <em>after</em> being released are not considered
300 * to have broken the barrier.
301 * In all cases, the interruption
302 * status of the current thread is preserved, so can be tested
303 * by checking <code>Thread.interrupted</code>.
304 * @exception InterruptedException if this thread was interrupted
305 * during the exchange. If so, <code>broken</code> status is also set.
306 * @exception TimeoutException if this thread timed out waiting for
307 * the exchange. If the timeout occured while already in the
308 * exchange, <code>broken</code> status is also set.
309 **/
310
311
312 public Object attemptRendezvous(Object x, long msecs)
313 throws InterruptedException, TimeoutException, BrokenBarrierException {
314 return doRendezvous(x, true, msecs);
315 }
316
317 protected Object doRendezvous(Object x, boolean timed, long msecs)
318 throws InterruptedException, TimeoutException, BrokenBarrierException {
319
320 // rely on semaphore to throw interrupt on entry
321
322 long startTime;
323
324 if (timed) {
325 startTime = System.currentTimeMillis();
326 if (!entryGate_.attempt(msecs)) {
327 throw new TimeoutException(msecs);
328 }
329 }
330 else {
331 startTime = 0;
332 entryGate_.acquire();
333 }
334
335 synchronized(this) {
336
337 Object y = null;
338
339 int index = entries_++;
340 slots_[index] = x;
341
342 try {
343 // last one in runs function and releases
344 if (entries_ == parties_) {
345
346 departures_ = entries_;
347 notifyAll();
348
349 try {
350 if (!broken_ && rendezvousFunction_ != null)
351 rendezvousFunction_.rendezvousFunction(slots_);
352 }
353 catch (RuntimeException ex) {
354 broken_ = true;
355 }
356
357 }
358
359 else {
360
361 while (!broken_ && departures_ < 1) {
362 long timeLeft = 0;
363 if (timed) {
364 timeLeft = msecs - (System.currentTimeMillis() - startTime);
365 if (timeLeft <= 0) {
366 broken_ = true;
367 departures_ = entries_;
368 notifyAll();
369 throw new TimeoutException(msecs);
370 }
371 }
372
373 try {
374 wait(timeLeft);
375 }
376 catch (InterruptedException ex) {
377 if (broken_ || departures_ > 0) { // interrupted after release
378 Thread.currentThread().interrupt();
379 break;
380 }
381 else {
382 broken_ = true;
383 departures_ = entries_;
384 notifyAll();
385 throw ex;
386 }
387 }
388 }
389 }
390
391 }
392
393 finally {
394
395 y = slots_[index];
396
397 // Last one out cleans up and allows next set of threads in
398 if (--departures_ <= 0) {
399 for (int i = 0; i < slots_.length; ++i) slots_[i] = null;
400 entryGate_.release(entries_);
401 entries_ = 0;
402 }
403 }
404
405 // continue if no IE/TO throw
406 if (broken_)
407 throw new BrokenBarrierException(index);
408 else
409 return y;
410 }
411 }
412
413 }
414
415