src/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author: Dan Creswell <dan.creswell@gmail.com>
date:   Sat, 21 Mar 2009 11:00:06 +0000
/*
  File: PooledExecutor.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who  What
  19Jun1998  dl   Create public version
  29aug1998  dl   rely on ThreadFactoryUser,
                  remove ThreadGroup-based methods
                  adjusted locking policies
   3mar1999  dl   Worker threads sense decreases in pool size
  31mar1999  dl   Allow supplied channel in constructor;
                  add methods createThreads, drain
  15may1999  dl   Allow infinite keepalives
  21oct1999  dl   add minimumPoolSize methods
   7sep2000  dl   BlockedExecutionHandler now an interface,
                  new DiscardOldestWhenBlocked policy
  12oct2000  dl   add shutdownAfterProcessingCurrentlyQueuedTasks
  13nov2000  dl   null out task ref after run
  08apr2001  dl   declare inner class ctor protected
  12nov2001  dl   Better shutdown support
                  Blocked exec handlers can throw IE
                  Simplify locking scheme
  25jan2001  dl   {get,set}BlockedExecutionHandler now public
  17may2002  dl   null out task var in worker run to enable GC.
*/

package EDU.oswego.cs.dl.util.concurrent;
import java.util.*;

/**
 * A tunable, extensible thread pool class. The main supported public
 * method is <code>execute(Runnable command)</code>, which can be
 * called instead of directly creating threads to execute commands.
 *
 * <p>
 * Thread pools can be useful for several, usually intertwined
 * reasons:
 *
 * <ul>
 *
 * <li> To bound resource use. A limit can be placed on the maximum
 * number of simultaneously executing threads.
 *
 * <li> To manage concurrency levels. A targeted number of threads
 * can be allowed to execute simultaneously.
 *
 * <li> To manage a set of threads performing related tasks.
 *
 * <li> To minimize overhead, by reusing previously constructed
 * Thread objects rather than creating new ones. (Note however
 * that pools are hardly ever cure-alls for performance problems
 * associated with thread construction, especially on JVMs that
 * themselves internally pool or recycle threads.)
 *
 * </ul>
 *
 * These goals introduce a number of policy parameters that are
 * encapsulated in this class. All of these parameters have defaults
 * and are tunable, either via get/set methods, or, in cases where
 * decisions should hold across lifetimes, via methods that can be
 * easily overridden in subclasses. The main, most commonly set
 * parameters can be established in constructors. Policy choices
 * across these dimensions can and do interact. Be careful, and
 * please read this documentation completely before using! See also
 * the usage examples below.
 *
 * <dl>
 * <dt> Queueing
 *
 * <dd> By default, this pool uses queueless synchronous channels to
 * hand off work to threads. This is a safe, conservative policy
 * that avoids lockups when handling sets of requests that might
 * have internal dependencies. (In these cases, queuing one task
 * could lock up another that would be able to continue if the
 * queued task were to run.) If you are sure that this cannot
 * happen, then you can instead supply a queue of some sort (for
 * example, a BoundedBuffer or LinkedQueue) in the constructor.
 * This will cause new commands to be queued in cases where all
 * MaximumPoolSize threads are busy. Queues are sometimes
 * appropriate when each task is completely independent of others,
 * so tasks cannot affect each other's execution; for example, in an
 * http server. <p>
 *
 * When given a choice, this pool always prefers adding a new thread
 * rather than queueing if there are currently fewer than the
 * current getMinimumPoolSize threads running, but otherwise always
 * prefers queuing a request rather than adding a new thread. Thus,
 * if you use an unbounded buffer, you will never have more than
 * getMinimumPoolSize threads running. (Since the default
 * minimumPoolSize is one, you will probably want to explicitly
 * setMinimumPoolSize.) <p>
 *
 * While queuing can be useful in smoothing out transient bursts of
 * requests, especially in socket-based services, it is not very
 * well behaved when commands continue to arrive on average faster
 * than they can be processed. Using bounds for both the queue and
 * the pool size, along with the run-when-blocked policy, is often a
 * reasonable response to such possibilities. <p>
 *
 * Queue sizes and maximum pool sizes can often be traded off for
 * each other. Using large queues and small pools minimizes CPU
 * usage, OS resources, and context-switching overhead, but can lead
 * to artificially low throughput. Especially if tasks frequently
 * block (for example if they are I/O bound), a JVM and underlying
 * OS may be able to schedule time for more threads than you
 * otherwise allow. Use of small queues or queueless handoffs
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput. <p>
 *
 * <dt> Maximum Pool size
 *
 * <dd> The maximum number of threads to use, when needed. The pool
 * does not by default preallocate threads. Instead, a thread is
 * created, if necessary and if there are fewer than the maximum,
 * only when an <code>execute</code> request arrives. The default
 * value is (for all practical purposes) infinite --
 * <code>Integer.MAX_VALUE</code>, so it should be set in the
 * constructor or the set method unless you are just using the pool
 * to minimize construction overhead. Because task handoffs to idle
 * worker threads require synchronization that in turn relies on JVM
 * scheduling policies to ensure progress, it is possible that a new
 * thread will be created even though an existing worker thread has
 * just become idle but has not progressed to the point at which it
 * can accept a new task. This phenomenon tends to occur on some
 * JVMs when bursts of short tasks are executed. <p>
 *
 * <dt> Minimum Pool size
 *
 * <dd> The minimum number of threads to use, when needed (default
 * 1). When a new request is received and fewer than the minimum
 * number of threads are running, a new thread is always created to
 * handle the request, even if other worker threads are idly waiting
 * for work. Otherwise, a new thread is created only if there are
 * fewer than the maximum and the request cannot immediately be
 * queued. <p>
 *
 * <dt> Preallocation
 *
 * <dd> You can override lazy thread construction policies via
 * method createThreads, which establishes a given number of warm
 * threads. Be aware that these preallocated threads will time out
 * and die (and later be replaced with others if needed) if not used
 * within the keep-alive time window. If you use preallocation, you
 * probably want to increase the keepalive time. The difference
 * between setMinimumPoolSize and createThreads is that
 * createThreads immediately establishes threads, while setting the
 * minimum pool size waits until requests arrive. <p>
 *
 * <dt> Keep-alive time
 *
 * <dd> If the pool maintained references to a fixed set of threads
 * in the pool, then it would impede garbage collection of otherwise
 * idle threads. This would defeat the resource-management aspects
 * of pools. One solution would be to use weak references. However,
 * this would impose costly and difficult synchronization issues.
 * Instead, threads are simply allowed to terminate and thus be
 * GCable if they have been idle for the given keep-alive time. The
 * value of this parameter represents a trade-off between GCability
 * and construction time. In most current Java VMs, thread
 * construction and cleanup overhead is on the order of
 * milliseconds. The default keep-alive value is one minute, which
 * means that the time needed to construct and then GC a thread is
 * expended at most once per minute.
 * <p>
 *
 * To establish worker threads permanently, use a <em>negative</em>
 * argument to setKeepAliveTime. <p>
 *
 * <dt> Blocked execution policy
 *
 * <dd> If the maximum pool size or queue size is bounded, then it
 * is possible for incoming <code>execute</code> requests to
 * block. There are five supported policies for handling this
 * problem, and mechanics (based on the Strategy Object pattern) to
 * allow others in subclasses: <p>
 *
 * <dl>
 * <dt> Run (the default)
 * <dd> The thread making the <code>execute</code> request
 * runs the task itself. This policy helps guard against lockup.
 * <dt> Wait
 * <dd> Wait until a thread becomes available.
 * <dt> Abort
 * <dd> Throw a RuntimeException.
 * <dt> Discard
 * <dd> Throw away the current request and return.
 * <dt> DiscardOldest
 * <dd> Throw away the oldest request and return.
 * </dl>
 *
 * Other plausible policies include raising the maximum pool size
 * after checking with some other objects that this is OK. <p>
 *
 * (These cases can never occur if the maximum pool size is unbounded
 * or the queue is unbounded. In these cases you instead face
 * potential resource exhaustion.) The execute method does not
 * throw any checked exceptions in any of these cases since any
 * errors associated with them must normally be dealt with via
 * handlers or callbacks. (Although in some cases, these might be
 * associated with throwing unchecked exceptions.) You may wish to
 * add special implementations even if you choose one of the listed
 * policies. For example, the supplied Discard policy does not
 * inform the caller of the drop. You could add your own version
 * that does so. Since choice of policies is normally a system-wide
 * decision, selecting a policy affects all calls to
 * <code>execute</code>. If for some reason you would instead like
 * to make per-call decisions, you could add variant versions of the
 * <code>execute</code> method (for example,
 * <code>executeIfWouldNotBlock</code>) in subclasses. <p>
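 *
 * As a hypothetical sketch (the class and field names below are
 * illustrative, not part of this package), a Discard variant that
 * records drops so callers can observe them might look like:
 *
 * <pre>
 * class NotifyingDiscardPool extends PooledExecutor {
 *   // Counts dropped tasks; a subclass could instead invoke a callback.
 *   protected int dropped_ = 0;
 *
 *   class NotifyingDiscard implements BlockedExecutionHandler {
 *     public boolean blockedAction(Runnable command) {
 *       synchronized(NotifyingDiscardPool.this) { ++dropped_; }
 *       return true; // handled; execute() returns without running command
 *     }
 *   }
 *
 *   public NotifyingDiscardPool(Channel channel, int maxPoolSize) {
 *     super(channel, maxPoolSize);
 *     setBlockedExecutionHandler(new NotifyingDiscard());
 *   }
 * }
 * </pre>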
 *
 * <dt> Thread construction parameters
 *
 * <dd> A settable ThreadFactory establishes each new thread. By
 * default, it merely generates a new instance of class Thread, but
 * can be changed to use a Thread subclass, to set priorities,
 * ThreadLocals, etc. <p>
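 *
 * For example (a sketch; the naming scheme and priority choice are
 * arbitrary), a factory producing named, low-priority daemon
 * threads:
 *
 * <pre>
 * pool.setThreadFactory(new ThreadFactory() {
 *   int count_ = 0;
 *   public Thread newThread(Runnable command) {
 *     Thread t = new Thread(command, "pool-worker-" + (count_++));
 *     t.setDaemon(true);
 *     t.setPriority(Thread.MIN_PRIORITY);
 *     return t;
 *   }
 * });
 * </pre>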
 *
 * <dt> Interruption policy
 *
 * <dd> Worker threads check for interruption after processing each
 * command, and terminate upon interruption. Fresh threads will
 * replace them if needed. Thus, new tasks will not start out in an
 * interrupted state due to an uncleared interruption in a previous
 * task. Also, unprocessed commands are never dropped upon
 * interruption. It would conceptually suffice simply to clear
 * interruption between tasks, but implementation characteristics of
 * interruption-based methods are uncertain enough to warrant this
 * conservative strategy. It is a good idea to be equally
 * conservative in your code for the tasks running within pools.
 * <p>
 *
 * <dt> Shutdown policy
 *
 * <dd> The interruptAll method interrupts, but does not disable the
 * pool. Two different shutdown methods are supported for use when
 * you do want to (permanently) stop processing tasks. Method
 * shutdownAfterProcessingCurrentlyQueuedTasks waits until all
 * current tasks are finished. The shutdownNow method interrupts
 * current threads and leaves other queued requests unprocessed.
 * <p>
 *
 * <dt> Handling requests after shutdown
 *
 * <dd> When the pool is shut down, new incoming requests are handled
 * by the blockedExecutionHandler. By default, the handler is set to
 * discard new requests, but this can be set with an optional
 * argument to method
 * shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are
 * using some form of queuing, you may wish to call method drain()
 * to remove (and return) unprocessed commands from the queue after
 * shutting down the pool and its clients. If you need to be sure
 * these commands are processed, you can then run() each of the
 * commands in the list returned by drain().
 *
 * </dl>
 * <p>
 *
 * <b>Usage examples.</b>
 * <p>
 *
 * Probably the most common use of pools is in statics or singletons
 * accessible from a number of classes in a package; for example:
 *
 * <pre>
 * class MyPool {
 *   // initialize to use a maximum of 8 threads.
 *   static PooledExecutor pool = new PooledExecutor(8);
 * }
 * </pre>
 * Here are some sample variants in initialization:
 * <ol>
 * <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only
 * when needed due to incoming requests), but allowing
 * up to 100 threads if the buffer gets full.
 * <pre>
 *   pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *   pool.setMinimumPoolSize(4);
 * </pre>
 * <li> Same as (1), except pre-start 9 threads, allowing them to
 * die if they are not used for five minutes.
 * <pre>
 *   pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *   pool.setMinimumPoolSize(4);
 *   pool.setKeepAliveTime(1000 * 60 * 5);
 *   pool.createThreads(9);
 * </pre>
 * <li> Same as (2), except clients block if both the buffer is full and
 * all 100 threads are busy:
 * <pre>
 *   pool = new PooledExecutor(new BoundedBuffer(10), 100);
 *   pool.setMinimumPoolSize(4);
 *   pool.setKeepAliveTime(1000 * 60 * 5);
 *   pool.waitWhenBlocked();
 *   pool.createThreads(9);
 * </pre>
 * <li> An unbounded queue serviced by exactly 5 threads:
 * <pre>
 *   pool = new PooledExecutor(new LinkedQueue());
 *   pool.setKeepAliveTime(-1); // live forever
 *   pool.createThreads(5);
 * </pre>
 * </ol>
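 *
 * An orderly shutdown sequence (a sketch combining the shutdown
 * methods described above) might be:
 * <pre>
 *   pool.shutdownAfterProcessingCurrentlyQueuedTasks();
 *   try {
 *     pool.awaitTerminationAfterShutdown();
 *   }
 *   catch (InterruptedException ex) {
 *     Thread.currentThread().interrupt(); // preserve interrupt status
 *   }
 * </pre>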
 *
 * <p>
 * <b>Usage notes.</b>
 * <p>
 *
 * Pools do not mesh well with using thread-specific storage via
 * java.lang.ThreadLocal. ThreadLocal relies on the identity of a
 * thread executing a particular task. Pools use the same thread to
 * perform different tasks. <p>
 *
 * If you need a policy not handled by the parameters in this class,
 * consider writing a subclass. <p>
 *
 * Version note: Previous versions of this class relied on
 * ThreadGroups for aggregate control. This has been removed, and the
 * method interruptAll added, to avoid differences in behavior across
 * JVMs.
 *
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 **/

public class PooledExecutor extends ThreadFactoryUser implements Executor {

  /**
   * The maximum pool size; used if not otherwise specified. Default
   * value is essentially infinite (Integer.MAX_VALUE).
   **/
  public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;

  /**
   * The minimum pool size; used if not otherwise specified. Default
   * value is 1.
   **/
  public static final int DEFAULT_MINIMUMPOOLSIZE = 1;

  /**
   * The maximum time to keep worker threads alive waiting for new
   * tasks; used if not otherwise specified. Default value is one
   * minute (60000 milliseconds).
   **/
  public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;

  /** The maximum number of threads allowed in pool. **/
  protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;

  /** The minimum number of threads to maintain in pool. **/
  protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;

  /** Current pool size. **/
  protected int poolSize_ = 0;

  /** The maximum time for an idle thread to wait for a new task. **/
  protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;

  /**
   * Shutdown flag - latches true when a shutdown method is called
   * in order to disable queuing/handoffs of new tasks.
   **/
  protected boolean shutdown_ = false;

  /**
   * The channel used to hand off the command to a thread in the pool.
   **/
  protected final Channel handOff_;

  /**
   * The set of active threads, declared as a map from workers to
   * their threads. This is needed by the interruptAll method. It
   * may also be useful in subclasses that need to perform other
   * thread management chores.
   **/
  protected final Map threads_;

  /** The current handler for unserviceable requests. **/
  protected BlockedExecutionHandler blockedExecutionHandler_;

  /**
   * Create a new pool with all default settings.
   **/
  public PooledExecutor() {
    this(new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
  }

  /**
   * Create a new pool with all default settings except
   * for maximum pool size.
   **/
  public PooledExecutor(int maxPoolSize) {
    this(new SynchronousChannel(), maxPoolSize);
  }

  /**
   * Create a new pool that uses the supplied Channel for queuing, and
   * with all default parameter settings.
   **/
  public PooledExecutor(Channel channel) {
    this(channel, DEFAULT_MAXIMUMPOOLSIZE);
  }

  /**
   * Create a new pool that uses the supplied Channel for queuing, and
   * with all default parameter settings except for maximum pool size.
   **/
  public PooledExecutor(Channel channel, int maxPoolSize) {
    maximumPoolSize_ = maxPoolSize;
    handOff_ = channel;
    runWhenBlocked();
    threads_ = new HashMap();
  }

  /**
   * Return the maximum number of threads to simultaneously execute.
   * New unqueued requests will be handled according to the current
   * blocking policy once this limit is exceeded.
   **/
  public synchronized int getMaximumPoolSize() {
    return maximumPoolSize_;
  }

  /**
   * Set the maximum number of threads to use. Decreasing the pool
   * size will not immediately kill existing threads, but they may
   * later die when idle.
   * @exception IllegalArgumentException if less than or equal to
   * zero. (It is not considered an error to set the maximum to be
   * less than the minimum. However, in this case there are no
   * guarantees about behavior.)
   **/
  public synchronized void setMaximumPoolSize(int newMaximum) {
    if (newMaximum <= 0) throw new IllegalArgumentException();
    maximumPoolSize_ = newMaximum;
  }

  /**
   * Return the minimum number of threads to simultaneously execute.
   * (Default value is 1.) If fewer than the minimum number are
   * running upon reception of a new request, a new thread is started
   * to handle this request.
   **/
  public synchronized int getMinimumPoolSize() {
    return minimumPoolSize_;
  }

  /**
   * Set the minimum number of threads to use.
   * @exception IllegalArgumentException if less than zero. (It is not
   * considered an error to set the minimum to be greater than the
   * maximum. However, in this case there are no guarantees about
   * behavior.)
   **/
  public synchronized void setMinimumPoolSize(int newMinimum) {
    if (newMinimum < 0) throw new IllegalArgumentException();
    minimumPoolSize_ = newMinimum;
  }

  /**
   * Return the current number of active threads in the pool. This
   * number is just a snapshot, and may change immediately upon
   * returning.
   **/
  public synchronized int getPoolSize() {
    return poolSize_;
  }

  /**
   * Return the number of milliseconds to keep threads alive waiting
   * for new commands. A negative value means to wait forever. A zero
   * value means not to wait at all.
   **/
  public synchronized long getKeepAliveTime() {
    return keepAliveTime_;
  }

  /**
   * Set the number of milliseconds to keep threads alive waiting for
   * new commands. A negative value means to wait forever. A zero
   * value means not to wait at all.
   **/
  public synchronized void setKeepAliveTime(long msecs) {
    keepAliveTime_ = msecs;
  }

  /** Get the handler for blocked execution **/
  public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
    return blockedExecutionHandler_;
  }

  /** Set the handler for blocked execution **/
  public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) {
    blockedExecutionHandler_ = h;
  }

  /**
   * Create and start a thread to handle a new command. Call only
   * when holding lock.
   **/
  protected void addThread(Runnable command) {
    Worker worker = new Worker(command);
    Thread thread = getThreadFactory().newThread(worker);
    threads_.put(worker, thread);
    ++poolSize_;
    thread.start();
  }

  /**
   * Create and start up to numberOfThreads threads in the pool.
   * Return the number created. This may be less than the number
   * requested if creating more would exceed maximum pool size bound.
   **/
  public int createThreads(int numberOfThreads) {
    int ncreated = 0;
    for (int i = 0; i < numberOfThreads; ++i) {
      synchronized(this) {
        if (poolSize_ < maximumPoolSize_) {
          addThread(null);
          ++ncreated;
        }
        else
          break;
      }
    }
    return ncreated;
  }

  /**
   * Interrupt all threads in the pool, causing them all to
   * terminate. Assuming that executed tasks do not disable (clear)
   * interruptions, each thread will terminate after processing its
   * current task. Threads will terminate sooner if the executed tasks
   * themselves respond to interrupts.
   **/
  public synchronized void interruptAll() {
    for (Iterator it = threads_.values().iterator(); it.hasNext(); ) {
      Thread t = (Thread)(it.next());
      t.interrupt();
    }
  }

  /**
   * Interrupt all threads and disable construction of new
   * threads. Any tasks entered after this point will be discarded. A
   * shut down pool cannot be restarted.
   */
  public void shutdownNow() {
    shutdownNow(new DiscardWhenBlocked());
  }

  /**
   * Interrupt all threads and disable construction of new
   * threads. Any tasks entered after this point will be handled by
   * the given BlockedExecutionHandler. A shut down pool cannot be
   * restarted.
   */
  public synchronized void shutdownNow(BlockedExecutionHandler handler) {
    setBlockedExecutionHandler(handler);
    shutdown_ = true; // don't allow new tasks
    minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
    interruptAll(); // interrupt all existing threads
  }

  /**
   * Terminate threads after processing all elements currently in
   * queue. Any tasks entered after this point will be discarded. A
   * shut down pool cannot be restarted.
   **/
  public void shutdownAfterProcessingCurrentlyQueuedTasks() {
    shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
  }

  /**
   * Terminate threads after processing all elements currently in
   * queue. Any tasks entered after this point will be handled by the
   * given BlockedExecutionHandler. A shut down pool cannot be
   * restarted.
   **/
  public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) {
    setBlockedExecutionHandler(handler);
    shutdown_ = true;
    if (poolSize_ == 0) // disable new thread construction when idle
      minimumPoolSize_ = maximumPoolSize_ = 0;
  }

  /**
   * Return true if a shutdown method has succeeded in terminating all
   * threads.
   */
  public synchronized boolean isTerminatedAfterShutdown() {
    return shutdown_ && poolSize_ == 0;
  }

  /**
   * Wait for a shutdown pool to fully terminate, or until the timeout
   * has expired. This method may only be called <em>after</em>
   * invoking shutdownNow or
   * shutdownAfterProcessingCurrentlyQueuedTasks.
   *
   * @param maxWaitTime the maximum time in milliseconds to wait
   * @return true if the pool has terminated within the max wait period
   * @exception IllegalStateException if shutdown has not been requested
   * @exception InterruptedException if the current thread has been interrupted in the course of waiting
   */
  public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException {
    if (!shutdown_)
      throw new IllegalStateException();
    if (poolSize_ == 0)
      return true;
    long waitTime = maxWaitTime;
    if (waitTime <= 0)
      return false;
    long start = System.currentTimeMillis();
    for (;;) {
      wait(waitTime);
      if (poolSize_ == 0)
        return true;
      waitTime = maxWaitTime - (System.currentTimeMillis() - start);
      if (waitTime <= 0)
        return false;
    }
  }

  /**
   * Wait for a shutdown pool to fully terminate. This method may
   * only be called <em>after</em> invoking shutdownNow or
   * shutdownAfterProcessingCurrentlyQueuedTasks.
   *
   * @exception IllegalStateException if shutdown has not been requested
   * @exception InterruptedException if the current thread has been interrupted in the course of waiting
   */
  public synchronized void awaitTerminationAfterShutdown() throws InterruptedException {
    if (!shutdown_)
      throw new IllegalStateException();
    while (poolSize_ > 0)
      wait();
  }

  /**
   * Remove all unprocessed tasks from the pool queue, and return them
   * in a java.util.List. This method should be used only when there
   * are no active clients of the pool. Otherwise you face the
   * possibility that the method will loop pulling out tasks as
   * clients are putting them in. This method can be useful after
   * shutting down a pool (via shutdownNow) to determine whether there
   * are any pending tasks that were not processed. You can then, for
   * example, execute all unprocessed commands via code along the lines
   * of:
   *
   * <pre>
   *   List tasks = pool.drain();
   *   for (Iterator it = tasks.iterator(); it.hasNext();)
   *     ( (Runnable)(it.next()) ).run();
   * </pre>
   **/
  public List drain() {
    boolean wasInterrupted = false;
    Vector tasks = new Vector();
    for (;;) {
      try {
        Object x = handOff_.poll(0);
        if (x == null)
          break;
        else
          tasks.addElement(x);
      }
      catch (InterruptedException ex) {
        wasInterrupted = true; // postpone re-interrupt until drained
      }
    }
    if (wasInterrupted) Thread.currentThread().interrupt();
    return tasks;
  }

  /**
   * Cleanup method called upon termination of worker thread.
   **/
  protected synchronized void workerDone(Worker w) {
    threads_.remove(w);
    if (--poolSize_ == 0 && shutdown_) {
      maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
      notifyAll(); // notify awaitTerminationAfterShutdown
    }
  }

  /**
   * Get a task from the handoff queue, or null if shutting down.
   **/
  protected Runnable getTask() throws InterruptedException {
    long waitTime;
    synchronized(this) {
      if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
        return null;
      waitTime = (shutdown_)? 0 : keepAliveTime_;
    }
    if (waitTime >= 0)
      return (Runnable)(handOff_.poll(waitTime));
    else
      return (Runnable)(handOff_.take());
  }


  /**
   * Class defining the basic run loop for pooled threads.
   **/
  protected class Worker implements Runnable {
    protected Runnable firstTask_;

    protected Worker(Runnable firstTask) { firstTask_ = firstTask; }

    public void run() {
      try {
        Runnable task = firstTask_;
        firstTask_ = null; // enable GC

        if (task != null) {
          task.run();
          task = null;
        }

        while ( (task = getTask()) != null) {
          task.run();
          task = null;
        }
      }
      catch (InterruptedException ex) { } // fall through
      finally {
        workerDone(this);
      }
    }
  }

  /**
   * Class for actions to take when execute() blocks. Uses the Strategy
   * pattern to represent different actions. You can add more in
   * subclasses, and/or create subclasses of these. If so, you will
   * also want to add or modify the corresponding methods that set the
   * current blockedExecutionHandler_.
   **/
  public interface BlockedExecutionHandler {
    /**
     * Return true if successfully handled, in which case execute
     * should terminate; else return false if the execute loop should
     * be retried.
     **/
    boolean blockedAction(Runnable command) throws InterruptedException;
  }
757 | |
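  /*
   * Example (illustrative sketch, not part of this class): a custom
   * handler that logs saturation and then discards the command. The
   * class name LogAndDiscardWhenBlocked is hypothetical.
   *
   *   class LogAndDiscardWhenBlocked implements BlockedExecutionHandler {
   *     public boolean blockedAction(Runnable command) {
   *       System.err.println("pool saturated; discarding task");
   *       return true; // handled; execute() returns without running command
   *     }
   *   }
   *
   * Install it with setBlockedExecutionHandler(new LogAndDiscardWhenBlocked()).
   */
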
  /** Class defining Run action. **/
  protected class RunWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      command.run();
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be that the current
   * thread executes the command if there are no available threads in
   * the pool.
   **/
  public void runWhenBlocked() {
    setBlockedExecutionHandler(new RunWhenBlocked());
  }

  /** Class defining Wait action. **/
  protected class WaitWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) throws InterruptedException {
      handOff_.put(command);
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to wait until a thread
   * is available.
   **/
  public void waitWhenBlocked() {
    setBlockedExecutionHandler(new WaitWhenBlocked());
  }

  /** Class defining Discard action. **/
  protected class DiscardWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to return without
   * executing the request.
   **/
  public void discardWhenBlocked() {
    setBlockedExecutionHandler(new DiscardWhenBlocked());
  }


  /** Class defining Abort action. **/
  protected class AbortWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      throw new RuntimeException("Pool is blocked");
    }
  }

  /**
   * Set the policy for blocked execution to be to
   * throw a RuntimeException.
   **/
  public void abortWhenBlocked() {
    setBlockedExecutionHandler(new AbortWhenBlocked());
  }


  /**
   * Class defining DiscardOldest action. Under this policy, at most
   * one old unhandled task is discarded. If the new task can then be
   * handed off, it is. Otherwise, the new task is run in the current
   * thread (i.e., RunWhenBlocked is used as a backup policy.)
   **/
  protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) throws InterruptedException {
      handOff_.poll(0);
      if (!handOff_.offer(command, 0))
        command.run();
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to discard the oldest
   * unhandled request.
   **/
  public void discardOldestWhenBlocked() {
    setBlockedExecutionHandler(new DiscardOldestWhenBlocked());
  }

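  /*
   * Policy-selection sketch (illustrative; the channel and pool sizes
   * are arbitrary example values):
   *
   *   PooledExecutor pool = new PooledExecutor(new BoundedBuffer(10), 20);
   *   pool.setMinimumPoolSize(4);
   *   pool.waitWhenBlocked(); // callers of execute() block until space frees
   */
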
  /**
   * Arrange for the given command to be executed by a thread in this
   * pool. The method normally returns when the command has been
   * handed off for (possibly later) execution.
   **/
  public void execute(Runnable command) throws InterruptedException {
    for (;;) {
      synchronized(this) {
        if (!shutdown_) {
          int size = poolSize_;

          // Ensure minimum number of threads
          if (size < minimumPoolSize_) {
            addThread(command);
            return;
          }

          // Try to give to existing thread
          if (handOff_.offer(command, 0)) {
            return;
          }

          // If cannot handoff and still under maximum, create new thread
          if (size < maximumPoolSize_) {
            addThread(command);
            return;
          }
        }
      }

      // Cannot hand off and cannot create -- ask for help
      if (getBlockedExecutionHandler().blockedAction(command)) {
        return;
      }
    }
  }
}
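
/*
 * Overall usage sketch (illustrative; the channel choice, keep-alive
 * time, and thread count are arbitrary example values; doWork() is a
 * hypothetical task method):
 *
 *   PooledExecutor pool = new PooledExecutor(new LinkedQueue());
 *   pool.setKeepAliveTime(1000 * 60 * 5); // idle workers exit after 5 minutes
 *   pool.createThreads(2);                // pre-start two worker threads
 *   pool.execute(new Runnable() {
 *     public void run() { doWork(); }
 *   });
 *   pool.shutdownAfterProcessingCurrentlyQueuedTasks();
 */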