comparison src/EDU/oswego/cs/dl/util/concurrent/SemaphoreControlledChannel.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: SemaphoreControlledChannel.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 16Jun1998 dl Create public version
12 5Aug1998 dl replaced int counters with longs
13 08dec2001 dl reflective constructor now uses longs too.
14 */
15
16 package EDU.oswego.cs.dl.util.concurrent;
17 import java.lang.reflect.*;
18
19 /**
20 * Abstract class for channels that use Semaphores to
21 * control puts and takes.
22 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
23 **/
24
25 public abstract class SemaphoreControlledChannel implements BoundedChannel {
26 protected final Semaphore putGuard_;
27 protected final Semaphore takeGuard_;
28 protected int capacity_;
29
30 /**
31 * Create a channel with the given capacity and default
32 * semaphore implementation
33 * @exception IllegalArgumentException if capacity less or equal to zero
34 **/
35
36 public SemaphoreControlledChannel(int capacity)
37 throws IllegalArgumentException {
38 if (capacity <= 0) throw new IllegalArgumentException();
39 capacity_ = capacity;
40 putGuard_ = new Semaphore(capacity);
41 takeGuard_ = new Semaphore(0);
42 }
43
44
45 /**
46 * Create a channel with the given capacity and
47 * semaphore implementations instantiated from the supplied class
48 * @exception IllegalArgumentException if capacity less or equal to zero.
49 * @exception NoSuchMethodException If class does not have constructor
50 * that intializes permits
51 * @exception SecurityException if constructor information
52 * not accessible
53 * @exception InstantiationException if semaphore class is abstract
54 * @exception IllegalAccessException if constructor cannot be called
55 * @exception InvocationTargetException if semaphore constructor throws an
56 * exception
57 **/
58 public SemaphoreControlledChannel(int capacity, Class semaphoreClass)
59 throws IllegalArgumentException,
60 NoSuchMethodException,
61 SecurityException,
62 InstantiationException,
63 IllegalAccessException,
64 InvocationTargetException {
65 if (capacity <= 0) throw new IllegalArgumentException();
66 capacity_ = capacity;
67 Class[] longarg = { Long.TYPE };
68 Constructor ctor = semaphoreClass.getDeclaredConstructor(longarg);
69 Long[] cap = { new Long(capacity) };
70 putGuard_ = (Semaphore)(ctor.newInstance(cap));
71 Long[] zero = { new Long(0) };
72 takeGuard_ = (Semaphore)(ctor.newInstance(zero));
73 }
74
75
76
77 public int capacity() { return capacity_; }
78
79 /**
80 * Return the number of elements in the buffer.
81 * This is only a snapshot value, that may change
82 * immediately after returning.
83 **/
84
85 public int size() { return (int)(takeGuard_.permits()); }
86
87 /**
88 * Internal mechanics of put.
89 **/
90 protected abstract void insert(Object x);
91
92 /**
93 * Internal mechanics of take.
94 **/
95 protected abstract Object extract();
96
97 public void put(Object x) throws InterruptedException {
98 if (x == null) throw new IllegalArgumentException();
99 if (Thread.interrupted()) throw new InterruptedException();
100 putGuard_.acquire();
101 try {
102 insert(x);
103 takeGuard_.release();
104 }
105 catch (ClassCastException ex) {
106 putGuard_.release();
107 throw ex;
108 }
109 }
110
111 public boolean offer(Object x, long msecs) throws InterruptedException {
112 if (x == null) throw new IllegalArgumentException();
113 if (Thread.interrupted()) throw new InterruptedException();
114 if (!putGuard_.attempt(msecs))
115 return false;
116 else {
117 try {
118 insert(x);
119 takeGuard_.release();
120 return true;
121 }
122 catch (ClassCastException ex) {
123 putGuard_.release();
124 throw ex;
125 }
126 }
127 }
128
129 public Object take() throws InterruptedException {
130 if (Thread.interrupted()) throw new InterruptedException();
131 takeGuard_.acquire();
132 try {
133 Object x = extract();
134 putGuard_.release();
135 return x;
136 }
137 catch (ClassCastException ex) {
138 takeGuard_.release();
139 throw ex;
140 }
141 }
142
143 public Object poll(long msecs) throws InterruptedException {
144 if (Thread.interrupted()) throw new InterruptedException();
145 if (!takeGuard_.attempt(msecs))
146 return null;
147 else {
148 try {
149 Object x = extract();
150 putGuard_.release();
151 return x;
152 }
153 catch (ClassCastException ex) {
154 takeGuard_.release();
155 throw ex;
156 }
157 }
158 }
159
160 }