comparison src/EDU/oswego/cs/dl/util/concurrent/BoundedLinkedQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: BoundedLinkedQueue.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 17Jul1998 dl Simplified by eliminating wait counts
13 25aug1998 dl added peek
14 10oct1999 dl lock on node object to ensure visibility
15 27jan2000 dl setCapacity forces immediate permit reconcile
16 */
17
18 package EDU.oswego.cs.dl.util.concurrent;
19
20 /**
21 * A bounded variant of
22 * LinkedQueue
23 * class. This class may be
24 * preferable to
25 * BoundedBuffer
26 * because it allows a bit more
27 * concurency among puts and takes, because it does not
28 * pre-allocate fixed storage for elements, and allows
29 * capacity to be dynamically reset.
30 * On the other hand, since it allocates a node object
31 * on each put, it can be slow on systems with slow
32 * allocation and GC.
33 * Also, it may be
34 * preferable to
35 * LinkedQueue
36 * when you need to limit
37 * the capacity to prevent resource exhaustion. This protection
38 * normally does not hurt much performance-wise: When the
39 * queue is not empty or full, most puts and
40 * takes are still usually able to execute concurrently.
41 * @see LinkedQueue
42 * @see BoundedBuffer
43 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
44 **/
45
46 public class BoundedLinkedQueue implements BoundedChannel {
47
48 /*
49 * It might be a bit nicer if this were declared as
50 * a subclass of LinkedQueue, or a sibling class of
51 * a common abstract class. It shares much of the
52 * basic design and bookkeeping fields. But too
53 * many details differ to make this worth doing.
54 */
55
56
57
58 /**
59 * Dummy header node of list. The first actual node, if it exists, is always
60 * at head_.next. After each take, the old first node becomes the head.
61 **/
62 protected LinkedNode head_;
63
64 /**
65 * The last node of list. Put() appends to list, so modifies last_
66 **/
67 protected LinkedNode last_;
68
69
70 /**
71 * Helper monitor. Ensures that only one put at a time executes.
72 **/
73
74 protected final Object putGuard_ = new Object();
75
76 /**
77 * Helper monitor. Protects and provides wait queue for takes
78 **/
79
80 protected final Object takeGuard_ = new Object();
81
82
83 /** Number of elements allowed **/
84 protected int capacity_;
85
86
87 /**
88 * One side of a split permit count.
89 * The counts represent permits to do a put. (The queue is full when zero).
90 * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
91 * (The length is never separately recorded, so this cannot be
92 * checked explicitly.)
93 * To minimize contention between puts and takes, the
94 * put side uses up all of its permits before transfering them from
95 * the take side. The take side just increments the count upon each take.
96 * Thus, most puts and take can run independently of each other unless
97 * the queue is empty or full.
98 * Initial value is queue capacity.
99 **/
100
101 protected int putSidePutPermits_;
102
103 /** Number of takes since last reconcile **/
104 protected int takeSidePutPermits_ = 0;
105
106
107 /**
108 * Create a queue with the given capacity
109 * @exception IllegalArgumentException if capacity less or equal to zero
110 **/
111 public BoundedLinkedQueue(int capacity) {
112 if (capacity <= 0) throw new IllegalArgumentException();
113 capacity_ = capacity;
114 putSidePutPermits_ = capacity;
115 head_ = new LinkedNode(null);
116 last_ = head_;
117 }
118
119 /**
120 * Create a queue with the current default capacity
121 **/
122
123 public BoundedLinkedQueue() {
124 this(DefaultChannelCapacity.get());
125 }
126
127 /**
128 * Move put permits from take side to put side;
129 * return the number of put side permits that are available.
130 * Call only under synch on puGuard_ AND this.
131 **/
132 protected final int reconcilePutPermits() {
133 putSidePutPermits_ += takeSidePutPermits_;
134 takeSidePutPermits_ = 0;
135 return putSidePutPermits_;
136 }
137
138
139 /** Return the current capacity of this queue **/
140 public synchronized int capacity() { return capacity_; }
141
142
143 /**
144 * Return the number of elements in the queue.
145 * This is only a snapshot value, that may be in the midst
146 * of changing. The returned value will be unreliable in the presence of
147 * active puts and takes, and should only be used as a heuristic
148 * estimate, for example for resource monitoring purposes.
149 **/
150 public synchronized int size() {
151 /*
152 This should ideally synch on putGuard_, but
153 doing so would cause it to block waiting for an in-progress
154 put, which might be stuck. So we instead use whatever
155 value of putSidePutPermits_ that we happen to read.
156 */
157 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
158 }
159
160
161 /**
162 * Reset the capacity of this queue.
163 * If the new capacity is less than the old capacity,
164 * existing elements are NOT removed, but
165 * incoming puts will not proceed until the number of elements
166 * is less than the new capacity.
167 * @exception IllegalArgumentException if capacity less or equal to zero
168 **/
169
170 public void setCapacity(int newCapacity) {
171 if (newCapacity <= 0) throw new IllegalArgumentException();
172 synchronized (putGuard_) {
173 synchronized(this) {
174 takeSidePutPermits_ += (newCapacity - capacity_);
175 capacity_ = newCapacity;
176
177 // Force immediate reconcilation.
178 reconcilePutPermits();
179 notifyAll();
180 }
181 }
182 }
183
184
185 /** Main mechanics for take/poll **/
186 protected synchronized Object extract() {
187 synchronized(head_) {
188 Object x = null;
189 LinkedNode first = head_.next;
190 if (first != null) {
191 x = first.value;
192 first.value = null;
193 head_ = first;
194 ++takeSidePutPermits_;
195 notify();
196 }
197 return x;
198 }
199 }
200
201 public Object peek() {
202 synchronized(head_) {
203 LinkedNode first = head_.next;
204 if (first != null)
205 return first.value;
206 else
207 return null;
208 }
209 }
210
211 public Object take() throws InterruptedException {
212 if (Thread.interrupted()) throw new InterruptedException();
213 Object x = extract();
214 if (x != null)
215 return x;
216 else {
217 synchronized(takeGuard_) {
218 try {
219 for (;;) {
220 x = extract();
221 if (x != null) {
222 return x;
223 }
224 else {
225 takeGuard_.wait();
226 }
227 }
228 }
229 catch(InterruptedException ex) {
230 takeGuard_.notify();
231 throw ex;
232 }
233 }
234 }
235 }
236
237 public Object poll(long msecs) throws InterruptedException {
238 if (Thread.interrupted()) throw new InterruptedException();
239 Object x = extract();
240 if (x != null)
241 return x;
242 else {
243 synchronized(takeGuard_) {
244 try {
245 long waitTime = msecs;
246 long start = (msecs <= 0)? 0: System.currentTimeMillis();
247 for (;;) {
248 x = extract();
249 if (x != null || waitTime <= 0) {
250 return x;
251 }
252 else {
253 takeGuard_.wait(waitTime);
254 waitTime = msecs - (System.currentTimeMillis() - start);
255 }
256 }
257 }
258 catch(InterruptedException ex) {
259 takeGuard_.notify();
260 throw ex;
261 }
262 }
263 }
264 }
265
266 /** Notify a waiting take if needed **/
267 protected final void allowTake() {
268 synchronized(takeGuard_) {
269 takeGuard_.notify();
270 }
271 }
272
273
274 /**
275 * Create and insert a node.
276 * Call only under synch on putGuard_
277 **/
278 protected void insert(Object x) {
279 --putSidePutPermits_;
280 LinkedNode p = new LinkedNode(x);
281 synchronized(last_) {
282 last_.next = p;
283 last_ = p;
284 }
285 }
286
287
288 /*
289 put and offer(ms) differ only in policy before insert/allowTake
290 */
291
292 public void put(Object x) throws InterruptedException {
293 if (x == null) throw new IllegalArgumentException();
294 if (Thread.interrupted()) throw new InterruptedException();
295
296 synchronized(putGuard_) {
297
298 if (putSidePutPermits_ <= 0) { // wait for permit.
299 synchronized(this) {
300 if (reconcilePutPermits() <= 0) {
301 try {
302 for(;;) {
303 wait();
304 if (reconcilePutPermits() > 0) {
305 break;
306 }
307 }
308 }
309 catch (InterruptedException ex) {
310 notify();
311 throw ex;
312 }
313 }
314 }
315 }
316 insert(x);
317 }
318 // call outside of lock to loosen put/take coupling
319 allowTake();
320 }
321
322 public boolean offer(Object x, long msecs) throws InterruptedException {
323 if (x == null) throw new IllegalArgumentException();
324 if (Thread.interrupted()) throw new InterruptedException();
325
326 synchronized(putGuard_) {
327
328 if (putSidePutPermits_ <= 0) {
329 synchronized(this) {
330 if (reconcilePutPermits() <= 0) {
331 if (msecs <= 0)
332 return false;
333 else {
334 try {
335 long waitTime = msecs;
336 long start = System.currentTimeMillis();
337
338 for(;;) {
339 wait(waitTime);
340 if (reconcilePutPermits() > 0) {
341 break;
342 }
343 else {
344 waitTime = msecs - (System.currentTimeMillis() - start);
345 if (waitTime <= 0) {
346 return false;
347 }
348 }
349 }
350 }
351 catch (InterruptedException ex) {
352 notify();
353 throw ex;
354 }
355 }
356 }
357 }
358 }
359
360 insert(x);
361 }
362
363 allowTake();
364 return true;
365 }
366
367 public boolean isEmpty() {
368 synchronized(head_) {
369 return head_.next == null;
370 }
371 }
372
373 }