comparison: src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunner.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.

author: Dan Creswell <dan.creswell@gmail.com>
date:   Sat, 21 Mar 2009 11:00:06 +0000
/*
  File: FJTaskRunner.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who What
  7Jan1999   dl  First public release
  13Jan1999  dl  correct a stat counter update;
                 ensure inactive status on run termination;
                 misc minor cleanup
  14Jan1999  dl  Use random starting point in scan;
                 variable renamings.
  18Jan1999  dl  Runloop allowed to die on task exception;
                 remove useless timed join
  22Jan1999  dl  Rework scan to allow use of priorities.
  6Feb1999   dl  Documentation updates.
  7Mar1999   dl  Add array-based coInvoke
  31Mar1999  dl  Revise scan to remove need for NullTasks
  27Apr1999  dl  Renamed
  23oct1999  dl  Earlier detect of interrupt in scanWhileIdling
  24nov1999  dl  Now works on JVMs that do not properly
                 implement read-after-write of 2 volatiles.
*/

package EDU.oswego.cs.dl.util.concurrent;

import java.util.Random;

/**
 * Specialized Thread subclass for running FJTasks.
 * <p>
 * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
 * Double-ended queues support stack-based operations
 * push and pop, as well as queue-based operations put and take.
 * Normally, threads run their own tasks. But they
 * may also steal tasks from each other's DEQs.
 * <p>
 * The algorithms are minor variants of those used
 * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
 * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
 * to a lesser extent
 * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
 * but are adapted to work in Java.
 * <p>
 * The two most important capabilities are:
 * <ul>
 * <li> Fork a FJTask:
 * <pre>
 *   Push task onto DEQ
 * </pre>
 * <li> Get a task to run (for example within taskYield)
 * <pre>
 *   If DEQ is not empty,
 *      Pop a task and run it.
 *   Else if any other DEQ is not empty,
 *      Take ("steal") a task from it and run it.
 *   Else if the entry queue for our group is not empty,
 *      Take a task from it and run it.
 *   Else if current thread is otherwise idling
 *      If all threads are idling
 *         Wait for a task to be put on group entry queue
 *      Else
 *         Yield or Sleep for a while, and then retry
 * </pre>
 * </ul>
 * The push, pop, and put are designed to be called only by the
 * current thread, and take (steal) is only ever called by
 * other threads.
 * All other operations are composites and variants of these,
 * plus a few miscellaneous bookkeeping methods.
 * <p>
 * Implementations of the underlying representations and operations
 * are geared for use on JVMs operating on multiple CPUs (although
 * they should of course work fine on single CPUs as well).
 * <p>
 * A possible snapshot of a FJTaskRunner's DEQ is:
 * <pre>
 *     0     1     2     3     4     5     6    ...
 *  +-----+-----+-----+-----+-----+-----+-----+--
 *  |     |  t  |  t  |  t  |  t  |     |     | ...  deq array
 *  +-----+-----+-----+-----+-----+-----+-----+--
 *           ^                       ^
 *         base                     top
 *   (incremented                (incremented
 *       on take,                    on push,
 *    decremented                decremented
 *       on put)                     on pop)
 * </pre>
 * <p>
 * FJTasks are held in elements of the DEQ.
 * They are maintained in a bounded array that
 * works similarly to a circular bounded buffer. To ensure
 * visibility of stolen FJTasks across threads, the array elements
 * must be <code>volatile</code>.
 * Using volatile rather than synchronizing suffices here since
 * each task accessed by a thread is either one that it
 * created or one that it has never seen before. Thus we cannot
 * encounter any staleness problems executing run methods,
 * although FJTask programmers must still be sure to either synch or use
 * volatile for shared data within their run methods.
 * <p>
 * However, since there is no way
 * to declare an array of volatiles in Java, the DEQ elements actually
 * hold VolatileTaskRef objects, each of which in turn holds a
 * volatile reference to a FJTask.
 * Even with the double-indirection overhead of
 * volatile refs, using an array for the DEQ works out
 * better than linking them since fewer shared
 * memory locations need to be
 * touched or modified by the threads while using the DEQ.
 * Further, the double indirection may alleviate cache-line
 * sharing effects (which cannot otherwise be directly dealt with in Java).
 * <p>
 * The indices for the <code>base</code> and <code>top</code> of the DEQ
 * are declared as volatile. The main contention point with
 * multiple FJTaskRunner threads occurs when one thread is trying
 * to pop its own stack while another is trying to steal from it.
 * This is handled via a specialization of Dekker's algorithm,
 * in which the popping thread pre-decrements <code>top</code>,
 * and then checks it against <code>base</code>.
 * To be conservative in the face of JVMs that only partially
 * honor the specification for volatile, the pop proceeds
 * without synchronization only if there are apparently enough
 * items for both a simultaneous pop and take to succeed.
 * It otherwise enters a
 * synchronized lock to check if the DEQ is actually empty,
 * if so failing. The stealing thread
 * does almost the opposite, but is set up to be less likely
 * to win in cases of contention: Steals always run under synchronized
 * locks in order to avoid conflicts with other ongoing steals.
 * They pre-increment <code>base</code>, and then check against
 * <code>top</code>. They back out (resetting the base index
 * and failing to steal) if the
 * DEQ is empty or is about to become empty by an ongoing pop.
 * <p>
 * A push operation can normally run concurrently with a steal.
 * A push enters a synch lock only if the DEQ appears full, in which case
 * it must either be resized or have indices adjusted due to wrap-around
 * of the bounded DEQ. The put operation always requires synchronization.
 * <p>
 * When a FJTaskRunner thread has no tasks of its own to run,
 * it tries to be a good citizen.
 * Threads run at lower priority while scanning for work.
 * <p>
 * If the task is currently waiting
 * via yield, the thread alternates scans (starting at a randomly
 * chosen victim) with Thread.yields. This is
 * well-behaved so long as the JVM handles Thread.yield in a
 * sensible fashion. (It need not. Thread.yield is so underspecified
 * that it is legal for a JVM to treat it as a no-op.) This also
 * keeps things well-behaved even if we are running on a uniprocessor
 * JVM using a simple cooperative threading model.
 * <p>
 * If a thread needing work
 * is otherwise idle (which occurs only in the main runloop), and
 * there are no available tasks to steal or poll, it
 * instead enters into a sleep-based (actually timed wait(msec))
 * phase in which it progressively sleeps for longer durations
 * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
 * currently 100ms) between scans.
 * If all threads in the group
 * are idling, they further progress to a hard wait phase, suspending
 * until a new task is entered into the FJTaskRunnerGroup entry queue.
 * A sleeping FJTaskRunner thread may be awakened by a new
 * task being put into the group entry queue or by another FJTaskRunner
 * becoming active, but not merely by some DEQ becoming non-empty.
 * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
 * in cases where all but one worker thread start sleeping
 * even though there will eventually be work produced
 * by a thread that is taking a long time to place tasks in DEQ.
 * These sleep mechanics are handled in the FJTaskRunnerGroup class.
 * <p>
 * Composite operations such as taskJoin include heavy
 * manual inlining of the most time-critical operations
 * (mainly FJTask.invoke).
 * This opens up a few opportunities for further hand-optimizations.
 * Until Java compilers get a lot smarter, these tweaks
 * improve performance significantly enough for task-intensive
 * programs to be worth the poorer maintainability and code duplication.
 * <p>
 * Because they are so fragile and performance-sensitive, nearly
 * all methods are declared as final. However, nearly all fields
 * and methods are also declared as protected, so it is possible,
 * with much care, to extend functionality in subclasses. (Normally
 * you would also need to subclass FJTaskRunnerGroup.)
 * <p>
 * None of the normal java.lang.Thread class methods should ever be called
 * on FJTaskRunners. For this reason, it might have been nicer to
 * declare FJTaskRunner as a Runnable to run within a Thread. However,
 * this would have complicated many minor logistics. And since
 * no FJTaskRunner methods should normally be called from outside the
 * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
 * usage.
 * <p>
 * You might think that layering this kind of framework on top of
 * Java threads, which are already several levels removed from raw CPU
 * scheduling on most systems, would lead to very poor performance.
 * But on the platforms
 * tested, the performance is quite good.
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 * @see FJTask
 * @see FJTaskRunnerGroup
 **/

public class FJTaskRunner extends Thread {

  /** The group of which this FJTaskRunner is a member **/
  protected final FJTaskRunnerGroup group;

  /**
   * Constructor called only during FJTaskRunnerGroup initialization
   **/
  protected FJTaskRunner(FJTaskRunnerGroup g) {
    group = g;
    victimRNG = new Random(System.identityHashCode(this));
    runPriority = getPriority();
    setDaemon(true);
  }

  /**
   * Return the FJTaskRunnerGroup of which this thread is a member
   **/
  protected final FJTaskRunnerGroup getGroup() { return group; }


  /* ------------ DEQ Representation ------------------- */


  /**
   * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
   * elements. The DEQ is grown if necessary, but the default value is
   * normally much more than sufficient unless there are
   * user programming errors or questionable operations generating
   * large numbers of Tasks without running them.
   * Capacities must be a power of two.
   **/
  protected static final int INITIAL_CAPACITY = 4096;

  /**
   * The maximum supported DEQ capacity.
   * When exceeded, FJTaskRunner operations throw Errors
   **/
  protected static final int MAX_CAPACITY = 1 << 30;

  /**
   * An object holding a single volatile reference to a FJTask.
   **/
  protected final static class VolatileTaskRef {
    /** The reference **/
    protected volatile FJTask ref;

    /** Set the reference **/
    protected final void put(FJTask r) { ref = r; }
    /** Return the reference **/
    protected final FJTask get() { return ref; }
    /** Return the reference and clear it **/
    protected final FJTask take() { FJTask r = ref; ref = null; return r; }

    /**
     * Initialization utility for constructing arrays.
     * Make an array of the given capacity and fill it with
     * VolatileTaskRefs.
     **/
    protected static VolatileTaskRef[] newArray(int cap) {
      VolatileTaskRef[] a = new VolatileTaskRef[cap];
      for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
      return a;
    }
  }
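The VolatileTaskRef wrapper exists only because Java has no array-of-volatiles. On later JVMs (Java 5 and up), java.util.concurrent.atomic.AtomicReferenceArray provides per-element volatile semantics directly, which would remove the per-slot wrapper objects. A minimal sketch of that alternative; the class name VolatileSlots is illustrative and Runnable stands in for FJTask:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Illustrative alternative (not part of this library): AtomicReferenceArray
// gives volatile read/write semantics per element, so the per-slot
// VolatileTaskRef wrappers would not be needed on a Java 5+ JVM.
class VolatileSlots {
    private final AtomicReferenceArray<Runnable> slots;

    VolatileSlots(int capacity) {
        slots = new AtomicReferenceArray<Runnable>(capacity);
    }

    void put(int i, Runnable r) { slots.set(i, r); }           // volatile write
    Runnable get(int i) { return slots.get(i); }               // volatile read
    Runnable take(int i) { return slots.getAndSet(i, null); }  // read-and-clear
}
```

This trades the double indirection discussed above for a single indirection, at the cost of losing the possible cache-line-padding side benefit the class comment mentions.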

  /**
   * The DEQ array.
   **/
  protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);

  /** Current size of the task DEQ **/
  protected int deqSize() { return deq.length; }

  /**
   * Current top of DEQ. Generally acts just like a stack pointer in an
   * array-based stack, except that it circularly wraps around the
   * array, as in an array-based queue. The value is NOT
   * always kept within <code>0 ... deq.length</code> though.
   * The current top element is always at <code>top & (deq.length-1)</code>.
   * To avoid integer overflow, top is reset down
   * within bounds whenever it is noticed to be out of bounds;
   * at worst when it is at <code>2 * deq.length</code>.
   **/
  protected volatile int top = 0;

  /**
   * Current base of DEQ. Acts like a take-pointer in an
   * array-based bounded queue. Same bounds and usage as top.
   **/
  protected volatile int base = 0;

  /**
   * An extra object to synchronize on in order to
   * achieve a memory barrier.
   **/
  protected final Object barrier = new Object();
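The power-of-two capacity requirement exists so that slot lookup can use bit masking rather than modulus. A small sketch of the arithmetic that `top` and `base` rely on throughout this class; the class and method names are illustrative:

```java
// Illustrative sketch of the index arithmetic used by top/base: with a
// power-of-two capacity, (i & (capacity - 1)) picks the physical slot for
// logical index i. It is equivalent to i % capacity but cheaper, and it
// keeps working after the logical index wraps past the array length.
class DeqIndexing {
    static int slot(int logicalIndex, int capacity) {
        return logicalIndex & (capacity - 1);
    }
}
```

This is also why the indices need not stay within `0 ... deq.length`: only the masked value matters for addressing, so resets can be deferred until overflow threatens.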

  /* ------------ Other BookKeeping ------------------- */

  /**
   * Record whether current thread may be processing a task
   * (i.e., has been started and is not in an idle wait).
   * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
   * stored here for simplicity.
   **/
  protected boolean active = false;

  /** Random starting point generator for scan() **/
  protected final Random victimRNG;

  /** Priority to use while scanning for work **/
  protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;

  /** Priority to use while running tasks **/
  protected int runPriority;

  /**
   * Set the priority to use while scanning.
   * We do not bother synchronizing access, since
   * by the time the value is needed, both this FJTaskRunner
   * and its FJTaskRunnerGroup will
   * necessarily have performed enough synchronization
   * to avoid staleness problems of any consequence.
   **/
  protected void setScanPriority(int pri) { scanPriority = pri; }

  /**
   * Set the priority to use while running tasks.
   * Same usage and rationale as setScanPriority.
   **/
  protected void setRunPriority(int pri) { runPriority = pri; }

  /**
   * Compile-time constant for statistics gathering.
   * Even when set, reported values may not be accurate
   * since all are read and written without synchronization.
   **/
  static final boolean COLLECT_STATS = true;
  // static final boolean COLLECT_STATS = false;


  // for stat collection

  /** Total number of tasks run **/
  protected int runs = 0;

  /** Total number of queues scanned for work **/
  protected int scans = 0;

  /** Total number of tasks obtained via scan **/
  protected int steals = 0;


  /* ------------ DEQ operations ------------------- */


  /**
   * Push a task onto DEQ.
   * Called ONLY by current thread.
   **/
  protected final void push(final FJTask r) {
    int t = top;

    /*
      This test catches both overflows and index wraps. It doesn't
      really matter if base value is in the midst of changing in take.
      As long as deq length is < 2^30, we are guaranteed to catch wrap in
      time since base can only be incremented at most length times
      between pushes (or puts).
    */
    if (t < (base & (deq.length-1)) + deq.length) {
      deq[t & (deq.length-1)].put(r);
      top = t + 1;
    }
    else // isolate slow case to increase chances push is inlined
      slowPush(r); // check overflow and retry
  }

  /**
   * Handle slow case for push
   **/
  protected synchronized void slowPush(final FJTask r) {
    checkOverflow();
    push(r); // just recurse -- this one is sure to succeed.
  }

  /**
   * Enqueue task at base of DEQ.
   * Called ONLY by current thread.
   * This method is currently not called from class FJTask. It could be used
   * as a faster way to do FJTask.start, but most users would
   * find the semantics too confusing and unpredictable.
   **/
  protected final synchronized void put(final FJTask r) {
    for (;;) {
      int b = base - 1;
      if (top < b + deq.length) {
        int newBase = b & (deq.length-1);
        deq[newBase].put(r);
        base = newBase;

        if (b != newBase) { // Adjust for index underflow
          int newTop = top & (deq.length-1);
          if (newTop < newBase) newTop += deq.length;
          top = newTop;
        }
        return;
      }
      else {
        checkOverflow();
        // ... and retry
      }
    }
  }

  /**
   * Return a popped task, or null if DEQ is empty.
   * Called ONLY by current thread.
   * <p>
   * This is not usually called directly but is
   * instead inlined in callers. This version differs from the
   * cilk algorithm in that pop does not fully back down and
   * retry in the case of potential conflict with take. It simply
   * rechecks under synch lock. This gives a preference
   * for threads to run their own tasks, which seems to
   * reduce flailing a bit when there are few tasks to run.
   **/
  protected final FJTask pop() {
    /*
      Decrement top, to force a contending take to back down.
    */
    int t = --top;

    /*
      To avoid problems with JVMs that do not properly implement
      read-after-write of a pair of volatiles, we conservatively
      grab without lock only if the DEQ appears to have at least two
      elements, thus guaranteeing that both a pop and take will succeed,
      even if the pre-increment in take is not seen by current thread.
      Otherwise we recheck under synch.
    */
    if (base + 1 < t)
      return deq[t & (deq.length-1)].take();
    else
      return confirmPop(t);
  }

  /**
   * Check under synch lock if DEQ is really empty when doing pop.
   * Return task if not empty, else null.
   **/
  protected final synchronized FJTask confirmPop(int provisionalTop) {
    if (base <= provisionalTop)
      return deq[provisionalTop & (deq.length-1)].take();
    else { // was empty
      /*
        Reset DEQ indices to zero whenever it is empty.
        This both avoids unnecessary calls to checkOverflow
        in push, and helps keep the DEQ from accumulating garbage
      */
      top = base = 0;
      return null;
    }
  }
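Setting aside the volatile and locking machinery (which is the hard part), the index discipline that push, pop, and take follow can be modeled sequentially. The sketch below is illustrative only; the class and method names are not part of this library. It shows the intended behavior: LIFO at the owner's end (`top`), FIFO at the steal end (`base`), with masked slot addressing:

```java
// Sequential model of the DEQ index discipline. push/pop move `top`
// (stack end, used by the owning thread); take moves `base` (steal end,
// used by thieves). All concurrency control is deliberately omitted.
class MiniDeq {
    final Object[] slots;
    int base = 0, top = 0;

    MiniDeq(int capacityPowerOfTwo) { slots = new Object[capacityPowerOfTwo]; }

    void push(Object r) {
        slots[top & (slots.length - 1)] = r;
        top++;
    }

    Object pop() {                       // LIFO: newest task first
        if (top == base) return null;
        --top;
        int i = top & (slots.length - 1);
        Object r = slots[i];
        slots[i] = null;
        return r;
    }

    Object take() {                      // FIFO: oldest task first
        if (base == top) return null;
        int i = base & (slots.length - 1);
        Object r = slots[i];
        slots[i] = null;
        base++;
        return r;
    }
}
```

The real class layers Dekker-style conflict detection on top of exactly these index movements: pop pre-decrements `top` and rechecks, take pre-increments `base` and rechecks.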

  /**
   * Take a task from the base of the DEQ.
   * Always called by other threads via scan()
   **/
  protected final synchronized FJTask take() {
    /*
      Increment base in order to suppress a contending pop
    */
    int b = base++;

    if (b < top)
      return confirmTake(b);
    else {
      // back out
      base = b;
      return null;
    }
  }

  /**
   * Double-check a potential take
   **/
  protected FJTask confirmTake(int oldBase) {
    /*
      Use a second (guaranteed uncontended) synch
      to serve as a barrier in case JVM does not
      properly process read-after-write of 2 volatiles
    */
    synchronized(barrier) {
      if (oldBase < top) {
        /*
          We cannot call deq[oldBase].take here because of possible races when
          nulling out versus concurrent push operations. Resulting
          accumulated garbage is swept out periodically in
          checkOverflow, or more typically, just by keeping indices
          zero-based when found to be empty in pop, which keeps active
          region small and constantly overwritten.
        */
        return deq[oldBase & (deq.length-1)].get();
      }
      else {
        base = oldBase;
        return null;
      }
    }
  }

  /**
   * Adjust top and base, and grow DEQ if necessary.
   * Called only while DEQ synch lock being held.
   * We don't expect this to be called very often. In most
   * programs using FJTasks, it is never called.
   **/
  protected void checkOverflow() {
    int t = top;
    int b = base;

    if (t - b < deq.length-1) { // check if just need an index reset
      int newBase = b & (deq.length-1);
      int newTop = top & (deq.length-1);
      if (newTop < newBase) newTop += deq.length;
      top = newTop;
      base = newBase;

      /*
        Null out refs to stolen tasks.
        This is the only time we can safely do it.
      */
      int i = newBase;
      while (i != newTop && deq[i].ref != null) {
        deq[i].ref = null;
        i = (i - 1) & (deq.length-1);
      }
    }
    else { // grow by doubling array
      int newTop = t - b;
      int oldcap = deq.length;
      int newcap = oldcap * 2;

      if (newcap >= MAX_CAPACITY)
        throw new Error("FJTask queue maximum capacity exceeded");

      VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];

      // copy in bottom half of new deq with refs from old deq
      for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];

      // fill top half of new deq with new refs
      for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();

      deq = newdeq;
      base = 0;
      top = newTop;
    }
  }
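The index-reset branch of checkOverflow can be isolated as pure arithmetic. The sketch below (class and method names are illustrative) makes its two invariants easy to check: the reset preserves the number of queued elements, and it preserves every element's physical slot, since `i & (len - 1)` is unchanged by subtracting multiples of `len`:

```java
// Illustrative extraction of checkOverflow's index-reset arithmetic.
// Invariants: (i) newTop - newBase == top - base (queue size preserved),
// and (ii) each index's masked slot, index & (len - 1), is unchanged.
class IndexReset {
    static int[] reset(int base, int top, int len) {
        int newBase = base & (len - 1);
        int newTop = top & (len - 1);
        if (newTop < newBase) newTop += len;  // keep top logically above base
        return new int[] { newBase, newTop };
    }
}
```
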


  /* ------------ Scheduling ------------------- */


  /**
   * Do all but the pop() part of yield or join, by
   * traversing all DEQs in our group looking for a task to
   * steal. If none, it checks the entry queue.
   * <p>
   * Since there are no good, portable alternatives,
   * we rely here on a mixture of Thread.yield and priorities
   * to reduce wasted spinning, even though these are
   * not well defined. We are hoping here that the JVM
   * does something sensible.
   * @param waitingFor if non-null, the current task being joined
   **/
  protected void scan(final FJTask waitingFor) {
    FJTask task = null;

    // to delay lowering priority until first failure to steal
    boolean lowered = false;

    /*
      Circularly traverse from a random start index.

      This differs slightly from the cilk version, which uses a random index
      for each attempted steal.
      Exhaustive scanning might impede analytic tractability of
      the scheduling policy, but makes it much easier to deal with
      startup and shutdown.
    */
    FJTaskRunner[] ts = group.getArray();
    int idx = victimRNG.nextInt(ts.length);

    for (int i = 0; i < ts.length; ++i) {
      FJTaskRunner t = ts[idx];
      if (++idx >= ts.length) idx = 0; // circularly traverse

      if (t != null && t != this) {
        if (waitingFor != null && waitingFor.isDone()) {
          break;
        }
        else {
          if (COLLECT_STATS) ++scans;
          task = t.take();
          if (task != null) {
            if (COLLECT_STATS) ++steals;
            break;
          }
          else if (isInterrupted()) {
            break;
          }
          else if (!lowered) { // if this is first fail, lower priority
            lowered = true;
            setPriority(scanPriority);
          }
          else { // otherwise we are at low priority; just yield
            yield();
          }
        }
      }
    }

    if (task == null) {
      if (COLLECT_STATS) ++scans;
      task = group.pollEntryQueue();
      if (COLLECT_STATS) if (task != null) ++steals;
    }

    if (lowered) setPriority(runPriority);

    if (task != null && !task.isDone()) {
      if (COLLECT_STATS) ++runs;
      task.run();
      task.setDone();
    }
  }

  /**
   * Same as scan, but called when current thread is idling.
   * It repeatedly scans other threads for tasks,
   * sleeping while none are available.
   * <p>
   * This differs from scan mainly in that
   * since there is no reason to return to recheck any
   * condition, we iterate until a task is found, backing
   * off via sleeps if necessary.
   **/
  protected void scanWhileIdling() {
    FJTask task = null;

    boolean lowered = false;
    long iters = 0;

    FJTaskRunner[] ts = group.getArray();
    int idx = victimRNG.nextInt(ts.length);

    do {
      for (int i = 0; i < ts.length; ++i) {
        FJTaskRunner t = ts[idx];
        if (++idx >= ts.length) idx = 0; // circularly traverse

        if (t != null && t != this) {
          if (COLLECT_STATS) ++scans;

          task = t.take();
          if (task != null) {
            if (COLLECT_STATS) ++steals;
            if (lowered) setPriority(runPriority);
            group.setActive(this);
            break;
          }
        }
      }

      if (task == null) {
        if (isInterrupted())
          return;

        if (COLLECT_STATS) ++scans;
        task = group.pollEntryQueue();

        if (task != null) {
          if (COLLECT_STATS) ++steals;
          if (lowered) setPriority(runPriority);
          group.setActive(this);
        }
        else {
          ++iters;
          // Check here for yield vs sleep to avoid entering group synch lock
          if (iters >= group.SCANS_PER_SLEEP) {
            group.checkActive(this, iters);
            if (isInterrupted())
              return;
          }
          else if (!lowered) {
            lowered = true;
            setPriority(scanPriority);
          }
          else {
            yield();
          }
        }
      }
    } while (task == null);

    if (!task.isDone()) {
      if (COLLECT_STATS) ++runs;
      task.run();
      task.setDone();
    }
  }
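The escalation policy inside the idle loop above can be restated as a small decision function. This is an illustrative restatement only (the enum and method are not part of this library): on each failed pass the thread first lowers its priority, then merely yields, and only after SCANS_PER_SLEEP failed passes does it defer to the group's sleep/wait machinery:

```java
// Illustrative restatement of scanWhileIdling's per-iteration choice.
// Mirrors the order of the checks in the loop: the SCANS_PER_SLEEP
// threshold is tested first to avoid taking the group lock prematurely.
class IdlePolicy {
    enum Action { CHECK_GROUP, LOWER_PRIORITY, YIELD }

    static Action next(long iters, long scansPerSleep, boolean alreadyLowered) {
        if (iters >= scansPerSleep) return Action.CHECK_GROUP;  // sleep/wait via group
        if (!alreadyLowered) return Action.LOWER_PRIORITY;      // first failure
        return Action.YIELD;                                    // subsequent failures
    }
}
```
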

  /* ------------ Composite operations ------------------- */


  /**
   * Main runloop
   **/
  public void run() {
    try {
      while (!interrupted()) {
        FJTask task = pop();
        if (task != null) {
          if (!task.isDone()) {
            // inline FJTask.invoke
            if (COLLECT_STATS) ++runs;
            task.run();
            task.setDone();
          }
        }
        else
          scanWhileIdling();
      }
    }
    finally {
      group.setInactive(this);
    }
  }

  /**
   * Execute a task in this thread. Generally called when current task
   * cannot otherwise continue.
   **/
  protected final void taskYield() {
    FJTask task = pop();
    if (task != null) {
      if (!task.isDone()) {
        if (COLLECT_STATS) ++runs;
        task.run();
        task.setDone();
      }
    }
    else
      scan(null);
  }

  /**
   * Process tasks until w is done.
   * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
   **/
  protected final void taskJoin(final FJTask w) {
    while (!w.isDone()) {
      FJTask task = pop();
      if (task != null) {
        if (!task.isDone()) {
          if (COLLECT_STATS) ++runs;
          task.run();
          task.setDone();
          if (task == w) return; // fast exit if we just ran w
        }
      }
      else
        scan(w);
    }
  }

  /**
   * A specialized expansion of
   * <code> w.fork(); invoke(v); w.join(); </code>
   **/
  protected final void coInvoke(final FJTask w, final FJTask v) {
    // inline push
    int t = top;
    if (t < (base & (deq.length-1)) + deq.length) {
      deq[t & (deq.length-1)].put(w);
      top = t + 1;

      // inline invoke
      if (!v.isDone()) {
        if (COLLECT_STATS) ++runs;
        v.run();
        v.setDone();
      }

      // inline taskJoin
      while (!w.isDone()) {
        FJTask task = pop();
        if (task != null) {
          if (!task.isDone()) {
            if (COLLECT_STATS) ++runs;
            task.run();
            task.setDone();
            if (task == w) return; // fast exit if we just ran w
          }
        }
        else
          scan(w);
      }
    }
    else // handle non-inlinable cases
      slowCoInvoke(w, v);
  }

  /**
   * Backup to handle noninlinable cases of coInvoke
   **/
  protected void slowCoInvoke(final FJTask w, final FJTask v) {
    push(w); // let push deal with overflow
    FJTask.invoke(v);
    taskJoin(w);
  }

  /**
   * Array-based version of coInvoke
   **/
  protected final void coInvoke(FJTask[] tasks) {
    int nforks = tasks.length - 1;

    // inline bulk push of all but one task
    int t = top;

    if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
      for (int i = 0; i < nforks; ++i) {
        deq[t++ & (deq.length-1)].put(tasks[i]);
        top = t;
      }

      // inline invoke of one task
      FJTask v = tasks[nforks];
      if (!v.isDone()) {
        if (COLLECT_STATS) ++runs;
        v.run();
        v.setDone();
      }

      // inline taskJoins
      for (int i = 0; i < nforks; ++i) {
        FJTask w = tasks[i];
        while (!w.isDone()) {
          FJTask task = pop();
          if (task != null) {
            if (!task.isDone()) {
              if (COLLECT_STATS) ++runs;
              task.run();
              task.setDone();
            }
          }
          else
            scan(w);
        }
      }
    }
    else // handle non-inlinable cases
      slowCoInvoke(tasks);
  }

  /**
   * Backup to handle atypical or noninlinable cases of coInvoke
   **/
  protected void slowCoInvoke(FJTask[] tasks) {
    for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
    for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
  }

}