Mercurial > hg > blitz_condensed
diff src/EDU/oswego/cs/dl/util/concurrent/misc/PipedChannel.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/EDU/oswego/cs/dl/util/concurrent/misc/PipedChannel.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,117 @@ + +package EDU.oswego.cs.dl.util.concurrent.misc; +import EDU.oswego.cs.dl.util.concurrent.*; + +import java.io.*; + +/** + * A channel based on a java.io.PipedInputStream and + * java.io.PipedOutputStream. Elements are serialized + * using ObjectInputStreams and ObjectOutputStreams + * upon insertion and extraction from the pipe. + * <p> + * IO Exceptions are transformed into Errors. This is + * in general not a good idea, but seems to be the most + * reasonable compromise for the intended usage contexts. + * <p> + * <b>Status</b> Uncertain. There are enough + * conceptual and implementation snags surrounding use + * of pipes as Channels to downplay use. However, + * without such bridges, people would have to + * duplicate code that should work the same way in both cases. + * + * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] + **/ + +public class PipedChannel extends SemaphoreControlledChannel { + protected ObjectInputStream in_; + protected ObjectOutputStream out_; + + protected final PipedOutputStream outp_; + protected final PipedInputStream inp_; + + + public PipedChannel() { + super(1); + + try { + outp_ = new PipedOutputStream(); + inp_ = new PipedInputStream(); + inp_.connect(outp_); + } + catch (IOException ex) { + ex.printStackTrace(); + throw new Error("Cannot construct Pipe?"); + } + } + + + /** + * Return input stream, first constructing if necessary. + * Needed because Object streams can block on open. + **/ + + protected synchronized ObjectInputStream in() { + try { + if (in_ == null) in_ = new ObjectInputStream(inp_); + return in_; + } + catch (IOException ex) { + ex.printStackTrace(); + throw new Error("IO exception during open"); + } + } + + /** + * Return output stream, first constructing if necessary. + * Needed because Object streams can block on open. + **/ + protected synchronized ObjectOutputStream out() { + try { + if (out_ == null) out_ = new ObjectOutputStream(outp_); + return out_; + } + catch (IOException ex) { + ex.printStackTrace(); + throw new Error("IO exception during open"); + } + } + + + /** Shared mechanics for put-based methods **/ + protected void insert(Object x) { + try { + out().writeObject(x); + } + catch (InterruptedIOException ex) { + Thread.currentThread().interrupt(); + } + catch (IOException ex) { + ex.printStackTrace(); + throw new Error("IO exception during put"); + } + } + + /** Shared mechanics for take-based methods **/ + protected Object extract() { + try { + return in().readObject(); + } + catch (InterruptedIOException ex) { + Thread.currentThread().interrupt(); + return null; + } + catch (IOException ex) { + ex.printStackTrace(); + throw new Error("IO exception during take"); + } + catch (ClassNotFoundException ex) { + ex.printStackTrace(); + throw new Error("Serialization exception during take"); + } + } + + /** Stubbed out for now **/ + public Object peek() { return null; } +} +