Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/Channel.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/Channel.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,304 @@ +/* + File: Channel.java + + Originally written by Doug Lea and released into the public domain. + This may be used for any purposes whatsoever without acknowledgment. + Thanks for the assistance and support of Sun Microsystems Labs, + and everyone contributing, testing, and using this code. + + History: + Date Who What + 11Jun1998 dl Create public version + 25aug1998 dl added peek +*/ + +package EDU.oswego.cs.dl.util.concurrent; + +/** + * Main interface for buffers, queues, pipes, conduits, etc. + * <p> + * A Channel represents anything that you can put items + * into and take them out of. As with the Sync + * interface, both + * blocking (put(x), take), + * and timeouts (offer(x, msecs), poll(msecs)) policies + * are provided. Using a + * zero timeout for offer and poll results in a pure balking policy. + * <p> + * To aid in efforts to use Channels in a more typesafe manner, + * this interface extends Puttable and Takable. You can restrict + * arguments of instance variables to this type as a way of + * guaranteeing that producers never try to take, or consumers put. + * for example: + * <pre> + * class Producer implements Runnable { + * final Puttable chan; + * Producer(Puttable channel) { chan = channel; } + * public void run() { + * try { + * for(;;) { chan.put(produce()); } + * } + * catch (InterruptedException ex) {} + * } + * Object produce() { ... } + * } + * + * + * class Consumer implements Runnable { + * final Takable chan; + * Consumer(Takable channel) { chan = channel; } + * public void run() { + * try { + * for(;;) { consume(chan.take()); } + * } + * catch (InterruptedException ex) {} + * } + * void consume(Object x) { ... } + * } + * + * class Setup { + * void main() { + * Channel chan = new SomeChannelImplementation(); + * Producer p = new Producer(chan); + * Consumer c = new Consumer(chan); + * new Thread(p).start(); + * new Thread(c).start(); + * } + * } + * </pre> + * <p> + * A given channel implementation might or might not have bounded + * capacity or other insertion constraints, so in general, you cannot tell if + * a given put will block. However, + * Channels that are designed to + * have an element capacity (and so always block when full) + * should implement the + * BoundedChannel + * subinterface. + * <p> + * Channels may hold any kind of item. However, + * insertion of null is not in general supported. Implementations + * may (all currently do) throw IllegalArgumentExceptions upon attempts to + * insert null. + * <p> + * By design, the Channel interface does not support any methods to determine + * the current number of elements being held in the channel. + * This decision reflects the fact that in + * concurrent programming, such methods are so rarely useful + * that including them invites misuse; at best they could + * provide a snapshot of current + * state, that could change immediately after being reported. + * It is better practice to instead use poll and offer to try + * to take and put elements without blocking. For example, + * to empty out the current contents of a channel, you could write: + * <pre> + * try { + * for (;;) { + * Object item = channel.poll(0); + * if (item != null) + * process(item); + * else + * break; + * } + * } + * catch(InterruptedException ex) { ... } + * </pre> + * <p> + * However, it is possible to determine whether an item + * exists in a Channel via <code>peek</code>, which returns + * but does NOT remove the next item that can be taken (or null + * if there is no such item). The peek operation has a limited + * range of applicability, and must be used with care. Unless it + * is known that a given thread is the only possible consumer + * of a channel, and that no time-out-based <code>offer</code> operations + * are ever invoked, there is no guarantee that the item returned + * by peek will be available for a subsequent take. + * <p> + * When appropriate, you can define an isEmpty method to + * return whether <code>peek</code> returns null. + * <p> + * Also, as a compromise, even though it does not appear in interface, + * implementation classes that can readily compute the number + * of elements support a <code>size()</code> method. This allows careful + * use, for example in queue length monitors, appropriate to the + * particular implementation constraints and properties. + * <p> + * All channels allow multiple producers and/or consumers. + * They do not support any kind of <em>close</em> method + * to shut down operation or indicate completion of particular + * producer or consumer threads. + * If you need to signal completion, one way to do it is to + * create a class such as + * <pre> + * class EndOfStream { + * // Application-dependent field/methods + * } + * </pre> + * And to have producers put an instance of this class into + * the channel when they are done. The consumer side can then + * check this via + * <pre> + * Object x = aChannel.take(); + * if (x instanceof EndOfStream) + * // special actions; perhaps terminate + * else + * // process normally + * </pre> + * <p> + * In time-out based methods (poll(msecs) and offer(x, msecs), + * time bounds are interpreted in + * a coarse-grained, best-effort fashion. Since there is no + * way in Java to escape out of a wait for a synchronized + * method/block, time bounds can sometimes be exceeded when + * there is a lot contention for the channel. Additionally, + * some Channel semantics entail a ``point of + * no return'' where, once some parts of the operation have completed, + * others must follow, regardless of time bound. + * <p> + * Interruptions are in general handled as early as possible + * in all methods. Normally, InterruptionExceptions are thrown + * in put/take and offer(msec)/poll(msec) if interruption + * is detected upon entry to the method, as well as in any + * later context surrounding waits. + * <p> + * If a put returns normally, an offer + * returns true, or a put or poll returns non-null, the operation + * completed successfully. + * In all other cases, the operation fails cleanly -- the + * element is not put or taken. + * <p> + * As with Sync classes, spinloops are not directly supported, + * are not particularly recommended for routine use, but are not hard + * to construct. For example, here is an exponential backoff version: + * <pre> + * Object backOffTake(Channel q) throws InterruptedException { + * long waitTime = 0; + * for (;;) { + * Object x = q.poll(0); + * if (x != null) + * return x; + * else { + * Thread.sleep(waitTime); + * waitTime = 3 * waitTime / 2 + 1; + * } + * } + * </pre> + * <p> + * <b>Sample Usage</b>. Here is a producer/consumer design + * where the channel is used to hold Runnable commands representing + * background tasks. + * <pre> + * class Service { + * private final Channel channel = ... some Channel implementation; + * + * private void backgroundTask(int taskParam) { ... } + * + * public void action(final int arg) { + * Runnable command = + * new Runnable() { + * public void run() { backgroundTask(arg); } + * }; + * try { channel.put(command) } + * catch (InterruptedException ex) { + * Thread.currentThread().interrupt(); // ignore but propagate + * } + * } + * + * public Service() { + * Runnable backgroundLoop = + * new Runnable() { + * public void run() { + * for (;;) { + * try { + * Runnable task = (Runnable)(channel.take()); + * task.run(); + * } + * catch (InterruptedException ex) { return; } + * } + * } + * }; + * new Thread(backgroundLoop).start(); + * } + * } + * + * </pre> + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + * @see Sync + * @see BoundedChannel +**/ + +public interface Channel extends Puttable, Takable { + + /** + * Place item in the channel, possibly waiting indefinitely until + * it can be accepted. Channels implementing the BoundedChannel + * subinterface are generally guaranteed to block on puts upon + * reaching capacity, but other implementations may or may not block. + * @param item the element to be inserted. Should be non-null. + * @exception InterruptedException if the current thread has + * been interrupted at a point at which interruption + * is detected, in which case the element is guaranteed not + * to be inserted. Otherwise, on normal return, the element is guaranteed + * to have been inserted. + **/ + public void put(Object item) throws InterruptedException; + + /** + * Place item in channel only if it can be accepted within + * msecs milliseconds. The time bound is interpreted in + * a coarse-grained, best-effort fashion. + * @param item the element to be inserted. Should be non-null. + * @param msecs the number of milliseconds to wait. If less than + * or equal to zero, the method does not perform any timed waits, + * but might still require + * access to a synchronization lock, which can impose unbounded + * delay if there is a lot of contention for the channel. + * @return true if accepted, else false + * @exception InterruptedException if the current thread has + * been interrupted at a point at which interruption + * is detected, in which case the element is guaranteed not + * to be inserted (i.e., is equivalent to a false return). + **/ + public boolean offer(Object item, long msecs) throws InterruptedException; + + /** + * Return and remove an item from channel, + * possibly waiting indefinitely until + * such an item exists. + * @return some item from the channel. Different implementations + * may guarantee various properties (such as FIFO) about that item + * @exception InterruptedException if the current thread has + * been interrupted at a point at which interruption + * is detected, in which case state of the channel is unchanged. + * + **/ + public Object take() throws InterruptedException; + + + /** + * Return and remove an item from channel only if one is available within + * msecs milliseconds. The time bound is interpreted in a coarse + * grained, best-effort fashion. + * @param msecs the number of milliseconds to wait. If less than + * or equal to zero, the operation does not perform any timed waits, + * but might still require + * access to a synchronization lock, which can impose unbounded + * delay if there is a lot of contention for the channel. + * @return some item, or null if the channel is empty. + * @exception InterruptedException if the current thread has + * been interrupted at a point at which interruption + * is detected, in which case state of the channel is unchanged + * (i.e., equivalent to a null return). + **/ + + public Object poll(long msecs) throws InterruptedException; + + /** + * Return, but do not remove object at head of Channel, + * or null if it is empty. + **/ + + public Object peek(); + +} +