comparison src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunnerGroup.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: FJTaskRunnerGroup.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 7Jan1999 dl First public release
12 12Jan1999 dl made getActiveCount public; misc minor cleanup.
13 14Jan1999 dl Added executeTask
14 20Jan1999 dl Allow use of priorities; reformat stats
15 6Feb1999 dl Lazy thread starts
16 27Apr1999 dl Renamed
17 */
18
19 package EDU.oswego.cs.dl.util.concurrent;
20
21 /**
22 * A stripped down analog of a ThreadGroup used for
23 * establishing and managing FJTaskRunner threads.
24 * ThreadRunnerGroups serve as the control boundary separating
25 * the general world of normal threads from the specialized world
26 * of FJTasks.
27 * <p>
28 * By intent, this class does not subclass java.lang.ThreadGroup, and
29 * does not support most methods found in ThreadGroups, since they
30 * would make no sense for FJTaskRunner threads. In fact, the class
31 * does not deal with ThreadGroups at all. If you want to restrict
32 * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
33 * it from within that ThreadGroup.
34 * <p>
35 * The main contextual parameter for a FJTaskRunnerGroup is
36 * the group size, established in the constructor.
37 * Groups must be of a fixed size.
38 * There is no way to dynamically increase or decrease the number
39 * of threads in an existing group.
40 * <p>
41 * In general, the group size should be equal to the number
42 * of CPUs on the system. (Unfortunately, there is no portable
43 * means of automatically detecting the number of CPUs on a JVM, so there is
44 * no good way to automate defaults.) In principle, when
45 * FJTasks are used for computation-intensive tasks, having only
46 * as many threads as CPUs should minimize bookkeeping overhead
47 * and contention, and so maximize throughput. However, because
48 * FJTaskRunners lie atop Java threads, and in turn operating system
49 * thread support and scheduling policies,
50 * it is very possible that using more threads
51 * than CPUs will improve overall throughput even though it adds
52 * to overhead. This will always be so if FJTasks are I/O bound.
53 * So it may pay to experiment a bit when tuning on particular platforms.
54 * You can also use <code>setRunPriorities</code> to either
55 * increase or decrease the priorities of active threads, which
56 * may interact with group size choice.
57 * <p>
58 * In any case, overestimating group sizes never
59 * seriously degrades performance (at least within reasonable bounds).
60 * You can also use a value
61 * less than the number of CPUs in order to reserve processing
62 * for unrelated threads.
63 * <p>
64 * There are two general styles for using a FJTaskRunnerGroup.
65 * You can create one group per entire program execution, for example
66 * as a static singleton, and use it for all parallel tasks:
67 * <pre>
68 * class Tasks {
69 * static FJTaskRunnerGroup group;
70 * public void initialize(int groupsize) {
71 * group = new FJTaskRunnerGroup(groupSize);
72 * }
73 * // ...
74 * }
75 * </pre>
76 * Alternatively, you can make new groups on the fly and use them only for
77 * particular task sets. This is more flexible,,
78 * and leads to more controllable and deterministic execution patterns,
79 * but it encounters greater overhead on startup. Also, to reclaim
80 * system resources, you should
81 * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
82 * using one-shot groups. Otherwise, because FJTaskRunners set
83 * <code>Thread.isDaemon</code>
84 * status, they will not normally be reclaimed until program termination.
85 * <p>
86 * The main supported methods are <code>execute</code>,
87 * which starts a task processed by FJTaskRunner threads,
88 * and <code>invoke</code>, which starts one and waits for completion.
89 * For example, you might extend the above <code>FJTasks</code>
90 * class to support a task-based computation, say, the
91 * <code>Fib</code> class from the <code>FJTask</code> documentation:
92 * <pre>
93 * class Tasks { // continued
94 * // ...
95 * static int fib(int n) {
96 * try {
97 * Fib f = new Fib(n);
98 * group.invoke(f);
99 * return f.getAnswer();
100 * }
101 * catch (InterruptedException ex) {
102 * throw new Error("Interrupted during computation");
103 * }
104 * }
105 * }
106 * </pre>
107 * <p>
108 * Method <code>stats()</code> can be used to monitor performance.
109 * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
110 * the compile-time constant COLLECT_STATS set to false. In this
111 * case, various simple counts reported in stats() are not collected.
112 * On platforms tested,
113 * this leads to such a tiny performance improvement that there is
114 * very little motivation to bother.
115 *
116 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
117 * <p>
118 * @see FJTask
119 * @see FJTaskRunner
120 **/
121
122 public class FJTaskRunnerGroup implements Executor {
123
124 /** The threads in this group **/
125 protected final FJTaskRunner[] threads;
126
127 /** Group-wide queue for tasks entered via execute() **/
128 protected final LinkedQueue entryQueue = new LinkedQueue();
129
130 /** Number of threads that are not waiting for work **/
131 protected int activeCount = 0;
132
133 /** Number of threads that have been started. Used to avoid
134 unecessary contention during startup of task sets.
135 **/
136 protected int nstarted = 0;
137
138 /**
139 * Compile-time constant. If true, various counts of
140 * runs, waits, etc., are maintained. These are NOT
141 * updated with synchronization, so statistics reports
142 * might not be accurate.
143 **/
144
145 static final boolean COLLECT_STATS = true;
146 // static final boolean COLLECT_STATS = false;
147
148 // for stats
149
150 /** The time at which this ThreadRunnerGroup was constructed **/
151 long initTime = 0;
152
153 /** Total number of executes or invokes **/
154 int entries = 0;
155
156 static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
157
158 /**
159 * Create a FJTaskRunnerGroup with the indicated number
160 * of FJTaskRunner threads. Normally, the best size to use is
161 * the number of CPUs on the system.
162 * <p>
163 * The threads in a FJTaskRunnerGroup are created with their
164 * isDaemon status set, so do not normally need to be
165 * shut down manually upon program termination.
166 **/
167
168 public FJTaskRunnerGroup(int groupSize) {
169 threads = new FJTaskRunner[groupSize];
170 initializeThreads();
171 initTime = System.currentTimeMillis();
172 }
173
174 /**
175 * Arrange for execution of the given task
176 * by placing it in a work queue. If the argument
177 * is not of type FJTask, it is embedded in a FJTask via
178 * <code>FJTask.Wrap</code>.
179 * @exception InterruptedException if current Thread is
180 * currently interrupted
181 **/
182
183 public void execute(Runnable r) throws InterruptedException {
184 if (r instanceof FJTask) {
185 entryQueue.put((FJTask)r);
186 }
187 else {
188 entryQueue.put(new FJTask.Wrap(r));
189 }
190 signalNewTask();
191 }
192
193
194 /**
195 * Specialized form of execute called only from within FJTasks
196 **/
197 public void executeTask(FJTask t) {
198 try {
199 entryQueue.put(t);
200 signalNewTask();
201 }
202 catch (InterruptedException ex) {
203 Thread.currentThread().interrupt();
204 }
205 }
206
207
208 /**
209 * Start a task and wait it out. Returns when the task completes.
210 * @exception InterruptedException if current Thread is
211 * interrupted before completion of the task.
212 **/
213
214 public void invoke(Runnable r) throws InterruptedException {
215 InvokableFJTask w = new InvokableFJTask(r);
216 entryQueue.put(w);
217 signalNewTask();
218 w.awaitTermination();
219 }
220
221
222 /**
223 * Try to shut down all FJTaskRunner threads in this group
224 * by interrupting them all. This method is designed
225 * to be used during cleanup when it is somehow known
226 * that all threads are idle.
227 * FJTaskRunners only
228 * check for interruption when they are not otherwise
229 * processing a task (and its generated subtasks,
230 * if any), so if any threads are active, shutdown may
231 * take a while, and may lead to unpredictable
232 * task processing.
233 **/
234
235 public void interruptAll() {
236 // paranoically interrupt current thread last if in group.
237 Thread current = Thread.currentThread();
238 boolean stopCurrent = false;
239
240 for (int i = 0; i < threads.length; ++i) {
241 Thread t = threads[i];
242 if (t == current)
243 stopCurrent = true;
244 else
245 t.interrupt();
246 }
247 if (stopCurrent)
248 current.interrupt();
249 }
250
251
252 /**
253 * Set the priority to use while a FJTaskRunner is
254 * polling for new tasks to perform. Default
255 * is currently Thread.MIN_PRIORITY+1. The value
256 * set may not go into effect immediately, but
257 * will be used at least the next time a thread scans for work.
258 **/
259 public synchronized void setScanPriorities(int pri) {
260 for (int i = 0; i < threads.length; ++i) {
261 FJTaskRunner t = threads[i];
262 t.setScanPriority(pri);
263 if (!t.active) t.setPriority(pri);
264 }
265 }
266
267
268 /**
269 * Set the priority to use while a FJTaskRunner is
270 * actively running tasks. Default
271 * is the priority that was in effect by the thread that
272 * constructed this FJTaskRunnerGroup. Setting this value
273 * while threads are running may momentarily result in
274 * them running at this priority even when idly waiting for work.
275 **/
276 public synchronized void setRunPriorities(int pri) {
277 for (int i = 0; i < threads.length; ++i) {
278 FJTaskRunner t = threads[i];
279 t.setRunPriority(pri);
280 if (t.active) t.setPriority(pri);
281 }
282 }
283
284
285
286 /** Return the number of FJTaskRunner threads in this group **/
287
288 public int size() { return threads.length; }
289
290
291 /**
292 * Return the number of threads that are not idly waiting for work.
293 * Beware that even active threads might not be doing any useful
294 * work, but just spinning waiting for other dependent tasks.
295 * Also, since this is just a snapshot value, some tasks
296 * may be in the process of becoming idle.
297 **/
298 public synchronized int getActiveCount() { return activeCount; }
299
300 /**
301 * Prints various snapshot statistics to System.out.
302 * <ul>
303 * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
304 * <em>n</em> from zero to group size - 1):
305 * <ul>
306 * <li> A star "*" is printed if the thread is currently active;
307 * that is, not sleeping while waiting for work. Because
308 * threads gradually enter sleep modes, an active thread
309 * may in fact be about to sleep (or wake up).
310 * <li> <em>Q Cap</em> The current capacity of its task queue.
311 * <li> <em>Run</em> The total number of tasks that have been run.
312 * <li> <em>New</em> The number of these tasks that were
313 * taken from either the entry queue or from other
314 * thread queues; that is, the number of tasks run
315 * that were <em>not</em> forked by the thread itself.
316 * <li> <em>Scan</em> The number of times other task
317 * queues or the entry queue were polled for tasks.
318 * </ul>
319 * <li> <em>Execute</em> The total number of tasks entered
320 * (but not necessarily yet run) via execute or invoke.
321 * <li> <em>Time</em> Time in seconds since construction of this
322 * FJTaskRunnerGroup.
323 * <li> <em>Rate</em> The total number of tasks processed
324 * per second across all threads. This
325 * may be useful as a simple throughput indicator
326 * if all processed tasks take approximately the
327 * same time to run.
328 * </ul>
329 * <p>
330 * Cautions: Some statistics are updated and gathered
331 * without synchronization,
332 * so may not be accurate. However, reported counts may be considered
333 * as lower bounds of actual values.
334 * Some values may be zero if classes are compiled
335 * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
336 * classes can be independently compiled with different values of
337 * COLLECT_STATS.) Also, the counts are maintained as ints so could
338 * overflow in exceptionally long-lived applications.
339 * <p>
340 * These statistics can be useful when tuning algorithms or diagnosing
341 * problems. For example:
342 * <ul>
343 * <li> High numbers of scans may mean that there is insufficient
344 * parallelism to keep threads busy. However, high scan rates
345 * are expected if the number
346 * of Executes is also high or there is a lot of global
347 * synchronization in the application, and the system is not otherwise
348 * busy. Threads may scan
349 * for work hundreds of times upon startup, shutdown, and
350 * global synch points of task sets.
351 * <li> Large imbalances in tasks run across different threads might
352 * just reflect contention with unrelated threads on a system
353 * (possibly including JVM threads such as GC), but may also
354 * indicate some systematic bias in how you generate tasks.
355 * <li> Large task queue capacities may mean that too many tasks are being
356 * generated before they can be run.
357 * Capacities are reported rather than current numbers of tasks
358 * in queues because they are better indicators of the existence
359 * of these kinds of possibly-transient problems.
360 * Queue capacities are
361 * resized on demand from their initial value of 4096 elements,
362 * which is much more than sufficient for the kinds of
363 * applications that this framework is intended to best support.
364 * </ul>
365 **/
366
367 public void stats() {
368 long time = System.currentTimeMillis() - initTime;
369 double secs = ((double)time) / 1000.0;
370 long totalRuns = 0;
371 long totalScans = 0;
372 long totalSteals = 0;
373
374 System.out.print("Thread" +
375 "\tQ Cap" +
376 "\tScans" +
377 "\tNew" +
378 "\tRuns" +
379 "\n");
380
381 for (int i = 0; i < threads.length; ++i) {
382 FJTaskRunner t = threads[i];
383 int truns = t.runs;
384 totalRuns += truns;
385
386 int tscans = t.scans;
387 totalScans += tscans;
388
389 int tsteals = t.steals;
390 totalSteals += tsteals;
391
392 String star = (getActive(t))? "*" : " ";
393
394
395 System.out.print("T" + i + star +
396 "\t" + t.deqSize() +
397 "\t" + tscans +
398 "\t" + tsteals +
399 "\t" + truns +
400 "\n");
401 }
402
403 System.out.print("Total" +
404 "\t " +
405 "\t" + totalScans +
406 "\t" + totalSteals +
407 "\t" + totalRuns +
408 "\n");
409
410 System.out.print("Execute: " + entries);
411
412 System.out.print("\tTime: " + secs);
413
414 long rps = 0;
415 if (secs != 0) rps = Math.round((double)(totalRuns) / secs);
416
417 System.out.println("\tRate: " + rps);
418 }
419
420
421 /* ------------ Methods called only by FJTaskRunners ------------- */
422
423
424 /**
425 * Return the array of threads in this group.
426 * Called only by FJTaskRunner.scan().
427 **/
428
429 protected FJTaskRunner[] getArray() { return threads; }
430
431
432 /**
433 * Return a task from entry queue, or null if empty.
434 * Called only by FJTaskRunner.scan().
435 **/
436
437 protected FJTask pollEntryQueue() {
438 try {
439 FJTask t = (FJTask)(entryQueue.poll(0));
440 return t;
441 }
442 catch(InterruptedException ex) { // ignore interrupts
443 Thread.currentThread().interrupt();
444 return null;
445 }
446 }
447
448
449 /**
450 * Return active status of t.
451 * Per-thread active status can only be accessed and
452 * modified via synchronized method here in the group class.
453 **/
454
455 protected synchronized boolean getActive(FJTaskRunner t) {
456 return t.active;
457 }
458
459
460 /**
461 * Set active status of thread t to true, and notify others
462 * that might be waiting for work.
463 **/
464
465 protected synchronized void setActive(FJTaskRunner t) {
466 if (!t.active) {
467 t.active = true;
468 ++activeCount;
469 if (nstarted < threads.length)
470 threads[nstarted++].start();
471 else
472 notifyAll();
473 }
474 }
475
476 /**
477 * Set active status of thread t to false.
478 **/
479
480 protected synchronized void setInactive(FJTaskRunner t) {
481 if (t.active) {
482 t.active = false;
483 --activeCount;
484 }
485 }
486
487 /**
488 * The number of times to scan other threads for tasks
489 * before transitioning to a mode where scans are
490 * interleaved with sleeps (actually timed waits).
491 * Upon transition, sleeps are for duration of
492 * scans / SCANS_PER_SLEEP milliseconds.
493 * <p>
494 * This is not treated as a user-tunable parameter because
495 * good values do not appear to vary much across JVMs or
496 * applications. Its main role is to help avoid some
497 * useless spinning and contention during task startup.
498 **/
499 static final long SCANS_PER_SLEEP = 15;
500
501 /**
502 * The maximum time (in msecs) to sleep when a thread is idle,
503 * yet others are not, so may eventually generate work that
504 * the current thread can steal. This value reflects the maximum time
505 * that a thread may sleep when it possibly should not, because there
506 * are other active threads that might generate work. In practice,
507 * designs in which some threads become stalled because others
508 * are running yet not generating tasks are not likely to work
509 * well in this framework anyway, so the exact value does not matter
510 * too much. However, keeping it in the sub-second range does
511 * help smooth out startup and shutdown effects.
512 **/
513
514 static final long MAX_SLEEP_TIME = 100;
515
516 /**
517 * Set active status of thread t to false, and
518 * then wait until: (a) there is a task in the entry
519 * queue, or (b) other threads are active, or (c) the current
520 * thread is interrupted. Upon return, it
521 * is not certain that there will be work available.
522 * The thread must itself check.
523 * <p>
524 * The main underlying reason
525 * for these mechanics is that threads do not
526 * signal each other when they add elements to their queues.
527 * (This would add to task overhead, reduce locality.
528 * and increase contention.)
529 * So we must rely on a tamed form of polling. However, tasks
530 * inserted into the entry queue do result in signals, so
531 * tasks can wait on these if all of them are otherwise idle.
532 **/
533
534 protected synchronized void checkActive(FJTaskRunner t, long scans) {
535
536 setInactive(t);
537
538 try {
539 // if nothing available, do a hard wait
540 if (activeCount == 0 && entryQueue.peek() == null) {
541 wait();
542 }
543 else {
544 // If there is possibly some work,
545 // sleep for a while before rechecking
546
547 long msecs = scans / SCANS_PER_SLEEP;
548 if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
549 int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
550 wait(msecs, nsecs);
551 }
552 }
553 catch (InterruptedException ex) {
554 notify(); // avoid lost notifies on interrupts
555 Thread.currentThread().interrupt();
556 }
557 }
558
559 /* ------------ Utility methods ------------- */
560
561 /**
562 * Start or wake up any threads waiting for work
563 **/
564
565 protected synchronized void signalNewTask() {
566 if (COLLECT_STATS) ++entries;
567 if (nstarted < threads.length)
568 threads[nstarted++].start();
569 else
570 notify();
571 }
572
573 /**
574 * Create all FJTaskRunner threads in this group.
575 **/
576
577 protected void initializeThreads() {
578 for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
579 }
580
581
582
583
584 /**
585 * Wrap wait/notify mechanics around a task so that
586 * invoke() can wait it out
587 **/
588 protected static final class InvokableFJTask extends FJTask {
589 protected final Runnable wrapped;
590 protected boolean terminated = false;
591
592 protected InvokableFJTask(Runnable r) { wrapped = r; }
593
594 public void run() {
595 try {
596 if (wrapped instanceof FJTask)
597 FJTask.invoke((FJTask)(wrapped));
598 else
599 wrapped.run();
600 }
601 finally {
602 setTerminated();
603 }
604 }
605
606 protected synchronized void setTerminated() {
607 terminated = true;
608 notifyAll();
609 }
610
611 protected synchronized void awaitTermination() throws InterruptedException {
612 while (!terminated) wait();
613 }
614 }
615
616
617 }
618