comparison src/EDU/oswego/cs/dl/util/concurrent/QueuedExecutor.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: QueuedExecutor.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 21Jun1998 dl Create public version
12 28aug1998 dl rely on ThreadFactoryUser, restart now public
13 4may1999 dl removed redundant interrupt detect
14 7sep2000 dl new shutdown methods
15 */
16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20 *
21 * An implementation of Executor that queues incoming
22 * requests until they can be processed by a single background
23 * thread.
24 * <p>
25 * The thread is not actually started until the first
26 * <code>execute</code> request is encountered. Also, if the
27 * thread is stopped for any reason (for example, after hitting
28 * an unrecoverable exception in an executing task), one is started
29 * upon encountering a new request, or if <code>restart()</code> is
30 * invoked.
31 * <p>
32 * Beware that, especially in situations
33 * where command objects themselves invoke execute, queuing can
34 * sometimes lead to lockups, since commands that might allow
35 * other threads to terminate do not run at all when they are in the queue.
36 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
37 **/
38 public class QueuedExecutor extends ThreadFactoryUser implements Executor {
39
40
41
42 /** The thread used to process commands **/
43 protected Thread thread_;
44
45 /** Special queue element to signal termination **/
46 protected static Runnable ENDTASK = new Runnable() { public void run() {} };
47
48 /** true if thread should shut down after processing current task **/
49 protected volatile boolean shutdown_; // latches true;
50
51 /**
52 * Return the thread being used to process commands, or
53 * null if there is no such thread. You can use this
54 * to invoke any special methods on the thread, for
55 * example, to interrupt it.
56 **/
57 public synchronized Thread getThread() {
58 return thread_;
59 }
60
61 /** set thread_ to null to indicate termination **/
62 protected synchronized void clearThread() {
63 thread_ = null;
64 }
65
66
67 /** The queue **/
68 protected final Channel queue_;
69
70
71 /**
72 * The runloop is isolated in its own Runnable class
73 * just so that the main
74 * class need not implement Runnable, which would
75 * allow others to directly invoke run, which would
76 * never make sense here.
77 **/
78 protected class RunLoop implements Runnable {
79 public void run() {
80 try {
81 while (!shutdown_) {
82 Runnable task = (Runnable)(queue_.take());
83 if (task == ENDTASK) {
84 shutdown_ = true;
85 break;
86 }
87 else if (task != null) {
88 task.run();
89 task = null;
90 }
91 else
92 break;
93 }
94 }
95 catch (InterruptedException ex) {} // fallthrough
96 finally {
97 clearThread();
98 }
99 }
100 }
101
102 protected final RunLoop runLoop_;
103
104
105 /**
106 * Construct a new QueuedExecutor that uses
107 * the supplied Channel as its queue.
108 * <p>
109 * This class does not support any methods that
110 * reveal this queue. If you need to access it
111 * independently (for example to invoke any
112 * special status monitoring operations), you
113 * should record a reference to it separately.
114 **/
115
116 public QueuedExecutor(Channel queue) {
117 queue_ = queue;
118 runLoop_ = new RunLoop();
119 }
120
121 /**
122 * Construct a new QueuedExecutor that uses
123 * a BoundedLinkedQueue with the current
124 * DefaultChannelCapacity as its queue.
125 **/
126
127 public QueuedExecutor() {
128 this(new BoundedLinkedQueue());
129 }
130
131 /**
132 * Start (or restart) the background thread to process commands. It has
133 * no effect if a thread is already running. This
134 * method can be invoked if the background thread crashed
135 * due to an unrecoverable exception.
136 **/
137
138 public synchronized void restart() {
139 if (thread_ == null && !shutdown_) {
140 thread_ = threadFactory_.newThread(runLoop_);
141 thread_.start();
142 }
143 }
144
145
146 /**
147 * Arrange for execution of the command in the
148 * background thread by adding it to the queue.
149 * The method may block if the channel's put
150 * operation blocks.
151 * <p>
152 * If the background thread
153 * does not exist, it is created and started.
154 **/
155 public void execute(Runnable command) throws InterruptedException {
156 restart();
157 queue_.put(command);
158 }
159
160 /**
161 * Terminate background thread after it processes all
162 * elements currently in queue. Any tasks entered after this point will
163 * not be processed. A shut down thread cannot be restarted.
164 * This method may block if the task queue is finite and full.
165 * Also, this method
166 * does not in general apply (and may lead to comparator-based
167 * exceptions) if the task queue is a priority queue.
168 **/
169 public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() {
170 if (thread_ != null && !shutdown_) {
171 try { queue_.put(ENDTASK); }
172 catch (InterruptedException ex) {
173 Thread.currentThread().interrupt();
174 }
175 }
176 }
177
178
179 /**
180 * Terminate background thread after it processes the
181 * current task, removing other queued tasks and leaving them unprocessed.
182 * A shut down thread cannot be restarted.
183 **/
184 public synchronized void shutdownAfterProcessingCurrentTask() {
185 shutdown_ = true;
186 if (thread_ != null) {
187 try {
188 while (queue_.poll(0) != null) ; // drain
189 queue_.put(ENDTASK);
190 }
191 catch (InterruptedException ex) {
192 Thread.currentThread().interrupt();
193 }
194 }
195 }
196
197
198 /**
199 * Terminate background thread even if it is currently processing
200 * a task. This method uses Thread.interrupt, so relies on tasks
201 * themselves responding appropriately to interruption. If the
202 * current tasks does not terminate on interruption, then the
203 * thread will not terminate until processing current task.
204 * A shut down thread cannot be restarted.
205 **/
206 public synchronized void shutdownNow() {
207 shutdown_ = true;
208 if (thread_ != null) {
209 thread_.interrupt();
210 shutdownAfterProcessingCurrentTask();
211 }
212 }
213
214
215
216 }