Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,882 @@ +/* + File: PooledExecutor.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 19Jun1998 dl Create public version + 29aug1998 dl rely on ThreadFactoryUser, + remove ThreadGroup-based methods + adjusted locking policies + 3mar1999 dl Worker threads sense decreases in pool size + 31mar1999 dl Allow supplied channel in constructor; + add methods createThreads, drain + 15may1999 dl Allow infinite keepalives + 21oct1999 dl add minimumPoolSize methods + 7sep2000 dl BlockedExecutionHandler now an interface, + new DiscardOldestWhenBlocked policy + 12oct2000 dl add shutdownAfterProcessingCurrentlyQueuedTasks + 13nov2000 dl null out task ref after run + 08apr2001 dl declare inner class ctor protected + 12nov2001 dl Better shutdown support + Blocked exec handlers can throw IE + Simplify locking scheme + 25jan2001 dl {get,set}BlockedExecutionHandler now public + 17may2002 dl null out task var in worker run to enable GC. +*/ + +package EDU.oswego.cs.dl.util.concurrent; +import java.util.*; + +/** + * A tunable, extensible thread pool class. The main supported public + * method is <code>execute(Runnable command)</code>, which can be + * called instead of directly creating threads to execute commands. + * + * <p> + * Thread pools can be useful for several, usually intertwined + * reasons: + * + * <ul> + * + * <li> To bound resource use. A limit can be placed on the maximum + * number of simultaneously executing threads. + * + * <li> To manage concurrency levels. A targeted number of threads + * can be allowed to execute simultaneously. + * + * <li> To manage a set of threads performing related tasks. + * + * <li> To minimize overhead, by reusing previously constructed + * Thread objects rather than creating new ones. (Note however + * that pools are hardly ever cure-alls for performance problems + * associated with thread construction, especially on JVMs that + * themselves internally pool or recycle threads.) + * + * </ul> + * + * These goals introduce a number of policy parameters that are + * encapsulated in this class. All of these parameters have defaults + * and are tunable, either via get/set methods, or, in cases where + * decisions should hold across lifetimes, via methods that can be + * easily overridden in subclasses. The main, most commonly set + * parameters can be established in constructors. Policy choices + * across these dimensions can and do interact. Be careful, and + * please read this documentation completely before using! See also + * the usage examples below. + * + * <dl> + * <dt> Queueing + * + * <dd> By default, this pool uses queueless synchronous channels to + * to hand off work to threads. This is a safe, conservative policy + * that avoids lockups when handling sets of requests that might + * have internal dependencies. (In these cases, queuing one task + * could lock up another that would be able to continue if the + * queued task were to run.) If you are sure that this cannot + * happen, then you can instead supply a queue of some sort (for + * example, a BoundedBuffer or LinkedQueue) in the constructor. + * This will cause new commands to be queued in cases where all + * MaximumPoolSize threads are busy. Queues are sometimes + * appropriate when each task is completely independent of others, + * so tasks cannot affect each others execution. For example, in an + * http server. <p> + * + * When given a choice, this pool always prefers adding a new thread + * rather than queueing if there are currently fewer than the + * current getMinimumPoolSize threads running, but otherwise always + * prefers queuing a request rather than adding a new thread. Thus, + * if you use an unbounded buffer, you will never have more than + * getMinimumPoolSize threads running. (Since the default + * minimumPoolSize is one, you will probably want to explicitly + * setMinimumPoolSize.) <p> + * + * While queuing can be useful in smoothing out transient bursts of + * requests, especially in socket-based services, it is not very + * well behaved when commands continue to arrive on average faster + * than they can be processed. Using bounds for both the queue and + * the pool size, along with run-when-blocked policy is often a + * reasonable response to such possibilities. <p> + * + * Queue sizes and maximum pool sizes can often be traded off for + * each other. Using large queues and small pools minimizes CPU + * usage, OS resources, and context-switching overhead, but can lead + * to artifically low throughput. Especially if tasks frequently + * block (for example if they are I/O bound), a JVM and underlying + * OS may be able to schedule time for more threads than you + * otherwise allow. Use of small queues or queueless handoffs + * generally requires larger pool sizes, which keeps CPUs busier but + * may encounter unacceptable scheduling overhead, which also + * decreases throughput. <p> + * + * <dt> Maximum Pool size + * + * <dd> The maximum number of threads to use, when needed. The pool + * does not by default preallocate threads. Instead, a thread is + * created, if necessary and if there are fewer than the maximum, + * only when an <code>execute</code> request arrives. The default + * value is (for all practical purposes) infinite -- + * <code>Integer.MAX_VALUE</code>, so should be set in the + * constructor or the set method unless you are just using the pool + * to minimize construction overhead. Because task handoffs to idle + * worker threads require synchronization that in turn relies on JVM + * scheduling policies to ensure progress, it is possible that a new + * thread will be created even though an existing worker thread has + * just become idle but has not progressed to the point at which it + * can accept a new task. This phenomenon tends to occur on some + * JVMs when bursts of short tasks are executed. <p> + * + * <dt> Minimum Pool size + * + * <dd> The minimum number of threads to use, when needed (default + * 1). When a new request is received, and fewer than the minimum + * number of threads are running, a new thread is always created to + * handle the request even if other worker threads are idly waiting + * for work. Otherwise, a new thread is created only if there are + * fewer than the maximum and the request cannot immediately be + * queued. <p> + * + * <dt> Preallocation + * + * <dd> You can override lazy thread construction policies via + * method createThreads, which establishes a given number of warm + * threads. Be aware that these preallocated threads will time out + * and die (and later be replaced with others if needed) if not used + * within the keep-alive time window. If you use preallocation, you + * probably want to increase the keepalive time. The difference + * between setMinimumPoolSize and createThreads is that + * createThreads immediately establishes threads, while setting the + * minimum pool size waits until requests arrive. <p> + * + * <dt> Keep-alive time + * + * <dd> If the pool maintained references to a fixed set of threads + * in the pool, then it would impede garbage collection of otherwise + * idle threads. This would defeat the resource-management aspects + * of pools. One solution would be to use weak references. However, + * this would impose costly and difficult synchronization issues. + * Instead, threads are simply allowed to terminate and thus be + * GCable if they have been idle for the given keep-alive time. The + * value of this parameter represents a trade-off between GCability + * and construction time. In most current Java VMs, thread + * construction and cleanup overhead is on the order of + * milliseconds. The default keep-alive value is one minute, which + * means that the time needed to construct and then GC a thread is + * expended at most once per minute. + * <p> + * + * To establish worker threads permanently, use a <em>negative</em> + * argument to setKeepAliveTime. <p> + * + * <dt> Blocked execution policy + * + * <dd> If the maximum pool size or queue size is bounded, then it + * is possible for incoming <code>execute</code> requests to + * block. There are four supported policies for handling this + * problem, and mechanics (based on the Strategy Object pattern) to + * allow others in subclasses: <p> + * + * <dl> + * <dt> Run (the default) + * <dd> The thread making the <code>execute</code> request + * runs the task itself. This policy helps guard against lockup. + * <dt> Wait + * <dd> Wait until a thread becomes available. + * <dt> Abort + * <dd> Throw a RuntimeException + * <dt> Discard + * <dd> Throw away the current request and return. + * <dt> DiscardOldest + * <dd> Throw away the oldest request and return. + * </dl> + * + * Other plausible policies include raising the maximum pool size + * after checking with some other objects that this is OK. <p> + * + * These cases can never occur if the maximum pool size is unbounded + * or the queue is unbounded. In these cases you instead face + * potential resource exhaustion.) The execute method does not + * throw any checked exceptions in any of these cases since any + * errors associated with them must normally be dealt with via + * handlers or callbacks. (Although in some cases, these might be + * associated with throwing unchecked exceptions.) You may wish to + * add special implementations even if you choose one of the listed + * policies. For example, the supplied Discard policy does not + * inform the caller of the drop. You could add your own version + * that does so. Since choice of policies is normally a system-wide + * decision, selecting a policy affects all calls to + * <code>execute</code>. If for some reason you would instead like + * to make per-call decisions, you could add variant versions of the + * <code>execute</code> method (for example, + * <code>executeIfWouldNotBlock</code>) in subclasses. <p> + * + * <dt> Thread construction parameters + * + * <dd> A settable ThreadFactory establishes each new thread. By + * default, it merely generates a new instance of class Thread, but + * can be changed to use a Thread subclass, to set priorities, + * ThreadLocals, etc. <p> + * + * <dt> Interruption policy + * + * <dd> Worker threads check for interruption after processing each + * command, and terminate upon interruption. Fresh threads will + * replace them if needed. Thus, new tasks will not start out in an + * interrupted state due to an uncleared interruption in a previous + * task. Also, unprocessed commands are never dropped upon + * interruption. It would conceptually suffice simply to clear + * interruption between tasks, but implementation characteristics of + * interruption-based methods are uncertain enough to warrant this + * conservative strategy. It is a good idea to be equally + * conservative in your code for the tasks running within pools. + * <p> + * + * <dt> Shutdown policy + * + * <dd> The interruptAll method interrupts, but does not disable the + * pool. Two different shutdown methods are supported for use when + * you do want to (permanently) stop processing tasks. Method + * shutdownAfterProcessingCurrentlyQueuedTasks waits until all + * current tasks are finished. The shutDownNow method interrupts + * current threads and leaves other queued requests unprocessed. + * <p> + * + * <dt> Handling requests after shutdown + * + * <dd> When the pool is shutdown, new incoming requests are handled + * by the blockedExecutionHandler. By default, the handler is set to + * discard new requests, but this can be set with an optional + * argument to method + * shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are + * using some form of queuing, you may wish to call method drain() + * to remove (and return) unprocessed commands from the queue after + * shutting down the pool and its clients. If you need to be sure + * these commands are processed, you can then run() each of the + * commands in the list returned by drain(). + * + * </dl> + * <p> + * + * <b>Usage examples.</b> + * <p> + * + * Probably the most common use of pools is in statics or singletons + * accessible from a number of classes in a package; for example: + * + * <pre> + * class MyPool { + * // initialize to use a maximum of 8 threads. + * static PooledExecutor pool = new PooledExecutor(8); + * } + * </pre> + * Here are some sample variants in initialization: + * <ol> + * <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only + * when needed due to incoming requests), but allowing + * up to 100 threads if the buffer gets full. + * <pre> + * pool = new PooledExecutor(new BoundedBuffer(10), 100); + * pool.setMinimumPoolSize(4); + * </pre> + * <li> Same as (1), except pre-start 9 threads, allowing them to + * die if they are not used for five minutes. + * <pre> + * pool = new PooledExecutor(new BoundedBuffer(10), 100); + * pool.setMinimumPoolSize(4); + * pool.setKeepAliveTime(1000 * 60 * 5); + * pool.createThreads(9); + * </pre> + * <li> Same as (2) except clients block if both the buffer is full and + * all 100 threads are busy: + * <pre> + * pool = new PooledExecutor(new BoundedBuffer(10), 100); + * pool.setMinimumPoolSize(4); + * pool.setKeepAliveTime(1000 * 60 * 5); + * pool.waitWhenBlocked(); + * pool.createThreads(9); + * </pre> + * <li> An unbounded queue serviced by exactly 5 threads: + * <pre> + * pool = new PooledExecutor(new LinkedQueue()); + * pool.setKeepAliveTime(-1); // live forever + * pool.createThreads(5); + * </pre> + * </ol> + * + * <p> + * <b>Usage notes.</b> + * <p> + * + * Pools do not mesh well with using thread-specific storage via + * java.lang.ThreadLocal. ThreadLocal relies on the identity of a + * thread executing a particular task. Pools use the same thread to + * perform different tasks. <p> + * + * If you need a policy not handled by the parameters in this class + * consider writing a subclass. <p> + * + * Version note: Previous versions of this class relied on + * ThreadGroups for aggregate control. This has been removed, and the + * method interruptAll added, to avoid differences in behavior across + * JVMs. + * + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + **/ + +public class PooledExecutor extends ThreadFactoryUser implements Executor { + + /** + * The maximum pool size; used if not otherwise specified. Default + * value is essentially infinite (Integer.MAX_VALUE) + **/ + public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE; + + /** + * The minimum pool size; used if not otherwise specified. Default + * value is 1. + **/ + public static final int DEFAULT_MINIMUMPOOLSIZE = 1; + + /** + * The maximum time to keep worker threads alive waiting for new + * tasks; used if not otherwise specified. Default value is one + * minute (60000 milliseconds). + **/ + public static final long DEFAULT_KEEPALIVETIME = 60 * 1000; + + /** The maximum number of threads allowed in pool. **/ + protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE; + + /** The minumum number of threads to maintain in pool. **/ + protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE; + + /** Current pool size. **/ + protected int poolSize_ = 0; + + /** The maximum time for an idle thread to wait for new task. **/ + protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME; + + /** + * Shutdown flag - latches true when a shutdown method is called + * in order to disable queuing/handoffs of new tasks. + **/ + protected boolean shutdown_ = false; + + /** + * The channel used to hand off the command to a thread in the pool. + **/ + protected final Channel handOff_; + + /** + * The set of active threads, declared as a map from workers to + * their threads. This is needed by the interruptAll method. It + * may also be useful in subclasses that need to perform other + * thread management chores. + **/ + protected final Map threads_; + + /** The current handler for unserviceable requests. **/ + protected BlockedExecutionHandler blockedExecutionHandler_; + + /** + * Create a new pool with all default settings + **/ + + public PooledExecutor() { + this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE); + } + + /** + * Create a new pool with all default settings except + * for maximum pool size. + **/ + + public PooledExecutor(int maxPoolSize) { + this(new SynchronousChannel(), maxPoolSize); + } + + /** + * Create a new pool that uses the supplied Channel for queuing, and + * with all default parameter settings. + **/ + + public PooledExecutor(Channel channel) { + this(channel, DEFAULT_MAXIMUMPOOLSIZE); + } + + /** + * Create a new pool that uses the supplied Channel for queuing, and + * with all default parameter settings except for maximum pool size. + **/ + + public PooledExecutor(Channel channel, int maxPoolSize) { + maximumPoolSize_ = maxPoolSize; + handOff_ = channel; + runWhenBlocked(); + threads_ = new HashMap(); + } + + /** + * Return the maximum number of threads to simultaneously execute + * New unqueued requests will be handled according to the current + * blocking policy once this limit is exceeded. + **/ + public synchronized int getMaximumPoolSize() { + return maximumPoolSize_; + } + + /** + * Set the maximum number of threads to use. Decreasing the pool + * size will not immediately kill existing threads, but they may + * later die when idle. + * @exception IllegalArgumentException if less or equal to zero. + * (It is + * not considered an error to set the maximum to be less than than + * the minimum. However, in this case there are no guarantees + * about behavior.) + **/ + public synchronized void setMaximumPoolSize(int newMaximum) { + if (newMaximum <= 0) throw new IllegalArgumentException(); + maximumPoolSize_ = newMaximum; + } + + /** + * Return the minimum number of threads to simultaneously execute. + * (Default value is 1). If fewer than the mininum number are + * running upon reception of a new request, a new thread is started + * to handle this request. + **/ + public synchronized int getMinimumPoolSize() { + return minimumPoolSize_; + } + + /** + * Set the minimum number of threads to use. + * @exception IllegalArgumentException if less than zero. (It is not + * considered an error to set the minimum to be greater than the + * maximum. However, in this case there are no guarantees about + * behavior.) + **/ + public synchronized void setMinimumPoolSize(int newMinimum) { + if (newMinimum < 0) throw new IllegalArgumentException(); + minimumPoolSize_ = newMinimum; + } + + /** + * Return the current number of active threads in the pool. This + * number is just a snaphot, and may change immediately upon + * returning + **/ + public synchronized int getPoolSize() { + return poolSize_; + } + + /** + * Return the number of milliseconds to keep threads alive waiting + * for new commands. A negative value means to wait forever. A zero + * value means not to wait at all. + **/ + public synchronized long getKeepAliveTime() { + return keepAliveTime_; + } + + /** + * Set the number of milliseconds to keep threads alive waiting for + * new commands. A negative value means to wait forever. A zero + * value means not to wait at all. + **/ + public synchronized void setKeepAliveTime(long msecs) { + keepAliveTime_ = msecs; + } + + /** Get the handler for blocked execution **/ + public synchronized BlockedExecutionHandler getBlockedExecutionHandler() { + return blockedExecutionHandler_; + } + + /** Set the handler for blocked execution **/ + public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) { + blockedExecutionHandler_ = h; + } + + /** + * Create and start a thread to handle a new command. Call only + * when holding lock. + **/ + protected void addThread(Runnable command) { + Worker worker = new Worker(command); + Thread thread = getThreadFactory().newThread(worker); + threads_.put(worker, thread); + ++poolSize_; + thread.start(); + } + + /** + * Create and start up to numberOfThreads threads in the pool. + * Return the number created. This may be less than the number + * requested if creating more would exceed maximum pool size bound. + **/ + public int createThreads(int numberOfThreads) { + int ncreated = 0; + for (int i = 0; i < numberOfThreads; ++i) { + synchronized(this) { + if (poolSize_ < maximumPoolSize_) { + addThread(null); + ++ncreated; + } + else + break; + } + } + return ncreated; + } + + /** + * Interrupt all threads in the pool, causing them all to + * terminate. Assuming that executed tasks do not disable (clear) + * interruptions, each thread will terminate after processing its + * current task. Threads will terminate sooner if the executed tasks + * themselves respond to interrupts. + **/ + public synchronized void interruptAll() { + for (Iterator it = threads_.values().iterator(); it.hasNext(); ) { + Thread t = (Thread)(it.next()); + t.interrupt(); + } + } + + /** + * Interrupt all threads and disable construction of new + * threads. Any tasks entered after this point will be discarded. A + * shut down pool cannot be restarted. + */ + public void shutdownNow() { + shutdownNow(new DiscardWhenBlocked()); + } + + /** + * Interrupt all threads and disable construction of new + * threads. Any tasks entered after this point will be handled by + * the given BlockedExecutionHandler. A shut down pool cannot be + * restarted. + */ + public synchronized void shutdownNow(BlockedExecutionHandler handler) { + setBlockedExecutionHandler(handler); + shutdown_ = true; // don't allow new tasks + minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads + interruptAll(); // interrupt all existing threads + } + + /** + * Terminate threads after processing all elements currently in + * queue. Any tasks entered after this point will be discarded. A + * shut down pool cannot be restarted. + **/ + public void shutdownAfterProcessingCurrentlyQueuedTasks() { + shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked()); + } + + /** + * Terminate threads after processing all elements currently in + * queue. Any tasks entered after this point will be handled by the + * given BlockedExecutionHandler. A shut down pool cannot be + * restarted. + **/ + public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) { + setBlockedExecutionHandler(handler); + shutdown_ = true; + if (poolSize_ == 0) // disable new thread construction when idle + minimumPoolSize_ = maximumPoolSize_ = 0; + } + + /** + * Return true if a shutDown method has succeeded in terminating all + * threads. + */ + public synchronized boolean isTerminatedAfterShutdown() { + return shutdown_ && poolSize_ == 0; + } + + /** + * Wait for a shutdown pool to fully terminate, or until the timeout + * has expired. This method may only be called <em>after</em> + * invoking shutdownNow or + * shutdownAfterProcessingCurrentlyQueuedTasks. + * + * @param maxWaitTime the maximum time in milliseconds to wait + * @return true if the pool has terminated within the max wait period + * @exception IllegalStateException if shutdown has not been requested + * @exception InterruptedException if the current thread has been interrupted in the course of waiting + */ + public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException { + if (!shutdown_) + throw new IllegalStateException(); + if (poolSize_ == 0) + return true; + long waitTime = maxWaitTime; + if (waitTime <= 0) + return false; + long start = System.currentTimeMillis(); + for (;;) { + wait(waitTime); + if (poolSize_ == 0) + return true; + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + if (waitTime <= 0) + return false; + } + } + + /** + * Wait for a shutdown pool to fully terminate. This method may + * only be called <em>after</em> invoking shutdownNow or + * shutdownAfterProcessingCurrentlyQueuedTasks. + * + * @exception IllegalStateException if shutdown has not been requested + * @exception InterruptedException if the current thread has been interrupted in the course of waiting + */ + public synchronized void awaitTerminationAfterShutdown() throws InterruptedException { + if (!shutdown_) + throw new IllegalStateException(); + while (poolSize_ > 0) + wait(); + } + + /** + * Remove all unprocessed tasks from pool queue, and return them in + * a java.util.List. Thsi method should be used only when there are + * not any active clients of the pool. Otherwise you face the + * possibility that the method will loop pulling out tasks as + * clients are putting them in. This method can be useful after + * shutting down a pool (via shutdownNow) to determine whether there + * are any pending tasks that were not processed. You can then, for + * example execute all unprocessed commands via code along the lines + * of: + * + * <pre> + * List tasks = pool.drain(); + * for (Iterator it = tasks.iterator(); it.hasNext();) + * ( (Runnable)(it.next()) ).run(); + * </pre> + **/ + public List drain() { + boolean wasInterrupted = false; + Vector tasks = new Vector(); + for (;;) { + try { + Object x = handOff_.poll(0); + if (x == null) + break; + else + tasks.addElement(x); + } + catch (InterruptedException ex) { + wasInterrupted = true; // postpone re-interrupt until drained + } + } + if (wasInterrupted) Thread.currentThread().interrupt(); + return tasks; + } + + /** + * Cleanup method called upon termination of worker thread. + **/ + protected synchronized void workerDone(Worker w) { + threads_.remove(w); + if (--poolSize_ == 0 && shutdown_) { + maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads + notifyAll(); // notify awaitTerminationAfterShutdown + } + } + + /** + * Get a task from the handoff queue, or null if shutting down. + **/ + protected Runnable getTask() throws InterruptedException { + long waitTime; + synchronized(this) { + if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads + return null; + waitTime = (shutdown_)? 0 : keepAliveTime_; + } + if (waitTime >= 0) + return (Runnable)(handOff_.poll(waitTime)); + else + return (Runnable)(handOff_.take()); + } + + + /** + * Class defining the basic run loop for pooled threads. + **/ + protected class Worker implements Runnable { + protected Runnable firstTask_; + + protected Worker(Runnable firstTask) { firstTask_ = firstTask; } + + public void run() { + try { + Runnable task = firstTask_; + firstTask_ = null; // enable GC + + if (task != null) { + task.run(); + task = null; + } + + while ( (task = getTask()) != null) { + task.run(); + task = null; + } + } + catch (InterruptedException ex) { } // fall through + finally { + workerDone(this); + } + } + } + + /** + * Class for actions to take when execute() blocks. Uses Strategy + * pattern to represent different actions. You can add more in + * subclasses, and/or create subclasses of these. If so, you will + * also want to add or modify the corresponding methods that set the + * current blockedExectionHandler_. + **/ + public interface BlockedExecutionHandler { + /** + * Return true if successfully handled so, execute should + * terminate; else return false if execute loop should be retried. + **/ + boolean blockedAction(Runnable command) throws InterruptedException; + } + + /** Class defining Run action. **/ + protected class RunWhenBlocked implements BlockedExecutionHandler { + public boolean blockedAction(Runnable command) { + command.run(); + return true; + } + } + + /** + * Set the policy for blocked execution to be that the current + * thread executes the command if there are no available threads in + * the pool. + **/ + public void runWhenBlocked() { + setBlockedExecutionHandler(new RunWhenBlocked()); + } + + /** Class defining Wait action. **/ + protected class WaitWhenBlocked implements BlockedExecutionHandler { + public boolean blockedAction(Runnable command) throws InterruptedException{ + handOff_.put(command); + return true; + } + } + + /** + * Set the policy for blocked execution to be to wait until a thread + * is available. + **/ + public void waitWhenBlocked() { + setBlockedExecutionHandler(new WaitWhenBlocked()); + } + + /** Class defining Discard action. **/ + protected class DiscardWhenBlocked implements BlockedExecutionHandler { + public boolean blockedAction(Runnable command) { + return true; + } + } + + /** + * Set the policy for blocked execution to be to return without + * executing the request. + **/ + public void discardWhenBlocked() { + setBlockedExecutionHandler(new DiscardWhenBlocked()); + } + + + /** Class defining Abort action. **/ + protected class AbortWhenBlocked implements BlockedExecutionHandler { + public boolean blockedAction(Runnable command) { + throw new RuntimeException("Pool is blocked"); + } + } + + /** + * Set the policy for blocked execution to be to + * throw a RuntimeException. + **/ + public void abortWhenBlocked() { + setBlockedExecutionHandler(new AbortWhenBlocked()); + } + + + /** + * Class defining DiscardOldest action. Under this policy, at most + * one old unhandled task is discarded. If the new task can then be + * handed off, it is. Otherwise, the new task is run in the current + * thread (i.e., RunWhenBlocked is used as a backup policy.) + **/ + protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler { + public boolean blockedAction(Runnable command) throws InterruptedException{ + handOff_.poll(0); + if (!handOff_.offer(command, 0)) + command.run(); + return true; + } + } + + /** + * Set the policy for blocked execution to be to discard the oldest + * unhandled request + **/ + public void discardOldestWhenBlocked() { + setBlockedExecutionHandler(new DiscardOldestWhenBlocked()); + } + + /** + * Arrange for the given command to be executed by a thread in this + * pool. The method normally returns when the command has been + * handed off for (possibly later) execution. + **/ + public void execute(Runnable command) throws InterruptedException { + for (;;) { + synchronized(this) { + if (!shutdown_) { + int size = poolSize_; + + // Ensure minimum number of threads + if (size < minimumPoolSize_) { + addThread(command); + return; + } + + // Try to give to existing thread + if (handOff_.offer(command, 0)) { + return; + } + + // If cannot handoff and still under maximum, create new thread + if (size < maximumPoolSize_) { + addThread(command); + return; + } + } + } + + // Cannot hand off and cannot create -- ask for help + if (getBlockedExecutionHandler().blockedAction(command)) { + return; + } + } + } +}