Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/QueuedExecutor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: QueuedExecutor.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 21Jun1998 dl Create public version | |
12 28aug1998 dl rely on ThreadFactoryUser, restart now public | |
13 4may1999 dl removed redundant interrupt detect | |
14 7sep2000 dl new shutdown methods | |
15 */ | |
16 | |
17 package EDU.oswego.cs.dl.util.concurrent; | |
18 | |
19 /** | |
20 * | |
21 * An implementation of Executor that queues incoming | |
22 * requests until they can be processed by a single background | |
23 * thread. | |
24 * <p> | |
25 * The thread is not actually started until the first | |
26 * <code>execute</code> request is encountered. Also, if the | |
27 * thread is stopped for any reason (for example, after hitting | |
28 * an unrecoverable exception in an executing task), one is started | |
29 * upon encountering a new request, or if <code>restart()</code> is | |
30 * invoked. | |
31 * <p> | |
32 * Beware that, especially in situations | |
33 * where command objects themselves invoke execute, queuing can | |
34 * sometimes lead to lockups, since commands that might allow | |
35 * other threads to terminate do not run at all when they are in the queue. | |
36 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
37 **/ | |
38 public class QueuedExecutor extends ThreadFactoryUser implements Executor { | |
39 | |
40 | |
41 | |
42 /** The thread used to process commands **/ | |
43 protected Thread thread_; | |
44 | |
45 /** Special queue element to signal termination **/ | |
46 protected static Runnable ENDTASK = new Runnable() { public void run() {} }; | |
47 | |
48 /** true if thread should shut down after processing current task **/ | |
49 protected volatile boolean shutdown_; // latches true; | |
50 | |
51 /** | |
52 * Return the thread being used to process commands, or | |
53 * null if there is no such thread. You can use this | |
54 * to invoke any special methods on the thread, for | |
55 * example, to interrupt it. | |
56 **/ | |
57 public synchronized Thread getThread() { | |
58 return thread_; | |
59 } | |
60 | |
61 /** set thread_ to null to indicate termination **/ | |
62 protected synchronized void clearThread() { | |
63 thread_ = null; | |
64 } | |
65 | |
66 | |
67 /** The queue **/ | |
68 protected final Channel queue_; | |
69 | |
70 | |
71 /** | |
72 * The runloop is isolated in its own Runnable class | |
73 * just so that the main | |
74 * class need not implement Runnable, which would | |
75 * allow others to directly invoke run, which would | |
76 * never make sense here. | |
77 **/ | |
78 protected class RunLoop implements Runnable { | |
79 public void run() { | |
80 try { | |
81 while (!shutdown_) { | |
82 Runnable task = (Runnable)(queue_.take()); | |
83 if (task == ENDTASK) { | |
84 shutdown_ = true; | |
85 break; | |
86 } | |
87 else if (task != null) { | |
88 task.run(); | |
89 task = null; | |
90 } | |
91 else | |
92 break; | |
93 } | |
94 } | |
95 catch (InterruptedException ex) {} // fallthrough | |
96 finally { | |
97 clearThread(); | |
98 } | |
99 } | |
100 } | |
101 | |
102 protected final RunLoop runLoop_; | |
103 | |
104 | |
105 /** | |
106 * Construct a new QueuedExecutor that uses | |
107 * the supplied Channel as its queue. | |
108 * <p> | |
109 * This class does not support any methods that | |
110 * reveal this queue. If you need to access it | |
111 * independently (for example to invoke any | |
112 * special status monitoring operations), you | |
113 * should record a reference to it separately. | |
114 **/ | |
115 | |
116 public QueuedExecutor(Channel queue) { | |
117 queue_ = queue; | |
118 runLoop_ = new RunLoop(); | |
119 } | |
120 | |
121 /** | |
122 * Construct a new QueuedExecutor that uses | |
123 * a BoundedLinkedQueue with the current | |
124 * DefaultChannelCapacity as its queue. | |
125 **/ | |
126 | |
127 public QueuedExecutor() { | |
128 this(new BoundedLinkedQueue()); | |
129 } | |
130 | |
131 /** | |
132 * Start (or restart) the background thread to process commands. It has | |
133 * no effect if a thread is already running. This | |
134 * method can be invoked if the background thread crashed | |
135 * due to an unrecoverable exception. | |
136 **/ | |
137 | |
138 public synchronized void restart() { | |
139 if (thread_ == null && !shutdown_) { | |
140 thread_ = threadFactory_.newThread(runLoop_); | |
141 thread_.start(); | |
142 } | |
143 } | |
144 | |
145 | |
146 /** | |
147 * Arrange for execution of the command in the | |
148 * background thread by adding it to the queue. | |
149 * The method may block if the channel's put | |
150 * operation blocks. | |
151 * <p> | |
152 * If the background thread | |
153 * does not exist, it is created and started. | |
154 **/ | |
155 public void execute(Runnable command) throws InterruptedException { | |
156 restart(); | |
157 queue_.put(command); | |
158 } | |
159 | |
160 /** | |
161 * Terminate background thread after it processes all | |
162 * elements currently in queue. Any tasks entered after this point will | |
163 * not be processed. A shut down thread cannot be restarted. | |
164 * This method may block if the task queue is finite and full. | |
165 * Also, this method | |
166 * does not in general apply (and may lead to comparator-based | |
167 * exceptions) if the task queue is a priority queue. | |
168 **/ | |
169 public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() { | |
170 if (thread_ != null && !shutdown_) { | |
171 try { queue_.put(ENDTASK); } | |
172 catch (InterruptedException ex) { | |
173 Thread.currentThread().interrupt(); | |
174 } | |
175 } | |
176 } | |
177 | |
178 | |
179 /** | |
180 * Terminate background thread after it processes the | |
181 * current task, removing other queued tasks and leaving them unprocessed. | |
182 * A shut down thread cannot be restarted. | |
183 **/ | |
184 public synchronized void shutdownAfterProcessingCurrentTask() { | |
185 shutdown_ = true; | |
186 if (thread_ != null) { | |
187 try { | |
188 while (queue_.poll(0) != null) ; // drain | |
189 queue_.put(ENDTASK); | |
190 } | |
191 catch (InterruptedException ex) { | |
192 Thread.currentThread().interrupt(); | |
193 } | |
194 } | |
195 } | |
196 | |
197 | |
198 /** | |
199 * Terminate background thread even if it is currently processing | |
200 * a task. This method uses Thread.interrupt, so relies on tasks | |
201 * themselves responding appropriately to interruption. If the | |
202 * current tasks does not terminate on interruption, then the | |
203 * thread will not terminate until processing current task. | |
204 * A shut down thread cannot be restarted. | |
205 **/ | |
206 public synchronized void shutdownNow() { | |
207 shutdown_ = true; | |
208 if (thread_ != null) { | |
209 thread_.interrupt(); | |
210 shutdownAfterProcessingCurrentTask(); | |
211 } | |
212 } | |
213 | |
214 | |
215 | |
216 } |