comparison src/EDU/oswego/cs/dl/util/concurrent/Channel.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: Channel.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jun1998 dl Create public version | |
12 25aug1998 dl added peek | |
13 */ | |
14 | |
15 package EDU.oswego.cs.dl.util.concurrent; | |
16 | |
17 /** | |
18 * Main interface for buffers, queues, pipes, conduits, etc. | |
19 * <p> | |
20 * A Channel represents anything that you can put items | |
21 * into and take them out of. As with the Sync | |
22 * interface, both | |
23 * blocking (put(x), take), | |
24 * and timeouts (offer(x, msecs), poll(msecs)) policies | |
25 * are provided. Using a | |
26 * zero timeout for offer and poll results in a pure balking policy. | |
27 * <p> | |
28 * To aid in efforts to use Channels in a more typesafe manner, | |
29 * this interface extends Puttable and Takable. You can restrict | |
30 * arguments or instance variables to one of these types as a way of | |
31 * guaranteeing that producers never try to take, or consumers put. | |
32 * For example: | |
33 * <pre> | |
34 * class Producer implements Runnable { | |
35 * final Puttable chan; | |
36 * Producer(Puttable channel) { chan = channel; } | |
37 * public void run() { | |
38 * try { | |
39 * for(;;) { chan.put(produce()); } | |
40 * } | |
41 * catch (InterruptedException ex) {} | |
42 * } | |
43 * Object produce() { ... } | |
44 * } | |
45 * | |
46 * | |
47 * class Consumer implements Runnable { | |
48 * final Takable chan; | |
49 * Consumer(Takable channel) { chan = channel; } | |
50 * public void run() { | |
51 * try { | |
52 * for(;;) { consume(chan.take()); } | |
53 * } | |
54 * catch (InterruptedException ex) {} | |
55 * } | |
56 * void consume(Object x) { ... } | |
57 * } | |
58 * | |
59 * class Setup { | |
60 * void main() { | |
61 * Channel chan = new SomeChannelImplementation(); | |
62 * Producer p = new Producer(chan); | |
63 * Consumer c = new Consumer(chan); | |
64 * new Thread(p).start(); | |
65 * new Thread(c).start(); | |
66 * } | |
67 * } | |
68 * </pre> | |
69 * <p> | |
70 * A given channel implementation might or might not have bounded | |
71 * capacity or other insertion constraints, so in general, you cannot tell if | |
72 * a given put will block. However, | |
73 * Channels that are designed to | |
74 * have an element capacity (and so always block when full) | |
75 * should implement the | |
76 * BoundedChannel | |
77 * subinterface. | |
78 * <p> | |
79 * Channels may hold any kind of item. However, | |
80 * insertion of null is not in general supported. Implementations | |
81 * may (all currently do) throw IllegalArgumentExceptions upon attempts to | |
82 * insert null. | |
83 * <p> | |
84 * By design, the Channel interface does not support any methods to determine | |
85 * the current number of elements being held in the channel. | |
86 * This decision reflects the fact that in | |
87 * concurrent programming, such methods are so rarely useful | |
88 * that including them invites misuse; at best they could | |
89 * provide a snapshot of current | |
90 * state, that could change immediately after being reported. | |
91 * It is better practice to instead use poll and offer to try | |
92 * to take and put elements without blocking. For example, | |
93 * to empty out the current contents of a channel, you could write: | |
94 * <pre> | |
95 * try { | |
96 * for (;;) { | |
97 * Object item = channel.poll(0); | |
98 * if (item != null) | |
99 * process(item); | |
100 * else | |
101 * break; | |
102 * } | |
103 * } | |
104 * catch(InterruptedException ex) { ... } | |
105 * </pre> | |
106 * <p> | |
107 * However, it is possible to determine whether an item | |
108 * exists in a Channel via <code>peek</code>, which returns | |
109 * but does NOT remove the next item that can be taken (or null | |
110 * if there is no such item). The peek operation has a limited | |
111 * range of applicability, and must be used with care. Unless it | |
112 * is known that a given thread is the only possible consumer | |
113 * of a channel, and that no time-out-based <code>offer</code> operations | |
114 * are ever invoked, there is no guarantee that the item returned | |
115 * by peek will be available for a subsequent take. | |
116 * <p> | |
117 * When appropriate, you can define an isEmpty method to | |
118 * return whether <code>peek</code> returns null. | |
119 * <p> | |
120 * Also, as a compromise, even though it does not appear in the interface, | |
121 * implementation classes that can readily compute the number | |
122 * of elements support a <code>size()</code> method. This allows careful | |
123 * use, for example in queue length monitors, appropriate to the | |
124 * particular implementation constraints and properties. | |
125 * <p> | |
126 * All channels allow multiple producers and/or consumers. | |
127 * They do not support any kind of <em>close</em> method | |
128 * to shut down operation or indicate completion of particular | |
129 * producer or consumer threads. | |
130 * If you need to signal completion, one way to do it is to | |
131 * create a class such as | |
132 * <pre> | |
133 * class EndOfStream { | |
134 * // Application-dependent field/methods | |
135 * } | |
136 * </pre> | |
137 * And to have producers put an instance of this class into | |
138 * the channel when they are done. The consumer side can then | |
139 * check this via | |
140 * <pre> | |
141 * Object x = aChannel.take(); | |
142 * if (x instanceof EndOfStream) | |
143 * // special actions; perhaps terminate | |
144 * else | |
145 * // process normally | |
146 * </pre> | |
147 * <p> | |
148 * In time-out based methods (poll(msecs) and offer(x, msecs)), | |
149 * time bounds are interpreted in | |
150 * a coarse-grained, best-effort fashion. Since there is no | |
151 * way in Java to escape out of a wait for a synchronized | |
152 * method/block, time bounds can sometimes be exceeded when | |
153 * there is a lot of contention for the channel. Additionally, | |
154 * some Channel semantics entail a ``point of | |
155 * no return'' where, once some parts of the operation have completed, | |
156 * others must follow, regardless of time bound. | |
157 * <p> | |
158 * Interruptions are in general handled as early as possible | |
159 * in all methods. Normally, InterruptedExceptions are thrown | |
160 * in put/take and offer(msec)/poll(msec) if interruption | |
161 * is detected upon entry to the method, as well as in any | |
162 * later context surrounding waits. | |
163 * <p> | |
164 * If a put returns normally, an offer | |
165 * returns true, or a take or poll returns non-null, the operation | |
166 * completed successfully. | |
167 * In all other cases, the operation fails cleanly -- the | |
168 * element is not put or taken. | |
169 * <p> | |
170 * As with Sync classes, spinloops are not directly supported, | |
171 * are not particularly recommended for routine use, but are not hard | |
172 * to construct. For example, here is an exponential backoff version: | |
173 * <pre> | |
174 * Object backOffTake(Channel q) throws InterruptedException { | |
175 * long waitTime = 0; | |
176 * for (;;) { | |
177 * Object x = q.poll(0); | |
178 * if (x != null) | |
179 * return x; | |
180 * else { | |
181 * Thread.sleep(waitTime); | |
182 * waitTime = 3 * waitTime / 2 + 1; | |
183 * } | |
184 * } } | |
185 * </pre> | |
186 * <p> | |
187 * <b>Sample Usage</b>. Here is a producer/consumer design | |
188 * where the channel is used to hold Runnable commands representing | |
189 * background tasks. | |
190 * <pre> | |
191 * class Service { | |
192 * private final Channel channel = ... some Channel implementation; | |
193 * | |
194 * private void backgroundTask(int taskParam) { ... } | |
195 * | |
196 * public void action(final int arg) { | |
197 * Runnable command = | |
198 * new Runnable() { | |
199 * public void run() { backgroundTask(arg); } | |
200 * }; | |
201 * try { channel.put(command); } | |
202 * catch (InterruptedException ex) { | |
203 * Thread.currentThread().interrupt(); // ignore but propagate | |
204 * } | |
205 * } | |
206 * | |
207 * public Service() { | |
208 * Runnable backgroundLoop = | |
209 * new Runnable() { | |
210 * public void run() { | |
211 * for (;;) { | |
212 * try { | |
213 * Runnable task = (Runnable)(channel.take()); | |
214 * task.run(); | |
215 * } | |
216 * catch (InterruptedException ex) { return; } | |
217 * } | |
218 * } | |
219 * }; | |
220 * new Thread(backgroundLoop).start(); | |
221 * } | |
222 * } | |
223 * | |
224 * </pre> | |
225 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
226 * @see Sync | |
227 * @see BoundedChannel | |
228 **/ | |
229 | |
230 public interface Channel extends Puttable, Takable { | |
231 | |
232 /** | |
233 * Place item in the channel, possibly waiting indefinitely until | |
234 * it can be accepted. Channels implementing the BoundedChannel | |
235 * subinterface are generally guaranteed to block on puts upon | |
236 * reaching capacity, but other implementations may or may not block. | |
237 * @param item the element to be inserted. Should be non-null. | |
238 * @exception InterruptedException if the current thread has | |
239 * been interrupted at a point at which interruption | |
240 * is detected, in which case the element is guaranteed not | |
241 * to be inserted. Otherwise, on normal return, the element is guaranteed | |
242 * to have been inserted. | |
243 **/ | |
244 public void put(Object item) throws InterruptedException; | |
245 | |
246 /** | |
247 * Place item in channel only if it can be accepted within | |
248 * msecs milliseconds. The time bound is interpreted in | |
249 * a coarse-grained, best-effort fashion. | |
250 * @param item the element to be inserted. Should be non-null. | |
251 * @param msecs the number of milliseconds to wait. If less than | |
252 * or equal to zero, the method does not perform any timed waits, | |
253 * but might still require | |
254 * access to a synchronization lock, which can impose unbounded | |
255 * delay if there is a lot of contention for the channel. | |
256 * @return true if accepted, else false | |
257 * @exception InterruptedException if the current thread has | |
258 * been interrupted at a point at which interruption | |
259 * is detected, in which case the element is guaranteed not | |
260 * to be inserted (i.e., is equivalent to a false return). | |
261 **/ | |
262 public boolean offer(Object item, long msecs) throws InterruptedException; | |
263 | |
264 /** | |
265 * Return and remove an item from channel, | |
266 * possibly waiting indefinitely until | |
267 * such an item exists. | |
268 * @return some item from the channel. Different implementations | |
269 * may guarantee various properties (such as FIFO) about that item | |
270 * @exception InterruptedException if the current thread has | |
271 * been interrupted at a point at which interruption | |
272 * is detected, in which case state of the channel is unchanged. | |
273 * | |
274 **/ | |
275 public Object take() throws InterruptedException; | |
276 | |
277 | |
278 /** | |
279 * Return and remove an item from channel only if one is available within | |
280 * msecs milliseconds. The time bound is interpreted in a | |
281 * coarse-grained, best-effort fashion. | |
282 * @param msecs the number of milliseconds to wait. If less than | |
283 * or equal to zero, the operation does not perform any timed waits, | |
284 * but might still require | |
285 * access to a synchronization lock, which can impose unbounded | |
286 * delay if there is a lot of contention for the channel. | |
287 * @return some item, or null if the channel is empty. | |
288 * @exception InterruptedException if the current thread has | |
289 * been interrupted at a point at which interruption | |
290 * is detected, in which case state of the channel is unchanged | |
291 * (i.e., equivalent to a null return). | |
292 **/ | |
293 | |
294 public Object poll(long msecs) throws InterruptedException; | |
295 | |
296 /** | |
297 * Return, but do not remove object at head of Channel, | |
298 * or null if it is empty. | |
299 **/ | |
300 | |
301 public Object peek(); | |
302 | |
303 } | |
304 |
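
The Javadoc above describes two idioms: emptying a channel with `poll(0)`, and signalling completion with an `EndOfStream` marker. The following is a minimal, self-contained sketch of both. Everything in it is an assumption made for illustration: `ChannelSketch`, `SimpleChannel`, `drain`, and `consumeUntilEnd` are hypothetical names, and the nested `Puttable`, `Takable`, and `Channel` interfaces are local stand-ins with the same method signatures as the file above, not the real `EDU.oswego.cs.dl.util.concurrent` types. `SimpleChannel` is an unbounded FIFO queue guarded by its own monitor, so `put` never blocks.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch; local stand-ins, not the real util.concurrent classes.
class ChannelSketch {

    interface Puttable {
        void put(Object item) throws InterruptedException;
        boolean offer(Object item, long msecs) throws InterruptedException;
    }

    interface Takable {
        Object take() throws InterruptedException;
        Object poll(long msecs) throws InterruptedException;
    }

    interface Channel extends Puttable, Takable {
        Object peek();
    }

    /** Marker that a producer puts into the channel when it is done. */
    static final class EndOfStream { }

    /** Unbounded FIFO channel guarded by its own monitor (illustrative only). */
    static final class SimpleChannel implements Channel {
        private final LinkedList<Object> items = new LinkedList<Object>();

        public synchronized void put(Object item) throws InterruptedException {
            if (item == null) throw new IllegalArgumentException("null item");
            if (Thread.interrupted()) throw new InterruptedException();
            items.addLast(item);
            notifyAll(); // wake any thread waiting in take() or poll()
        }

        public boolean offer(Object item, long msecs) throws InterruptedException {
            put(item); // unbounded, so there is never a need to wait
            return true;
        }

        public synchronized Object take() throws InterruptedException {
            if (Thread.interrupted()) throw new InterruptedException();
            while (items.isEmpty()) wait();
            return items.removeFirst();
        }

        public synchronized Object poll(long msecs) throws InterruptedException {
            if (Thread.interrupted()) throw new InterruptedException();
            long start = System.currentTimeMillis();
            long remaining = msecs;
            while (items.isEmpty()) {
                if (remaining <= 0) return null; // balk, or time bound reached
                wait(remaining);
                remaining = msecs - (System.currentTimeMillis() - start);
            }
            return items.removeFirst();
        }

        public synchronized Object peek() {
            return items.isEmpty() ? null : items.getFirst();
        }
    }

    /** Empties the current contents of a channel without blocking. */
    static List<Object> drain(Takable channel) throws InterruptedException {
        List<Object> out = new ArrayList<Object>();
        for (;;) {
            Object item = channel.poll(0);
            if (item == null) break;
            out.add(item);
        }
        return out;
    }

    /** Takes items until an EndOfStream marker arrives; returns the item count. */
    static int consumeUntilEnd(Takable channel) throws InterruptedException {
        int count = 0;
        for (;;) {
            Object x = channel.take();
            if (x instanceof EndOfStream) return count;
            count++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final SimpleChannel chan = new SimpleChannel();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 3; i++) chan.put(Integer.valueOf(i));
                    chan.put(new EndOfStream());
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // propagate the interrupt
                }
            }
        });
        producer.start();
        System.out.println("consumed " + consumeUntilEnd(chan));
        producer.join();
        chan.put("x");
        chan.put("y");
        System.out.println("drained " + drain(chan));
    }
}
```

Both helpers accept a `Takable` rather than a `Channel`, following the Javadoc's advice to restrict consumers to the take side. With several consumers, one marker per consumer would be needed, since each `EndOfStream` instance is removed by whichever thread takes it.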