comparison src/EDU/oswego/cs/dl/util/concurrent/Channel.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
/*
  File: Channel.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who  What
  11Jun1998  dl   Create public version
  25aug1998  dl   added peek
*/

package EDU.oswego.cs.dl.util.concurrent;

/**
 * Main interface for buffers, queues, pipes, conduits, etc.
 * <p>
 * A Channel represents anything that you can put items
 * into and take them out of. As with the Sync
 * interface, both
 * blocking (put(x), take()),
 * and timeout-based (offer(x, msecs), poll(msecs)) policies
 * are provided. Using a
 * zero timeout for offer and poll results in a pure balking policy.
 * <p>
 * To aid in efforts to use Channels in a more typesafe manner,
 * this interface extends Puttable and Takable. You can restrict
 * arguments or instance variables to one of these types as a way of
 * guaranteeing that producers never try to take, or consumers put.
 * For example:
 * <pre>
 * class Producer implements Runnable {
 *   final Puttable chan;
 *   Producer(Puttable channel) { chan = channel; }
 *   public void run() {
 *     try {
 *       for(;;) { chan.put(produce()); }
 *     }
 *     catch (InterruptedException ex) {}
 *   }
 *   Object produce() { ... }
 * }
 *
 *
 * class Consumer implements Runnable {
 *   final Takable chan;
 *   Consumer(Takable channel) { chan = channel; }
 *   public void run() {
 *     try {
 *       for(;;) { consume(chan.take()); }
 *     }
 *     catch (InterruptedException ex) {}
 *   }
 *   void consume(Object x) { ... }
 * }
 *
 * class Setup {
 *   void main() {
 *     Channel chan = new SomeChannelImplementation();
 *     Producer p = new Producer(chan);
 *     Consumer c = new Consumer(chan);
 *     new Thread(p).start();
 *     new Thread(c).start();
 *   }
 * }
 * </pre>
 * <p>
 * A given channel implementation might or might not have bounded
 * capacity or other insertion constraints, so in general, you cannot tell if
 * a given put will block. However,
 * Channels that are designed to
 * have an element capacity (and so always block when full)
 * should implement the
 * BoundedChannel
 * subinterface.
 * <p>
 * Channels may hold any kind of item. However,
 * insertion of null is not in general supported. Implementations
 * may (all currently do) throw IllegalArgumentExceptions upon attempts to
 * insert null.
 * <p>
 * By design, the Channel interface does not support any methods to determine
 * the current number of elements being held in the channel.
 * This decision reflects the fact that in
 * concurrent programming, such methods are so rarely useful
 * that including them invites misuse; at best they could
 * provide a snapshot of current
 * state that could change immediately after being reported.
 * It is better practice to instead use poll and offer to try
 * to take and put elements without blocking. For example,
 * to empty out the current contents of a channel, you could write:
 * <pre>
 *  try {
 *    for (;;) {
 *      Object item = channel.poll(0);
 *      if (item != null)
 *        process(item);
 *      else
 *        break;
 *    }
 *  }
 *  catch(InterruptedException ex) { ... }
 * </pre>
 * <p>
 * However, it is possible to determine whether an item
 * exists in a Channel via <code>peek</code>, which returns
 * but does NOT remove the next item that can be taken (or null
 * if there is no such item). The peek operation has a limited
 * range of applicability, and must be used with care. Unless it
 * is known that a given thread is the only possible consumer
 * of a channel, and that no time-out-based <code>offer</code> operations
 * are ever invoked, there is no guarantee that the item returned
 * by peek will be available for a subsequent take.
 * <p>
 * When appropriate, you can define an isEmpty method to
 * return whether <code>peek</code> returns null.
 * <p>
 * Also, as a compromise, even though it does not appear in the interface,
 * implementation classes that can readily compute the number
 * of elements support a <code>size()</code> method. This allows careful
 * use, for example in queue length monitors, appropriate to the
 * particular implementation constraints and properties.
 * <p>
 * All channels allow multiple producers and/or consumers.
 * They do not support any kind of <em>close</em> method
 * to shut down operation or indicate completion of particular
 * producer or consumer threads.
 * If you need to signal completion, one way to do it is to
 * create a class such as
 * <pre>
 *   class EndOfStream {
 *      // Application-dependent field/methods
 *   }
 * </pre>
 * and have producers put an instance of this class into
 * the channel when they are done. The consumer side can then
 * check this via
 * <pre>
 *   Object x = aChannel.take();
 *   if (x instanceof EndOfStream)
 *     // special actions; perhaps terminate
 *   else
 *     // process normally
 * </pre>
 * <p>
 * In time-out based methods (poll(msecs) and offer(x, msecs)),
 * time bounds are interpreted in
 * a coarse-grained, best-effort fashion. Since there is no
 * way in Java to escape out of a wait for a synchronized
 * method/block, time bounds can sometimes be exceeded when
 * there is a lot of contention for the channel. Additionally,
 * some Channel semantics entail a "point of
 * no return" where, once some parts of the operation have completed,
 * others must follow, regardless of time bound.
 * <p>
 * Interruptions are in general handled as early as possible
 * in all methods. Normally, InterruptedExceptions are thrown
 * in put/take and offer(x, msecs)/poll(msecs) if interruption
 * is detected upon entry to the method, as well as in any
 * later context surrounding waits.
 * <p>
 * If a put returns normally, an offer
 * returns true, or a take or poll returns non-null, the operation
 * completed successfully.
 * In all other cases, the operation fails cleanly -- the
 * element is not put or taken.
 * <p>
 * As with Sync classes, spinloops are not directly supported,
 * are not particularly recommended for routine use, but are not hard
 * to construct. For example, here is an exponential backoff version:
 * <pre>
 * Object backOffTake(Channel q) throws InterruptedException {
 *   long waitTime = 0;
 *   for (;;) {
 *     Object x = q.poll(0);
 *     if (x != null)
 *       return x;
 *     else {
 *       Thread.sleep(waitTime);
 *       waitTime = 3 * waitTime / 2 + 1;
 *     }
 *   }
 * }
 * </pre>
 * <p>
 * <b>Sample Usage</b>. Here is a producer/consumer design
 * where the channel is used to hold Runnable commands representing
 * background tasks.
 * <pre>
 * class Service {
 *   private final Channel channel = ... some Channel implementation;
 *
 *   private void backgroundTask(int taskParam) { ... }
 *
 *   public void action(final int arg) {
 *     Runnable command =
 *       new Runnable() {
 *         public void run() { backgroundTask(arg); }
 *       };
 *     try { channel.put(command); }
 *     catch (InterruptedException ex) {
 *       Thread.currentThread().interrupt(); // ignore but propagate
 *     }
 *   }
 *
 *   public Service() {
 *     Runnable backgroundLoop =
 *       new Runnable() {
 *         public void run() {
 *           for (;;) {
 *             try {
 *               Runnable task = (Runnable)(channel.take());
 *               task.run();
 *             }
 *             catch (InterruptedException ex) { return; }
 *           }
 *         }
 *       };
 *     new Thread(backgroundLoop).start();
 *   }
 * }
 *
 * </pre>
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 * @see Sync
 * @see BoundedChannel
 **/

public interface Channel extends Puttable, Takable {

  /**
   * Place item in the channel, possibly waiting indefinitely until
   * it can be accepted. Channels implementing the BoundedChannel
   * subinterface are generally guaranteed to block on puts upon
   * reaching capacity, but other implementations may or may not block.
   * @param item the element to be inserted. Should be non-null.
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the element is guaranteed not
   * to be inserted. Otherwise, on normal return, the element is guaranteed
   * to have been inserted.
   **/
  public void put(Object item) throws InterruptedException;

  /**
   * Place item in channel only if it can be accepted within
   * msecs milliseconds. The time bound is interpreted in
   * a coarse-grained, best-effort fashion.
   * @param item the element to be inserted. Should be non-null.
   * @param msecs the number of milliseconds to wait. If less than
   * or equal to zero, the method does not perform any timed waits,
   * but might still require
   * access to a synchronization lock, which can impose unbounded
   * delay if there is a lot of contention for the channel.
   * @return true if accepted, else false
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the element is guaranteed not
   * to be inserted (i.e., is equivalent to a false return).
   **/
  public boolean offer(Object item, long msecs) throws InterruptedException;

  /**
   * Return and remove an item from channel,
   * possibly waiting indefinitely until
   * such an item exists.
   * @return some item from the channel. Different implementations
   * may guarantee various properties (such as FIFO) about that item.
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the state of the channel is unchanged.
   *
   **/
  public Object take() throws InterruptedException;


  /**
   * Return and remove an item from channel only if one is available within
   * msecs milliseconds. The time bound is interpreted in a
   * coarse-grained, best-effort fashion.
   * @param msecs the number of milliseconds to wait. If less than
   * or equal to zero, the operation does not perform any timed waits,
   * but might still require
   * access to a synchronization lock, which can impose unbounded
   * delay if there is a lot of contention for the channel.
   * @return some item, or null if the channel is empty.
   * @exception InterruptedException if the current thread has
   * been interrupted at a point at which interruption
   * is detected, in which case the state of the channel is unchanged
   * (i.e., equivalent to a null return).
   **/

  public Object poll(long msecs) throws InterruptedException;

  /**
   * Return, but do not remove, the object at the head of the Channel,
   * or null if it is empty.
   **/

  public Object peek();

}

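The contract documented in Channel.java (blocking put/take, timed offer/poll, balking with a zero timeout, a non-removing peek, and draining via poll(0)) can be tried out with a small self-contained sketch. It deliberately does not depend on the real EDU.oswego.cs.dl.util.concurrent types: MiniChannel, MiniBoundedBuffer, drain and demo are hypothetical names introduced only for illustration, and the monitor-based buffer is one assumed way to satisfy the interface, not the package's own implementation.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Sketch only: MiniChannel and MiniBoundedBuffer are hypothetical stand-ins,
// not the real EDU.oswego.cs.dl.util.concurrent classes.
class ChannelSketch {

  /** Local mirror of the five Channel methods. */
  interface MiniChannel {
    void put(Object item) throws InterruptedException;
    boolean offer(Object item, long msecs) throws InterruptedException;
    Object take() throws InterruptedException;
    Object poll(long msecs) throws InterruptedException;
    Object peek();
  }

  /** Bounded FIFO buffer guarded by its own monitor (an assumed design). */
  static final class MiniBoundedBuffer implements MiniChannel {
    private final LinkedList<Object> items = new LinkedList<Object>();
    private final int capacity;

    MiniBoundedBuffer(int capacity) {
      if (capacity <= 0) throw new IllegalArgumentException("capacity <= 0");
      this.capacity = capacity;
    }

    private static void checkInterrupt() throws InterruptedException {
      if (Thread.interrupted()) throw new InterruptedException();
    }

    private static void checkNotNull(Object item) {
      if (item == null) throw new IllegalArgumentException("null item");
    }

    public synchronized void put(Object item) throws InterruptedException {
      checkNotNull(item);
      checkInterrupt();                           // detect interruption on entry
      while (items.size() == capacity) wait();    // block while full
      items.addLast(item);
      notifyAll();
    }

    public synchronized boolean offer(Object item, long msecs) throws InterruptedException {
      checkNotNull(item);
      checkInterrupt();
      long start = System.currentTimeMillis();
      while (items.size() == capacity) {
        long remaining = msecs - (System.currentTimeMillis() - start);
        if (remaining <= 0) return false;         // balk (msecs <= 0) or time out
        wait(remaining);
      }
      items.addLast(item);
      notifyAll();
      return true;
    }

    public synchronized Object take() throws InterruptedException {
      checkInterrupt();
      while (items.isEmpty()) wait();             // block while empty
      Object x = items.removeFirst();
      notifyAll();
      return x;
    }

    public synchronized Object poll(long msecs) throws InterruptedException {
      checkInterrupt();
      long start = System.currentTimeMillis();
      while (items.isEmpty()) {
        long remaining = msecs - (System.currentTimeMillis() - start);
        if (remaining <= 0) return null;          // nothing available in time
        wait(remaining);
      }
      Object x = items.removeFirst();
      notifyAll();
      return x;
    }

    public synchronized Object peek() { return items.peek(); } // null if empty
  }

  /** The drain idiom from the Javadoc: poll(0) until it returns null. */
  static List<Object> drain(MiniChannel ch) throws InterruptedException {
    List<Object> out = new ArrayList<Object>();
    for (;;) {
      Object item = ch.poll(0);
      if (item == null) break;
      out.add(item);
    }
    return out;
  }

  /** Single-threaded walk through the balking and peek behaviour. */
  static String demo() {
    try {
      MiniChannel ch = new MiniBoundedBuffer(2);
      ch.put("a");
      boolean second = ch.offer("b", 0);  // accepted: one slot was free
      boolean third = ch.offer("c", 0);   // rejected: buffer is full
      Object head = ch.peek();            // head stays in the buffer
      List<Object> drained = drain(ch);
      Object after = ch.poll(0);          // empty again
      return second + " " + third + " " + head + " " + drained + " " + after;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // propagate the interrupt status
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Run single-threaded, demo() returns "true false a [a, b] null": the third offer balks because the buffer is full, peek leaves the head in place, and the drain loop stops at the first null from poll(0). In a multi-threaded setting the same caveats as in the Javadoc apply, in particular that a peeked item may be gone by the time another consumer calls take.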