comparison src/EDU/oswego/cs/dl/util/concurrent/CondVar.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: ConditionVariable.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 */
13
14 package EDU.oswego.cs.dl.util.concurrent;
15
16 /**
17 * This class is designed for fans of POSIX pthreads programming.
18 * If you restrict yourself to Mutexes and CondVars, you can
19 * use most of your favorite constructions. Don't randomly mix them
20 * with synchronized methods or blocks though.
21 * <p>
22 * Method names and behavior are as close as is reasonable to
23 * those in POSIX.
24 * <p>
25 * <b>Sample Usage.</b> Here is a full version of a bounded buffer
26 * that implements the BoundedChannel interface, written in
27 * a style reminiscent of that in POSIX programming books.
28 * <pre>
29 * class CVBuffer implements BoundedChannel {
30 * private final Mutex mutex;
31 * private final CondVar notFull;
32 * private final CondVar notEmpty;
33 * private int count = 0;
34 * private int takePtr = 0;
35 * private int putPtr = 0;
36 * private final Object[] array;
37 *
38 * public CVBuffer(int capacity) {
39 * array = new Object[capacity];
40 * mutex = new Mutex();
41 * notFull = new CondVar(mutex);
42 * notEmpty = new CondVar(mutex);
43 * }
44 *
45 * public int capacity() { return array.length; }
46 *
47 * public void put(Object x) throws InterruptedException {
48 * mutex.acquire();
49 * try {
50 * while (count == array.length) {
51 * notFull.await();
52 * }
53 * array[putPtr] = x;
54 * putPtr = (putPtr + 1) % array.length;
55 * ++count;
56 * notEmpty.signal();
57 * }
58 * finally {
59 * mutex.release();
60 * }
61 * }
62 *
63 * public Object take() throws InterruptedException {
64 * Object x = null;
65 * mutex.acquire();
66 * try {
67 * while (count == 0) {
68 * notEmpty.await();
69 * }
70 * x = array[takePtr];
71 * array[takePtr] = null;
72 * takePtr = (takePtr + 1) % array.length;
73 * --count;
74 * notFull.signal();
75 * }
76 * finally {
77 * mutex.release();
78 * }
79 * return x;
80 * }
81 *
82 * public boolean offer(Object x, long msecs) throws InterruptedException {
83 * mutex.acquire();
84 * try {
85 * if (count == array.length) {
86 * notFull.timedwait(msecs);
87 * if (count == array.length)
88 * return false;
89 * }
90 * array[putPtr] = x;
91 * putPtr = (putPtr + 1) % array.length;
92 * ++count;
93 * notEmpty.signal();
94 * return true;
95 * }
96 * finally {
97 * mutex.release();
98 * }
99 * }
100 *
101 * public Object poll(long msecs) throws InterruptedException {
102 * Object x = null;
103 * mutex.acquire();
104 * try {
105 * if (count == 0) {
106 * notEmpty.timedwait(msecs);
107 * if (count == 0)
108 * return null;
109 * }
110 * x = array[takePtr];
111 * array[takePtr] = null;
112 * takePtr = (takePtr + 1) % array.length;
113 * --count;
114 * notFull.signal();
115 * }
116 * finally {
117 * mutex.release();
118 * }
119 * return x;
120 * }
121 * }
122 *
123 * </pre>
124 * @see Mutex
125 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
126
127 **/
128
129 public class CondVar {
130
131 /** The mutex **/
132 protected final Sync mutex_;
133
134 /**
135 * Create a new CondVar that relies on the given mutual
136 * exclusion lock.
137 * @param mutex A non-reentrant mutual exclusion lock.
138 * Standard usage is to supply an instance of <code>Mutex</code>,
139 * but, for example, a Semaphore initialized to 1 also works.
140 * On the other hand, many other Sync implementations would not
141 * work here, so some care is required to supply a sensible
142 * synchronization object.
143 * In normal use, the mutex should be one that is used for <em>all</em>
144 * synchronization of the object using the CondVar. Generally,
145 * to prevent nested monitor lockouts, this
146 * object should not use any native Java synchronized blocks.
147 **/
148
149 public CondVar(Sync mutex) {
150 mutex_ = mutex;
151 }
152
153 /**
154 * Wait for notification. This operation at least momentarily
155 * releases the mutex. The mutex is always held upon return,
156 * even if interrupted.
157 * @exception InterruptedException if the thread was interrupted
158 * before or during the wait. However, if the thread is interrupted
159 * after the wait but during mutex re-acquisition, the interruption
160 * is ignored, while still ensuring
161 * that the currentThread's interruption state stays true, so can
162 * be probed by callers.
163 **/
164 public void await() throws InterruptedException {
165 if (Thread.interrupted()) throw new InterruptedException();
166 try {
167 synchronized(this) {
168 mutex_.release();
169 try {
170 wait();
171 }
172 catch (InterruptedException ex) {
173 notify();
174 throw ex;
175 }
176 }
177 }
178 finally {
179 // Must ignore interrupt on re-acquire
180 boolean interrupted = false;
181 for (;;) {
182 try {
183 mutex_.acquire();
184 break;
185 }
186 catch (InterruptedException ex) {
187 interrupted = true;
188 }
189 }
190 if (interrupted) {
191 Thread.currentThread().interrupt();
192 }
193 }
194 }
195
196 /**
197 * Wait for at most msecs for notification.
198 * This operation at least momentarily
199 * releases the mutex. The mutex is always held upon return,
200 * even if interrupted.
201 * @param msecs The time to wait. A value less than or equal to zero
202 * causes a momentarily release
203 * and re-acquire of the mutex, and always returns false.
204 * @return false if at least msecs have elapsed
205 * upon resumption; else true. A
206 * false return does NOT necessarily imply that the thread was
207 * not notified. For example, it might have been notified
208 * after the time elapsed but just before resuming.
209 * @exception InterruptedException if the thread was interrupted
210 * before or during the wait.
211 **/
212
213 public boolean timedwait(long msecs) throws InterruptedException {
214 if (Thread.interrupted()) throw new InterruptedException();
215 boolean success = false;
216 try {
217 synchronized(this) {
218 mutex_.release();
219 try {
220 if (msecs > 0) {
221 long start = System.currentTimeMillis();
222 wait(msecs);
223 success = System.currentTimeMillis() - start <= msecs;
224 }
225 }
226 catch (InterruptedException ex) {
227 notify();
228 throw ex;
229 }
230 }
231 }
232 finally {
233 // Must ignore interrupt on re-acquire
234 boolean interrupted = false;
235 for (;;) {
236 try {
237 mutex_.acquire();
238 break;
239 }
240 catch (InterruptedException ex) {
241 interrupted = true;
242 }
243 }
244 if (interrupted) {
245 Thread.currentThread().interrupt();
246 }
247 }
248 return success;
249 }
250
251 /**
252 * Notify a waiting thread.
253 * If one exists, a non-interrupted thread will return
254 * normally (i.e., not via InterruptedException) from await or timedwait.
255 **/
256 public synchronized void signal() {
257 notify();
258 }
259
260 /** Notify all waiting threads **/
261 public synchronized void broadcast() {
262 notifyAll();
263 }
264 }