comparison src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunnerGroup.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: FJTaskRunnerGroup.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 7Jan1999 dl First public release | |
12 12Jan1999 dl made getActiveCount public; misc minor cleanup. | |
13 14Jan1999 dl Added executeTask | |
14 20Jan1999 dl Allow use of priorities; reformat stats | |
15 6Feb1999 dl Lazy thread starts | |
16 27Apr1999 dl Renamed | |
17 */ | |
18 | |
19 package EDU.oswego.cs.dl.util.concurrent; | |
20 | |
21 /** | |
22 * A stripped down analog of a ThreadGroup used for | |
23 * establishing and managing FJTaskRunner threads. | |
24 * FJTaskRunnerGroups serve as the control boundary separating | |
25 * the general world of normal threads from the specialized world | |
26 * of FJTasks. | |
27 * <p> | |
28 * By intent, this class does not subclass java.lang.ThreadGroup, and | |
29 * does not support most methods found in ThreadGroups, since they | |
30 * would make no sense for FJTaskRunner threads. In fact, the class | |
31 * does not deal with ThreadGroups at all. If you want to restrict | |
32 * a FJTaskRunnerGroup to a particular ThreadGroup, you can create | |
33 * it from within that ThreadGroup. | |
34 * <p> | |
35 * The main contextual parameter for a FJTaskRunnerGroup is | |
36 * the group size, established in the constructor. | |
37 * Groups must be of a fixed size. | |
38 * There is no way to dynamically increase or decrease the number | |
39 * of threads in an existing group. | |
40 * <p> | |
41 * In general, the group size should be equal to the number | |
42 * of CPUs on the system. (Unfortunately, there is no portable | |
43 * means of automatically detecting the number of CPUs on a JVM, so there is | |
44 * no good way to automate defaults.) In principle, when | |
45 * FJTasks are used for computation-intensive tasks, having only | |
46 * as many threads as CPUs should minimize bookkeeping overhead | |
47 * and contention, and so maximize throughput. However, because | |
48 * FJTaskRunners lie atop Java threads, and in turn operating system | |
49 * thread support and scheduling policies, | |
50 * it is very possible that using more threads | |
51 * than CPUs will improve overall throughput even though it adds | |
52 * to overhead. This will always be so if FJTasks are I/O bound. | |
53 * So it may pay to experiment a bit when tuning on particular platforms. | |
54 * You can also use <code>setRunPriorities</code> to either | |
55 * increase or decrease the priorities of active threads, which | |
56 * may interact with group size choice. | |
57 * <p> | |
58 * In any case, overestimating group sizes never | |
59 * seriously degrades performance (at least within reasonable bounds). | |
60 * You can also use a value | |
61 * less than the number of CPUs in order to reserve processing | |
62 * for unrelated threads. | |
63 * <p> | |
64 * There are two general styles for using a FJTaskRunnerGroup. | |
65 * You can create one group per entire program execution, for example | |
66 * as a static singleton, and use it for all parallel tasks: | |
67 * <pre> | |
68 * class Tasks { | |
69 * static FJTaskRunnerGroup group; | |
70 * public void initialize(int groupSize) { | |
71 * group = new FJTaskRunnerGroup(groupSize); | |
72 * } | |
73 * // ... | |
74 * } | |
75 * </pre> | |
76 * Alternatively, you can make new groups on the fly and use them only for | |
77 * particular task sets. This is more flexible, | |
78 * and leads to more controllable and deterministic execution patterns, | |
79 * but it encounters greater overhead on startup. Also, to reclaim | |
80 * system resources, you should | |
81 * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done | |
82 * using one-shot groups. Otherwise, because FJTaskRunners set | |
83 * <code>Thread.isDaemon</code> | |
84 * status, they will not normally be reclaimed until program termination. | |
85 * <p> | |
86 * The main supported methods are <code>execute</code>, | |
87 * which starts a task processed by FJTaskRunner threads, | |
88 * and <code>invoke</code>, which starts one and waits for completion. | |
89 * For example, you might extend the above <code>Tasks</code> | |
90 * class to support a task-based computation, say, the | |
91 * <code>Fib</code> class from the <code>FJTask</code> documentation: | |
92 * <pre> | |
93 * class Tasks { // continued | |
94 * // ... | |
95 * static int fib(int n) { | |
96 * try { | |
97 * Fib f = new Fib(n); | |
98 * group.invoke(f); | |
99 * return f.getAnswer(); | |
100 * } | |
101 * catch (InterruptedException ex) { | |
102 * throw new Error("Interrupted during computation"); | |
103 * } | |
104 * } | |
105 * } | |
106 * </pre> | |
107 * <p> | |
108 * Method <code>stats()</code> can be used to monitor performance. | |
109 * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with | |
110 * the compile-time constant COLLECT_STATS set to false. In this | |
111 * case, various simple counts reported in stats() are not collected. | |
112 * On platforms tested, | |
113 * this leads to such a tiny performance improvement that there is | |
114 * very little motivation to bother. | |
115 * | |
116 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
117 * <p> | |
118 * @see FJTask | |
119 * @see FJTaskRunner | |
120 **/ | |
121 | |
122 public class FJTaskRunnerGroup implements Executor { | |
123 | |
124 /** The threads in this group **/ | |
125 protected final FJTaskRunner[] threads; | |
126 | |
127 /** Group-wide queue for tasks entered via execute() **/ | |
128 protected final LinkedQueue entryQueue = new LinkedQueue(); | |
129 | |
130 /** Number of threads that are not waiting for work **/ | |
131 protected int activeCount = 0; | |
132 | |
133 /** Number of threads that have been started. Used to avoid | |
134 unnecessary contention during startup of task sets. | |
135 **/ | |
136 protected int nstarted = 0; | |
137 | |
138 /** | |
139 * Compile-time constant. If true, various counts of | |
140 * runs, waits, etc., are maintained. These are NOT | |
141 * updated with synchronization, so statistics reports | |
142 * might not be accurate. | |
143 **/ | |
144 | |
145 static final boolean COLLECT_STATS = true; | |
146 // static final boolean COLLECT_STATS = false; | |
147 | |
148 // for stats | |
149 | |
150 /** The time at which this FJTaskRunnerGroup was constructed **/ | |
151 long initTime = 0; | |
152 | |
153 /** Total number of executes or invokes **/ | |
154 int entries = 0; | |
155 | |
156 static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1; | |
157 | |
158 /** | |
159 * Create a FJTaskRunnerGroup with the indicated number | |
160 * of FJTaskRunner threads. Normally, the best size to use is | |
161 * the number of CPUs on the system. | |
162 * <p> | |
163 * The threads in a FJTaskRunnerGroup are created with their | |
164 * isDaemon status set, so do not normally need to be | |
165 * shut down manually upon program termination. | |
166 **/ | |
167 | |
168 public FJTaskRunnerGroup(int groupSize) { | |
169 threads = new FJTaskRunner[groupSize]; | |
170 initializeThreads(); | |
171 initTime = System.currentTimeMillis(); | |
172 } | |
173 | |
174 /** | |
175 * Arrange for execution of the given task | |
176 * by placing it in a work queue. If the argument | |
177 * is not of type FJTask, it is embedded in a FJTask via | |
178 * <code>FJTask.Wrap</code>. | |
179 * @exception InterruptedException if the current thread is | |
180 * interrupted | |
181 **/ | |
182 | |
183 public void execute(Runnable r) throws InterruptedException { | |
184 if (r instanceof FJTask) { | |
185 entryQueue.put((FJTask)r); | |
186 } | |
187 else { | |
188 entryQueue.put(new FJTask.Wrap(r)); | |
189 } | |
190 signalNewTask(); | |
191 } | |
192 | |
193 | |
194 /** | |
195 * Specialized form of execute called only from within FJTasks | |
196 **/ | |
197 public void executeTask(FJTask t) { | |
198 try { | |
199 entryQueue.put(t); | |
200 signalNewTask(); | |
201 } | |
202 catch (InterruptedException ex) { | |
203 Thread.currentThread().interrupt(); | |
204 } | |
205 } | |
206 | |
207 | |
208 /** | |
209 * Start a task and wait it out. Returns when the task completes. | |
210 * @exception InterruptedException if current Thread is | |
211 * interrupted before completion of the task. | |
212 **/ | |
213 | |
214 public void invoke(Runnable r) throws InterruptedException { | |
215 InvokableFJTask w = new InvokableFJTask(r); | |
216 entryQueue.put(w); | |
217 signalNewTask(); | |
218 w.awaitTermination(); | |
219 } | |
220 | |
221 | |
222 /** | |
223 * Try to shut down all FJTaskRunner threads in this group | |
224 * by interrupting them all. This method is designed | |
225 * to be used during cleanup when it is somehow known | |
226 * that all threads are idle. | |
227 * FJTaskRunners only | |
228 * check for interruption when they are not otherwise | |
229 * processing a task (and its generated subtasks, | |
230 * if any), so if any threads are active, shutdown may | |
231 * take a while, and may lead to unpredictable | |
232 * task processing. | |
233 **/ | |
234 | |
235 public void interruptAll() { | |
236 // paranoically interrupt current thread last if in group. | |
237 Thread current = Thread.currentThread(); | |
238 boolean stopCurrent = false; | |
239 | |
240 for (int i = 0; i < threads.length; ++i) { | |
241 Thread t = threads[i]; | |
242 if (t == current) | |
243 stopCurrent = true; | |
244 else | |
245 t.interrupt(); | |
246 } | |
247 if (stopCurrent) | |
248 current.interrupt(); | |
249 } | |
250 | |
251 | |
252 /** | |
253 * Set the priority to use while a FJTaskRunner is | |
254 * polling for new tasks to perform. Default | |
255 * is currently Thread.MIN_PRIORITY+1. The value | |
256 * set may not go into effect immediately, but | |
257 * will be used at least the next time a thread scans for work. | |
258 **/ | |
259 public synchronized void setScanPriorities(int pri) { | |
260 for (int i = 0; i < threads.length; ++i) { | |
261 FJTaskRunner t = threads[i]; | |
262 t.setScanPriority(pri); | |
263 if (!t.active) t.setPriority(pri); | |
264 } | |
265 } | |
266 | |
267 | |
268 /** | |
269 * Set the priority to use while a FJTaskRunner is | |
270 * actively running tasks. Default | |
271 * is the priority that was in effect by the thread that | |
272 * constructed this FJTaskRunnerGroup. Setting this value | |
273 * while threads are running may momentarily result in | |
274 * them running at this priority even when idly waiting for work. | |
275 **/ | |
276 public synchronized void setRunPriorities(int pri) { | |
277 for (int i = 0; i < threads.length; ++i) { | |
278 FJTaskRunner t = threads[i]; | |
279 t.setRunPriority(pri); | |
280 if (t.active) t.setPriority(pri); | |
281 } | |
282 } | |
283 | |
284 | |
285 | |
286 /** Return the number of FJTaskRunner threads in this group **/ | |
287 | |
288 public int size() { return threads.length; } | |
289 | |
290 | |
291 /** | |
292 * Return the number of threads that are not idly waiting for work. | |
293 * Beware that even active threads might not be doing any useful | |
294 * work, but just spinning waiting for other dependent tasks. | |
295 * Also, since this is just a snapshot value, some tasks | |
296 * may be in the process of becoming idle. | |
297 **/ | |
298 public synchronized int getActiveCount() { return activeCount; } | |
299 | |
300 /** | |
301 * Prints various snapshot statistics to System.out. | |
302 * <ul> | |
303 * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for | |
304 * <em>n</em> from zero to group size - 1): | |
305 * <ul> | |
306 * <li> A star "*" is printed if the thread is currently active; | |
307 * that is, not sleeping while waiting for work. Because | |
308 * threads gradually enter sleep modes, an active thread | |
309 * may in fact be about to sleep (or wake up). | |
310 * <li> <em>Q Cap</em> The current capacity of its task queue. | |
311 * <li> <em>Runs</em> The total number of tasks that have been run. | |
312 * <li> <em>New</em> The number of these tasks that were | |
313 * taken from either the entry queue or from other | |
314 * thread queues; that is, the number of tasks run | |
315 * that were <em>not</em> forked by the thread itself. | |
316 * <li> <em>Scans</em> The number of times other task | |
317 * queues or the entry queue were polled for tasks. | |
318 * </ul> | |
319 * <li> <em>Execute</em> The total number of tasks entered | |
320 * (but not necessarily yet run) via execute or invoke. | |
321 * <li> <em>Time</em> Time in seconds since construction of this | |
322 * FJTaskRunnerGroup. | |
323 * <li> <em>Rate</em> The total number of tasks processed | |
324 * per second across all threads. This | |
325 * may be useful as a simple throughput indicator | |
326 * if all processed tasks take approximately the | |
327 * same time to run. | |
328 * </ul> | |
329 * <p> | |
330 * Cautions: Some statistics are updated and gathered | |
331 * without synchronization, | |
332 * so may not be accurate. However, reported counts may be considered | |
333 * as lower bounds of actual values. | |
334 * Some values may be zero if classes are compiled | |
335 * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup | |
336 * classes can be independently compiled with different values of | |
337 * COLLECT_STATS.) Also, the counts are maintained as ints so could | |
338 * overflow in exceptionally long-lived applications. | |
339 * <p> | |
340 * These statistics can be useful when tuning algorithms or diagnosing | |
341 * problems. For example: | |
342 * <ul> | |
343 * <li> High numbers of scans may mean that there is insufficient | |
344 * parallelism to keep threads busy. However, high scan rates | |
345 * are expected if the number | |
346 * of Executes is also high or there is a lot of global | |
347 * synchronization in the application, and the system is not otherwise | |
348 * busy. Threads may scan | |
349 * for work hundreds of times upon startup, shutdown, and | |
350 * global synch points of task sets. | |
351 * <li> Large imbalances in tasks run across different threads might | |
352 * just reflect contention with unrelated threads on a system | |
353 * (possibly including JVM threads such as GC), but may also | |
354 * indicate some systematic bias in how you generate tasks. | |
355 * <li> Large task queue capacities may mean that too many tasks are being | |
356 * generated before they can be run. | |
357 * Capacities are reported rather than current numbers of tasks | |
358 * in queues because they are better indicators of the existence | |
359 * of these kinds of possibly-transient problems. | |
360 * Queue capacities are | |
361 * resized on demand from their initial value of 4096 elements, | |
362 * which is much more than sufficient for the kinds of | |
363 * applications that this framework is intended to best support. | |
364 * </ul> | |
365 **/ | |
366 | |
367 public void stats() { | |
368 long time = System.currentTimeMillis() - initTime; | |
369 double secs = ((double)time) / 1000.0; | |
370 long totalRuns = 0; | |
371 long totalScans = 0; | |
372 long totalSteals = 0; | |
373 | |
374 System.out.print("Thread" + | |
375 "\tQ Cap" + | |
376 "\tScans" + | |
377 "\tNew" + | |
378 "\tRuns" + | |
379 "\n"); | |
380 | |
381 for (int i = 0; i < threads.length; ++i) { | |
382 FJTaskRunner t = threads[i]; | |
383 int truns = t.runs; | |
384 totalRuns += truns; | |
385 | |
386 int tscans = t.scans; | |
387 totalScans += tscans; | |
388 | |
389 int tsteals = t.steals; | |
390 totalSteals += tsteals; | |
391 | |
392 String star = (getActive(t))? "*" : " "; | |
393 | |
394 | |
395 System.out.print("T" + i + star + | |
396 "\t" + t.deqSize() + | |
397 "\t" + tscans + | |
398 "\t" + tsteals + | |
399 "\t" + truns + | |
400 "\n"); | |
401 } | |
402 | |
403 System.out.print("Total" + | |
404 "\t " + | |
405 "\t" + totalScans + | |
406 "\t" + totalSteals + | |
407 "\t" + totalRuns + | |
408 "\n"); | |
409 | |
410 System.out.print("Execute: " + entries); | |
411 | |
412 System.out.print("\tTime: " + secs); | |
413 | |
414 long rps = 0; | |
415 if (secs != 0) rps = Math.round((double)(totalRuns) / secs); | |
416 | |
417 System.out.println("\tRate: " + rps); | |
418 } | |
419 | |
420 | |
421 /* ------------ Methods called only by FJTaskRunners ------------- */ | |
422 | |
423 | |
424 /** | |
425 * Return the array of threads in this group. | |
426 * Called only by FJTaskRunner.scan(). | |
427 **/ | |
428 | |
429 protected FJTaskRunner[] getArray() { return threads; } | |
430 | |
431 | |
432 /** | |
433 * Return a task from entry queue, or null if empty. | |
434 * Called only by FJTaskRunner.scan(). | |
435 **/ | |
436 | |
437 protected FJTask pollEntryQueue() { | |
438 try { | |
439 FJTask t = (FJTask)(entryQueue.poll(0)); | |
440 return t; | |
441 } | |
442 catch(InterruptedException ex) { // restore interrupt status and report no task | |
443 Thread.currentThread().interrupt(); | |
444 return null; | |
445 } | |
446 } | |
447 | |
448 | |
449 /** | |
450 * Return active status of t. | |
451 * Per-thread active status can only be accessed and | |
452 * modified via synchronized method here in the group class. | |
453 **/ | |
454 | |
455 protected synchronized boolean getActive(FJTaskRunner t) { | |
456 return t.active; | |
457 } | |
458 | |
459 | |
460 /** | |
461 * Set active status of thread t to true, and notify others | |
462 * that might be waiting for work. | |
463 **/ | |
464 | |
465 protected synchronized void setActive(FJTaskRunner t) { | |
466 if (!t.active) { | |
467 t.active = true; | |
468 ++activeCount; | |
469 if (nstarted < threads.length) | |
470 threads[nstarted++].start(); | |
471 else | |
472 notifyAll(); | |
473 } | |
474 } | |
475 | |
476 /** | |
477 * Set active status of thread t to false. | |
478 **/ | |
479 | |
480 protected synchronized void setInactive(FJTaskRunner t) { | |
481 if (t.active) { | |
482 t.active = false; | |
483 --activeCount; | |
484 } | |
485 } | |
486 | |
487 /** | |
488 * The number of times to scan other threads for tasks | |
489 * before transitioning to a mode where scans are | |
490 * interleaved with sleeps (actually timed waits). | |
491 * Upon transition, sleeps are for duration of | |
492 * scans / SCANS_PER_SLEEP milliseconds. | |
493 * <p> | |
494 * This is not treated as a user-tunable parameter because | |
495 * good values do not appear to vary much across JVMs or | |
496 * applications. Its main role is to help avoid some | |
497 * useless spinning and contention during task startup. | |
498 **/ | |
499 static final long SCANS_PER_SLEEP = 15; | |
500 | |
501 /** | |
502 * The maximum time (in msecs) to sleep when a thread is idle, | |
503 * yet others are not, so may eventually generate work that | |
504 * the current thread can steal. This value reflects the maximum time | |
505 * that a thread may sleep when it possibly should not, because there | |
506 * are other active threads that might generate work. In practice, | |
507 * designs in which some threads become stalled because others | |
508 * are running yet not generating tasks are not likely to work | |
509 * well in this framework anyway, so the exact value does not matter | |
510 * too much. However, keeping it in the sub-second range does | |
511 * help smooth out startup and shutdown effects. | |
512 **/ | |
513 | |
514 static final long MAX_SLEEP_TIME = 100; | |
515 | |
516 /** | |
517 * Set active status of thread t to false, and | |
518 * then wait until: (a) there is a task in the entry | |
519 * queue, or (b) other threads are active, or (c) the current | |
520 * thread is interrupted. Upon return, it | |
521 * is not certain that there will be work available. | |
522 * The thread must itself check. | |
523 * <p> | |
524 * The main underlying reason | |
525 * for these mechanics is that threads do not | |
526 * signal each other when they add elements to their queues. | |
527 * (This would add to task overhead, reduce locality, | |
528 * and increase contention.) | |
529 * So we must rely on a tamed form of polling. However, tasks | |
530 * inserted into the entry queue do result in signals, so | |
531 * threads can wait on these if all of them are otherwise idle. | |
532 **/ | |
533 | |
534 protected synchronized void checkActive(FJTaskRunner t, long scans) { | |
535 | |
536 setInactive(t); | |
537 | |
538 try { | |
539 // if nothing available, do a hard wait | |
540 if (activeCount == 0 && entryQueue.peek() == null) { | |
541 wait(); | |
542 } | |
543 else { | |
544 // If there is possibly some work, | |
545 // sleep for a while before rechecking | |
546 | |
547 long msecs = scans / SCANS_PER_SLEEP; | |
548 if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; | |
549 int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep | |
550 wait(msecs, nsecs); | |
551 } | |
552 } | |
553 catch (InterruptedException ex) { | |
554 notify(); // avoid lost notifies on interrupts | |
555 Thread.currentThread().interrupt(); | |
556 } | |
557 } | |
558 | |
559 /* ------------ Utility methods ------------- */ | |
560 | |
561 /** | |
562 * Start or wake up any threads waiting for work | |
563 **/ | |
564 | |
565 protected synchronized void signalNewTask() { | |
566 if (COLLECT_STATS) ++entries; | |
567 if (nstarted < threads.length) | |
568 threads[nstarted++].start(); | |
569 else | |
570 notify(); | |
571 } | |
572 | |
573 /** | |
574 * Create all FJTaskRunner threads in this group. | |
575 **/ | |
576 | |
577 protected void initializeThreads() { | |
578 for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); | |
579 } | |
580 | |
581 | |
582 | |
583 | |
584 /** | |
585 * Wrap wait/notify mechanics around a task so that | |
586 * invoke() can wait it out | |
587 **/ | |
588 protected static final class InvokableFJTask extends FJTask { | |
589 protected final Runnable wrapped; | |
590 protected boolean terminated = false; | |
591 | |
592 protected InvokableFJTask(Runnable r) { wrapped = r; } | |
593 | |
594 public void run() { | |
595 try { | |
596 if (wrapped instanceof FJTask) | |
597 FJTask.invoke((FJTask)(wrapped)); | |
598 else | |
599 wrapped.run(); | |
600 } | |
601 finally { | |
602 setTerminated(); | |
603 } | |
604 } | |
605 | |
606 protected synchronized void setTerminated() { | |
607 terminated = true; | |
608 notifyAll(); | |
609 } | |
610 | |
611 protected synchronized void awaitTermination() throws InterruptedException { | |
612 while (!terminated) wait(); | |
613 } | |
614 } | |
615 | |
616 | |
617 } | |
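
The `InvokableFJTask` wrapper at the end of the listing lets `invoke()` block until a task finishes by pairing a `terminated` flag with `wait`/`notifyAll`. The sketch below reproduces only that pattern in plain Java, without the `FJTask` classes. The class name `CompletionWrapper` and the demo in `main` are hypothetical and not part of the original package.

```java
// Hypothetical sketch of the wait/notify completion pattern used by
// InvokableFJTask. It wraps a plain Runnable instead of an FJTask.
final class CompletionWrapper implements Runnable {
    private final Runnable wrapped;
    private boolean terminated = false; // guarded by this object's monitor

    CompletionWrapper(Runnable r) { wrapped = r; }

    @Override
    public void run() {
        try {
            wrapped.run();
        } finally {
            // Signal completion even if the wrapped task throws.
            setTerminated();
        }
    }

    private synchronized void setTerminated() {
        terminated = true;
        notifyAll(); // wake every thread blocked in awaitTermination()
    }

    synchronized void awaitTermination() throws InterruptedException {
        // Loop guards against spurious wakeups.
        while (!terminated) wait();
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] result = new int[1];
        CompletionWrapper w = new CompletionWrapper(() -> result[0] = 6 * 7);
        new Thread(w).start();   // stands in for a runner thread picking up the task
        w.awaitTermination();    // caller blocks here, as invoke() does
        System.out.println(result[0]);
    }
}
```

Because the flag is written and read under the same monitor, the caller is guaranteed to see the wrapped task's side effects once `awaitTermination()` returns.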
618 |
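
The timed-wait branch of `checkActive` derives its sleep length from the scan count: `scans / SCANS_PER_SLEEP` milliseconds, capped at `MAX_SLEEP_TIME`, with one nanosecond added when the millisecond part is zero. The sketch below isolates that arithmetic so the resulting values can be inspected. The constants are copied from the listing; the class name `IdleBackoff` and its helper methods are hypothetical.

```java
// Hypothetical helper that mirrors the sleep-duration arithmetic in
// checkActive(). Constants are taken from the listing above.
final class IdleBackoff {
    static final long SCANS_PER_SLEEP = 15;
    static final long MAX_SLEEP_TIME = 100; // milliseconds

    /** Millisecond argument for the timed wait after the given number of scans. */
    static long sleepMillis(long scans) {
        long msecs = scans / SCANS_PER_SLEEP; // integer division
        return Math.min(msecs, MAX_SLEEP_TIME);
    }

    /**
     * Nanosecond argument: 1 when the millisecond part is 0, so the call
     * is never wait(0, 0), which would wait without any timeout.
     */
    static int sleepNanos(long scans) {
        return (sleepMillis(scans) == 0) ? 1 : 0;
    }

    public static void main(String[] args) {
        long[] samples = {0, 14, 15, 150, 1500, 100000};
        for (long scans : samples) {
            System.out.println(scans + " scans -> wait("
                + sleepMillis(scans) + ", " + sleepNanos(scans) + ")");
        }
    }
}
```

With these constants the cap is reached at 1500 scans, so a thread that keeps finding no work settles into waits of at most 100 ms.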