comparison src/EDU/oswego/cs/dl/util/concurrent/misc/PipedChannel.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1
2 package EDU.oswego.cs.dl.util.concurrent.misc;
3 import EDU.oswego.cs.dl.util.concurrent.*;
4
5 import java.io.*;
6
7 /**
8 * A channel based on a java.io.PipedInputStream and
9 * java.io.PipedOutputStream. Elements are serialized
10 * using ObjectInputStreams and ObjectOutputStreams
11 * upon insertion and extraction from the pipe.
12 * <p>
13 * IO Exceptions are transformed into Errors. This is
14 * in general not a good idea, but seems to be the most
15 * reasonable compromise for the intended usage contexts.
16 * <p>
17 * <b>Status</b> Uncertain. There are enough
18 * conceptual and implementation snags surrounding use
19 * of pipes as Channels to downplay use. However,
20 * without such bridges, people would have to
21 * duplicate code that should work the same way in both cases.
22 *
23 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
24 **/
25
26 public class PipedChannel extends SemaphoreControlledChannel {
27 protected ObjectInputStream in_;
28 protected ObjectOutputStream out_;
29
30 protected final PipedOutputStream outp_;
31 protected final PipedInputStream inp_;
32
33
34 public PipedChannel() {
35 super(1);
36
37 try {
38 outp_ = new PipedOutputStream();
39 inp_ = new PipedInputStream();
40 inp_.connect(outp_);
41 }
42 catch (IOException ex) {
43 ex.printStackTrace();
44 throw new Error("Cannot construct Pipe?");
45 }
46 }
47
48
49 /**
50 * Return input stream, first constructing if necessary.
51 * Needed because Object streams can block on open.
52 **/
53
54 protected synchronized ObjectInputStream in() {
55 try {
56 if (in_ == null) in_ = new ObjectInputStream(inp_);
57 return in_;
58 }
59 catch (IOException ex) {
60 ex.printStackTrace();
61 throw new Error("IO exception during open");
62 }
63 }
64
65 /**
66 * Return output stream, first constructing if necessary.
67 * Needed because Object streams can block on open.
68 **/
69 protected synchronized ObjectOutputStream out() {
70 try {
71 if (out_ == null) out_ = new ObjectOutputStream(outp_);
72 return out_;
73 }
74 catch (IOException ex) {
75 ex.printStackTrace();
76 throw new Error("IO exception during open");
77 }
78 }
79
80
81 /** Shared mechanics for put-based methods **/
82 protected void insert(Object x) {
83 try {
84 out().writeObject(x);
85 }
86 catch (InterruptedIOException ex) {
87 Thread.currentThread().interrupt();
88 }
89 catch (IOException ex) {
90 ex.printStackTrace();
91 throw new Error("IO exception during put");
92 }
93 }
94
95 /** Shared mechanics for take-based methods **/
96 protected Object extract() {
97 try {
98 return in().readObject();
99 }
100 catch (InterruptedIOException ex) {
101 Thread.currentThread().interrupt();
102 return null;
103 }
104 catch (IOException ex) {
105 ex.printStackTrace();
106 throw new Error("IO exception during take");
107 }
108 catch (ClassNotFoundException ex) {
109 ex.printStackTrace();
110 throw new Error("Serialization exception during take");
111 }
112 }
113
114 /** Stubbed out for now **/
115 public Object peek() { return null; }
116 }
117