comparison src/EDU/oswego/cs/dl/util/concurrent/QueuedSemaphore.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: QueuedSemaphore.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 5Aug1998 dl replaced int counters with longs
13 24Aug1999 dl release(n): screen arguments
14 */
15
16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20 * Abstract base class for semaphores relying on queued wait nodes.
21 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
22 **/
23
24
25 public abstract class QueuedSemaphore extends Semaphore {
26
27 protected final WaitQueue wq_;
28
29 QueuedSemaphore(WaitQueue q, long initialPermits) {
30 super(initialPermits);
31 wq_ = q;
32 }
33
34 public void acquire() throws InterruptedException {
35 if (Thread.interrupted()) throw new InterruptedException();
36 if (precheck()) return;
37 WaitQueue.WaitNode w = new WaitQueue.WaitNode();
38 w.doWait(this);
39 }
40
41 public boolean attempt(long msecs) throws InterruptedException {
42 if (Thread.interrupted()) throw new InterruptedException();
43 if (precheck()) return true;
44 if (msecs <= 0) return false;
45
46 WaitQueue.WaitNode w = new WaitQueue.WaitNode();
47 return w.doTimedWait(this, msecs);
48 }
49
50 protected synchronized boolean precheck() {
51 boolean pass = (permits_ > 0);
52 if (pass) --permits_;
53 return pass;
54 }
55
56 protected synchronized boolean recheck(WaitQueue.WaitNode w) {
57 boolean pass = (permits_ > 0);
58 if (pass) --permits_;
59 else wq_.insert(w);
60 return pass;
61 }
62
63
64 protected synchronized WaitQueue.WaitNode getSignallee() {
65 WaitQueue.WaitNode w = wq_.extract();
66 if (w == null) ++permits_; // if none, inc permits for new arrivals
67 return w;
68 }
69
70 public void release() {
71 for (;;) {
72 WaitQueue.WaitNode w = getSignallee();
73 if (w == null) return; // no one to signal
74 if (w.signal()) return; // notify if still waiting, else skip
75 }
76 }
77
78 /** Release N permits **/
79 public void release(long n) {
80 if (n < 0) throw new IllegalArgumentException("Negative argument");
81
82 for (long i = 0; i < n; ++i) release();
83 }
84
85 /**
86 * Base class for internal queue classes for semaphores, etc.
87 * Relies on subclasses to actually implement queue mechanics
88 **/
89
90 protected static abstract class WaitQueue {
91
92 protected abstract void insert(WaitNode w);// assumed not to block
93 protected abstract WaitNode extract(); // should return null if empty
94
95 protected static class WaitNode {
96 boolean waiting = true;
97 WaitNode next = null;
98
99 protected synchronized boolean signal() {
100 boolean signalled = waiting;
101 if (signalled) {
102 waiting = false;
103 notify();
104 }
105 return signalled;
106 }
107
108 protected synchronized boolean doTimedWait(QueuedSemaphore sem,
109 long msecs)
110 throws InterruptedException {
111 if (sem.recheck(this) || !waiting)
112 return true;
113 else if (msecs <= 0) {
114 waiting = false;
115 return false;
116 }
117 else {
118 long waitTime = msecs;
119 long start = System.currentTimeMillis();
120
121 try {
122 for (;;) {
123 wait(waitTime);
124 if (!waiting) // definitely signalled
125 return true;
126 else {
127 waitTime = msecs - (System.currentTimeMillis() - start);
128 if (waitTime <= 0) { // timed out
129 waiting = false;
130 return false;
131 }
132 }
133 }
134 }
135 catch(InterruptedException ex) {
136 if (waiting) { // no notification
137 waiting = false; // invalidate for the signaller
138 throw ex;
139 }
140 else { // thread was interrupted after it was notified
141 Thread.currentThread().interrupt();
142 return true;
143 }
144 }
145 }
146 }
147
148 protected synchronized void doWait(QueuedSemaphore sem)
149 throws InterruptedException {
150 if (!sem.recheck(this)) {
151 try {
152 while (waiting) wait();
153 }
154 catch(InterruptedException ex) {
155 if (waiting) { // no notification
156 waiting = false; // invalidate for the signaller
157 throw ex;
158 }
159 else { // thread was interrupted after it was notified
160 Thread.currentThread().interrupt();
161 return;
162 }
163 }
164 }
165 }
166 }
167
168 }
169
170
171 }