Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/CyclicBarrier.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: CyclicBarrier.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jul1998 dl Create public version | |
12 28Aug1998 dl minor code simplification | |
13 */ | |
14 | |
15 package EDU.oswego.cs.dl.util.concurrent; | |
16 | |
17 /** | |
18 * A cyclic barrier is a reasonable choice for a barrier in contexts | |
19 * involving a fixed sized group of threads that | |
20 * must occasionally wait for each other. | |
21 * (A Rendezvous better handles applications in which | |
22 * any number of threads meet, n-at-a-time.) | |
23 * <p> | |
24 * CyclicBarriers use an all-or-none breakage model | |
25 * for failed synchronization attempts: If threads | |
26 * leave a barrier point prematurely because of timeout | |
27 * or interruption, others will also leave abnormally | |
28 * (via BrokenBarrierException), until | |
29 * the barrier is <code>restart</code>ed. This is usually | |
30 * the simplest and best strategy for sharing knowledge | |
31 * about failures among cooperating threads in the most | |
32 * common usages contexts of Barriers. | |
33 * This implementation has the property that interruptions | |
34 * among newly arriving threads can cause as-yet-unresumed | |
35 * threads from a previous barrier cycle to return out | |
36 * as broken. This transmits breakage | |
37 * as early as possible, but with the possible byproduct that | |
38 * only some threads returning out of a barrier will realize | |
39 * that it is newly broken. (Others will not realize this until a | |
40 * future cycle.) (The Rendezvous class has a more uniform, but | |
41 * sometimes less desirable policy.) | |
42 * <p> | |
43 * Barriers support an optional Runnable command | |
44 * that is run once per barrier point. | |
45 * <p> | |
46 * <b>Sample usage</b> Here is a code sketch of | |
47 * a barrier in a parallel decomposition design. | |
48 * <pre> | |
49 * class Solver { | |
50 * final int N; | |
51 * final float[][] data; | |
52 * final CyclicBarrier barrier; | |
53 * | |
54 * class Worker implements Runnable { | |
55 * int myRow; | |
56 * Worker(int row) { myRow = row; } | |
57 * public void run() { | |
58 * while (!done()) { | |
59 * processRow(myRow); | |
60 * | |
61 * try { | |
62 * barrier.barrier(); | |
63 * } | |
64 * catch (InterruptedException ex) { return; } | |
65 * catch (BrokenBarrierException ex) { return; } | |
66 * } | |
67 * } | |
68 * } | |
69 * | |
70 * public Solver(float[][] matrix) { | |
71 * data = matrix; | |
72 * N = matrix.length; | |
73 * barrier = new CyclicBarrier(N); | |
74 * barrier.setBarrierCommand(new Runnable() { | |
75 * public void run() { mergeRows(...); } | |
76 * }); | |
77 * for (int i = 0; i < N; ++i) { | |
78 * new Thread(new Worker(i)).start(); | |
79 * waitUntilDone(); | |
80 * } | |
81 * } | |
82 * </pre> | |
83 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
84 | |
85 **/ | |
86 public class CyclicBarrier implements Barrier { | |
87 | |
88 protected final int parties_; | |
89 protected boolean broken_ = false; | |
90 protected Runnable barrierCommand_ = null; | |
91 protected int count_; // number of parties still waiting | |
92 protected int resets_ = 0; // incremented on each release | |
93 | |
94 /** | |
95 * Create a CyclicBarrier for the indicated number of parties, | |
96 * and no command to run at each barrier. | |
97 * @exception IllegalArgumentException if parties less than or equal to zero. | |
98 **/ | |
99 | |
100 public CyclicBarrier(int parties) { this(parties, null); } | |
101 | |
102 /** | |
103 * Create a CyclicBarrier for the indicated number of parties. | |
104 * and the given command to run at each barrier point. | |
105 * @exception IllegalArgumentException if parties less than or equal to zero. | |
106 **/ | |
107 | |
108 public CyclicBarrier(int parties, Runnable command) { | |
109 if (parties <= 0) throw new IllegalArgumentException(); | |
110 parties_ = parties; | |
111 count_ = parties; | |
112 barrierCommand_ = command; | |
113 } | |
114 | |
115 /** | |
116 * Set the command to run at the point at which all threads reach the | |
117 * barrier. This command is run exactly once, by the thread | |
118 * that trips the barrier. The command is not run if the barrier is | |
119 * broken. | |
120 * @param command the command to run. If null, no command is run. | |
121 * @return the previous command | |
122 **/ | |
123 | |
124 public synchronized Runnable setBarrierCommand(Runnable command) { | |
125 Runnable old = barrierCommand_; | |
126 barrierCommand_ = command; | |
127 return old; | |
128 } | |
129 | |
130 public synchronized boolean broken() { return broken_; } | |
131 | |
132 /** | |
133 * Reset to initial state. Clears both the broken status | |
134 * and any record of waiting threads, and releases all | |
135 * currently waiting threads with indeterminate return status. | |
136 * This method is intended only for use in recovery actions | |
137 * in which it is somehow known | |
138 * that no thread could possibly be relying on the | |
139 * the synchronization properties of this barrier. | |
140 **/ | |
141 | |
142 public synchronized void restart() { | |
143 broken_ = false; | |
144 ++resets_; | |
145 count_ = parties_; | |
146 notifyAll(); | |
147 } | |
148 | |
149 | |
150 public int parties() { return parties_; } | |
151 | |
152 /** | |
153 * Enter barrier and wait for the other parties()-1 threads. | |
154 * @return the arrival index: the number of other parties | |
155 * that were still waiting | |
156 * upon entry. This is a unique value from zero to parties()-1. | |
157 * If it is zero, then the current | |
158 * thread was the last party to hit barrier point | |
159 * and so was responsible for releasing the others. | |
160 * @exception BrokenBarrierException if any other thread | |
161 * in any previous or current barrier | |
162 * since either creation or the last <code>restart</code> | |
163 * operation left the barrier | |
164 * prematurely due to interruption or time-out. (If so, | |
165 * the <code>broken</code> status is also set.) | |
166 * Threads that are noticied to have been | |
167 * interrupted <em>after</em> being released are not considered | |
168 * to have broken the barrier. | |
169 * In all cases, the interruption | |
170 * status of the current thread is preserved, so can be tested | |
171 * by checking <code>Thread.interrupted</code>. | |
172 * @exception InterruptedException if this thread was interrupted | |
173 * during the barrier, and was the one causing breakage. | |
174 * If so, <code>broken</code> status is also set. | |
175 **/ | |
176 | |
177 public int barrier() throws InterruptedException, BrokenBarrierException { | |
178 return doBarrier(false, 0); | |
179 } | |
180 | |
181 /** | |
182 * Enter barrier and wait at most msecs for the other parties()-1 threads. | |
183 * @return if not timed out, the arrival index: the number of other parties | |
184 * that were still waiting | |
185 * upon entry. This is a unique value from zero to parties()-1. | |
186 * If it is zero, then the current | |
187 * thread was the last party to hit barrier point | |
188 * and so was responsible for releasing the others. | |
189 * @exception BrokenBarrierException | |
190 * if any other thread | |
191 * in any previous or current barrier | |
192 * since either creation or the last <code>restart</code> | |
193 * operation left the barrier | |
194 * prematurely due to interruption or time-out. (If so, | |
195 * the <code>broken</code> status is also set.) | |
196 * Threads that are noticed to have been | |
197 * interrupted <em>after</em> being released are not considered | |
198 * to have broken the barrier. | |
199 * In all cases, the interruption | |
200 * status of the current thread is preserved, so can be tested | |
201 * by checking <code>Thread.interrupted</code>. | |
202 * @exception InterruptedException if this thread was interrupted | |
203 * during the barrier. If so, <code>broken</code> status is also set. | |
204 * @exception TimeoutException if this thread timed out waiting for | |
205 * the barrier. If the timeout occured while already in the | |
206 * barrier, <code>broken</code> status is also set. | |
207 **/ | |
208 | |
209 public int attemptBarrier(long msecs) | |
210 throws InterruptedException, TimeoutException, BrokenBarrierException { | |
211 return doBarrier(true, msecs); | |
212 } | |
213 | |
214 protected synchronized int doBarrier(boolean timed, long msecs) | |
215 throws InterruptedException, TimeoutException, BrokenBarrierException { | |
216 | |
217 int index = --count_; | |
218 | |
219 if (broken_) { | |
220 throw new BrokenBarrierException(index); | |
221 } | |
222 else if (Thread.interrupted()) { | |
223 broken_ = true; | |
224 notifyAll(); | |
225 throw new InterruptedException(); | |
226 } | |
227 else if (index == 0) { // tripped | |
228 count_ = parties_; | |
229 ++resets_; | |
230 notifyAll(); | |
231 try { | |
232 if (barrierCommand_ != null) | |
233 barrierCommand_.run(); | |
234 return 0; | |
235 } | |
236 catch (RuntimeException ex) { | |
237 broken_ = true; | |
238 return 0; | |
239 } | |
240 } | |
241 else if (timed && msecs <= 0) { | |
242 broken_ = true; | |
243 notifyAll(); | |
244 throw new TimeoutException(msecs); | |
245 } | |
246 else { // wait until next reset | |
247 int r = resets_; | |
248 long startTime = (timed)? System.currentTimeMillis() : 0; | |
249 long waitTime = msecs; | |
250 for (;;) { | |
251 try { | |
252 wait(waitTime); | |
253 } | |
254 catch (InterruptedException ex) { | |
255 // Only claim that broken if interrupted before reset | |
256 if (resets_ == r) { | |
257 broken_ = true; | |
258 notifyAll(); | |
259 throw ex; | |
260 } | |
261 else { | |
262 Thread.currentThread().interrupt(); // propagate | |
263 } | |
264 } | |
265 | |
266 if (broken_) | |
267 throw new BrokenBarrierException(index); | |
268 | |
269 else if (r != resets_) | |
270 return index; | |
271 | |
272 else if (timed) { | |
273 waitTime = msecs - (System.currentTimeMillis() - startTime); | |
274 if (waitTime <= 0) { | |
275 broken_ = true; | |
276 notifyAll(); | |
277 throw new TimeoutException(msecs); | |
278 } | |
279 } | |
280 } | |
281 } | |
282 } | |
283 | |
284 } |