comparison src/EDU/oswego/cs/dl/util/concurrent/CyclicBarrier.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: CyclicBarrier.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jul1998 dl Create public version
12 28Aug1998 dl minor code simplification
13 */
14
15 package EDU.oswego.cs.dl.util.concurrent;
16
17 /**
18 * A cyclic barrier is a reasonable choice for a barrier in contexts
19 * involving a fixed sized group of threads that
20 * must occasionally wait for each other.
21 * (A Rendezvous better handles applications in which
22 * any number of threads meet, n-at-a-time.)
23 * <p>
24 * CyclicBarriers use an all-or-none breakage model
25 * for failed synchronization attempts: If threads
26 * leave a barrier point prematurely because of timeout
27 * or interruption, others will also leave abnormally
28 * (via BrokenBarrierException), until
29 * the barrier is <code>restart</code>ed. This is usually
30 * the simplest and best strategy for sharing knowledge
31 * about failures among cooperating threads in the most
32 * common usage contexts of Barriers.
33 * This implementation has the property that interruptions
34 * among newly arriving threads can cause as-yet-unresumed
35 * threads from a previous barrier cycle to return out
36 * as broken. This transmits breakage
37 * as early as possible, but with the possible byproduct that
38 * only some threads returning out of a barrier will realize
39 * that it is newly broken. (Others will not realize this until a
40 * future cycle.) (The Rendezvous class has a more uniform, but
41 * sometimes less desirable policy.)
42 * <p>
43 * Barriers support an optional Runnable command
44 * that is run once per barrier point.
45 * <p>
46 * <b>Sample usage</b> Here is a code sketch of
47 * a barrier in a parallel decomposition design.
48 * <pre>
49 * class Solver {
50 * final int N;
51 * final float[][] data;
52 * final CyclicBarrier barrier;
53 *
54 * class Worker implements Runnable {
55 * int myRow;
56 * Worker(int row) { myRow = row; }
57 * public void run() {
58 * while (!done()) {
59 * processRow(myRow);
60 *
61 * try {
62 * barrier.barrier();
63 * }
64 * catch (InterruptedException ex) { return; }
65 * catch (BrokenBarrierException ex) { return; }
66 * }
67 * }
68 * }
69 *
70 * public Solver(float[][] matrix) {
71 * data = matrix;
72 * N = matrix.length;
73 * barrier = new CyclicBarrier(N);
74 * barrier.setBarrierCommand(new Runnable() {
75 * public void run() { mergeRows(...); }
76 * });
77 * for (int i = 0; i < N; ++i)
78 * new Thread(new Worker(i)).start();
79 * waitUntilDone();
80 * }
81 * }
82 * </pre>
83 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
84
85 **/
86 public class CyclicBarrier implements Barrier {
87
88 protected final int parties_;
89 protected boolean broken_ = false;
90 protected Runnable barrierCommand_ = null;
91 protected int count_; // number of parties still waiting
92 protected int resets_ = 0; // incremented on each release
93
94 /**
95 * Create a CyclicBarrier for the indicated number of parties,
96 * and no command to run at each barrier.
97 * @exception IllegalArgumentException if parties less than or equal to zero.
98 **/
99
100 public CyclicBarrier(int parties) { this(parties, null); }
101
102 /**
103 * Create a CyclicBarrier for the indicated number of parties.
104 * and the given command to run at each barrier point.
105 * @exception IllegalArgumentException if parties less than or equal to zero.
106 **/
107
108 public CyclicBarrier(int parties, Runnable command) {
109 if (parties <= 0) throw new IllegalArgumentException();
110 parties_ = parties;
111 count_ = parties;
112 barrierCommand_ = command;
113 }
114
115 /**
116 * Set the command to run at the point at which all threads reach the
117 * barrier. This command is run exactly once, by the thread
118 * that trips the barrier. The command is not run if the barrier is
119 * broken.
120 * @param command the command to run. If null, no command is run.
121 * @return the previous command
122 **/
123
124 public synchronized Runnable setBarrierCommand(Runnable command) {
125 Runnable old = barrierCommand_;
126 barrierCommand_ = command;
127 return old;
128 }
129
130 public synchronized boolean broken() { return broken_; }
131
132 /**
133 * Reset to initial state. Clears both the broken status
134 * and any record of waiting threads, and releases all
135 * currently waiting threads with indeterminate return status.
136 * This method is intended only for use in recovery actions
137 * in which it is somehow known
138 * that no thread could possibly be relying on the
139 * the synchronization properties of this barrier.
140 **/
141
142 public synchronized void restart() {
143 broken_ = false;
144 ++resets_;
145 count_ = parties_;
146 notifyAll();
147 }
148
149
150 public int parties() { return parties_; }
151
152 /**
153 * Enter barrier and wait for the other parties()-1 threads.
154 * @return the arrival index: the number of other parties
155 * that were still waiting
156 * upon entry. This is a unique value from zero to parties()-1.
157 * If it is zero, then the current
158 * thread was the last party to hit barrier point
159 * and so was responsible for releasing the others.
160 * @exception BrokenBarrierException if any other thread
161 * in any previous or current barrier
162 * since either creation or the last <code>restart</code>
163 * operation left the barrier
164 * prematurely due to interruption or time-out. (If so,
165 * the <code>broken</code> status is also set.)
166 * Threads that are noticied to have been
167 * interrupted <em>after</em> being released are not considered
168 * to have broken the barrier.
169 * In all cases, the interruption
170 * status of the current thread is preserved, so can be tested
171 * by checking <code>Thread.interrupted</code>.
172 * @exception InterruptedException if this thread was interrupted
173 * during the barrier, and was the one causing breakage.
174 * If so, <code>broken</code> status is also set.
175 **/
176
177 public int barrier() throws InterruptedException, BrokenBarrierException {
178 return doBarrier(false, 0);
179 }
180
181 /**
182 * Enter barrier and wait at most msecs for the other parties()-1 threads.
183 * @return if not timed out, the arrival index: the number of other parties
184 * that were still waiting
185 * upon entry. This is a unique value from zero to parties()-1.
186 * If it is zero, then the current
187 * thread was the last party to hit barrier point
188 * and so was responsible for releasing the others.
189 * @exception BrokenBarrierException
190 * if any other thread
191 * in any previous or current barrier
192 * since either creation or the last <code>restart</code>
193 * operation left the barrier
194 * prematurely due to interruption or time-out. (If so,
195 * the <code>broken</code> status is also set.)
196 * Threads that are noticed to have been
197 * interrupted <em>after</em> being released are not considered
198 * to have broken the barrier.
199 * In all cases, the interruption
200 * status of the current thread is preserved, so can be tested
201 * by checking <code>Thread.interrupted</code>.
202 * @exception InterruptedException if this thread was interrupted
203 * during the barrier. If so, <code>broken</code> status is also set.
204 * @exception TimeoutException if this thread timed out waiting for
205 * the barrier. If the timeout occured while already in the
206 * barrier, <code>broken</code> status is also set.
207 **/
208
209 public int attemptBarrier(long msecs)
210 throws InterruptedException, TimeoutException, BrokenBarrierException {
211 return doBarrier(true, msecs);
212 }
213
214 protected synchronized int doBarrier(boolean timed, long msecs)
215 throws InterruptedException, TimeoutException, BrokenBarrierException {
216
217 int index = --count_;
218
219 if (broken_) {
220 throw new BrokenBarrierException(index);
221 }
222 else if (Thread.interrupted()) {
223 broken_ = true;
224 notifyAll();
225 throw new InterruptedException();
226 }
227 else if (index == 0) { // tripped
228 count_ = parties_;
229 ++resets_;
230 notifyAll();
231 try {
232 if (barrierCommand_ != null)
233 barrierCommand_.run();
234 return 0;
235 }
236 catch (RuntimeException ex) {
237 broken_ = true;
238 return 0;
239 }
240 }
241 else if (timed && msecs <= 0) {
242 broken_ = true;
243 notifyAll();
244 throw new TimeoutException(msecs);
245 }
246 else { // wait until next reset
247 int r = resets_;
248 long startTime = (timed)? System.currentTimeMillis() : 0;
249 long waitTime = msecs;
250 for (;;) {
251 try {
252 wait(waitTime);
253 }
254 catch (InterruptedException ex) {
255 // Only claim that broken if interrupted before reset
256 if (resets_ == r) {
257 broken_ = true;
258 notifyAll();
259 throw ex;
260 }
261 else {
262 Thread.currentThread().interrupt(); // propagate
263 }
264 }
265
266 if (broken_)
267 throw new BrokenBarrierException(index);
268
269 else if (r != resets_)
270 return index;
271
272 else if (timed) {
273 waitTime = msecs - (System.currentTimeMillis() - startTime);
274 if (waitTime <= 0) {
275 broken_ = true;
276 notifyAll();
277 throw new TimeoutException(msecs);
278 }
279 }
280 }
281 }
282 }
283
284 }