Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/QueuedExecutor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/QueuedExecutor.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,216 @@ +/* + File: QueuedExecutor.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 21Jun1998 dl Create public version + 28aug1998 dl rely on ThreadFactoryUser, restart now public + 4may1999 dl removed redundant interrupt detect + 7sep2000 dl new shutdown methods +*/ + +package EDU.oswego.cs.dl.util.concurrent; + +/** + * + * An implementation of Executor that queues incoming + * requests until they can be processed by a single background + * thread. + * <p> + * The thread is not actually started until the first + * <code>execute</code> request is encountered. Also, if the + * thread is stopped for any reason (for example, after hitting + * an unrecoverable exception in an executing task), one is started + * upon encountering a new request, or if <code>restart()</code> is + * invoked. + * <p> + * Beware that, especially in situations + * where command objects themselves invoke execute, queuing can + * sometimes lead to lockups, since commands that might allow + * other threads to terminate do not run at all when they are in the queue. + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + **/ +public class QueuedExecutor extends ThreadFactoryUser implements Executor { + + + + /** The thread used to process commands **/ + protected Thread thread_; + + /** Special queue element to signal termination **/ + protected static Runnable ENDTASK = new Runnable() { public void run() {} }; + + /** true if thread should shut down after processing current task **/ + protected volatile boolean shutdown_; // latches true; + + /** + * Return the thread being used to process commands, or + * null if there is no such thread. You can use this + * to invoke any special methods on the thread, for + * example, to interrupt it. + **/ + public synchronized Thread getThread() { + return thread_; + } + + /** set thread_ to null to indicate termination **/ + protected synchronized void clearThread() { + thread_ = null; + } + + + /** The queue **/ + protected final Channel queue_; + + + /** + * The runloop is isolated in its own Runnable class + * just so that the main + * class need not implement Runnable, which would + * allow others to directly invoke run, which would + * never make sense here. + **/ + protected class RunLoop implements Runnable { + public void run() { + try { + while (!shutdown_) { + Runnable task = (Runnable)(queue_.take()); + if (task == ENDTASK) { + shutdown_ = true; + break; + } + else if (task != null) { + task.run(); + task = null; + } + else + break; + } + } + catch (InterruptedException ex) {} // fallthrough + finally { + clearThread(); + } + } + } + + protected final RunLoop runLoop_; + + + /** + * Construct a new QueuedExecutor that uses + * the supplied Channel as its queue. + * <p> + * This class does not support any methods that + * reveal this queue. If you need to access it + * independently (for example to invoke any + * special status monitoring operations), you + * should record a reference to it separately. + **/ + + public QueuedExecutor(Channel queue) { + queue_ = queue; + runLoop_ = new RunLoop(); + } + + /** + * Construct a new QueuedExecutor that uses + * a BoundedLinkedQueue with the current + * DefaultChannelCapacity as its queue. + **/ + + public QueuedExecutor() { + this(new BoundedLinkedQueue()); + } + + /** + * Start (or restart) the background thread to process commands. It has + * no effect if a thread is already running. This + * method can be invoked if the background thread crashed + * due to an unrecoverable exception. + **/ + + public synchronized void restart() { + if (thread_ == null && !shutdown_) { + thread_ = threadFactory_.newThread(runLoop_); + thread_.start(); + } + } + + + /** + * Arrange for execution of the command in the + * background thread by adding it to the queue. + * The method may block if the channel's put + * operation blocks. + * <p> + * If the background thread + * does not exist, it is created and started. + **/ + public void execute(Runnable command) throws InterruptedException { + restart(); + queue_.put(command); + } + + /** + * Terminate background thread after it processes all + * elements currently in queue. Any tasks entered after this point will + * not be processed. A shut down thread cannot be restarted. + * This method may block if the task queue is finite and full. + * Also, this method + * does not in general apply (and may lead to comparator-based + * exceptions) if the task queue is a priority queue. + **/ + public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() { + if (thread_ != null && !shutdown_) { + try { queue_.put(ENDTASK); } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + + + /** + * Terminate background thread after it processes the + * current task, removing other queued tasks and leaving them unprocessed. + * A shut down thread cannot be restarted. + **/ + public synchronized void shutdownAfterProcessingCurrentTask() { + shutdown_ = true; + if (thread_ != null) { + try { + while (queue_.poll(0) != null) ; // drain + queue_.put(ENDTASK); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + + + /** + * Terminate background thread even if it is currently processing + * a task. This method uses Thread.interrupt, so relies on tasks + * themselves responding appropriately to interruption. If the + * current tasks does not terminate on interruption, then the + * thread will not terminate until processing current task. + * A shut down thread cannot be restarted. + **/ + public synchronized void shutdownNow() { + shutdown_ = true; + if (thread_ != null) { + thread_.interrupt(); + shutdownAfterProcessingCurrentTask(); + } + } + + + +}