Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/BoundedLinkedQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: BoundedLinkedQueue.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jun1998 dl Create public version | |
12 17Jul1998 dl Simplified by eliminating wait counts | |
13 25aug1998 dl added peek | |
14 10oct1999 dl lock on node object to ensure visibility | |
15 27jan2000 dl setCapacity forces immediate permit reconcile | |
16 */ | |
17 | |
18 package EDU.oswego.cs.dl.util.concurrent; | |
19 | |
20 /** | |
21 * A bounded variant of | |
22 * LinkedQueue | |
23 * class. This class may be | |
24 * preferable to | |
25 * BoundedBuffer | |
26 * because it allows a bit more | |
27 * concurency among puts and takes, because it does not | |
28 * pre-allocate fixed storage for elements, and allows | |
29 * capacity to be dynamically reset. | |
30 * On the other hand, since it allocates a node object | |
31 * on each put, it can be slow on systems with slow | |
32 * allocation and GC. | |
33 * Also, it may be | |
34 * preferable to | |
35 * LinkedQueue | |
36 * when you need to limit | |
37 * the capacity to prevent resource exhaustion. This protection | |
38 * normally does not hurt much performance-wise: When the | |
39 * queue is not empty or full, most puts and | |
40 * takes are still usually able to execute concurrently. | |
41 * @see LinkedQueue | |
42 * @see BoundedBuffer | |
43 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> | |
44 **/ | |
45 | |
46 public class BoundedLinkedQueue implements BoundedChannel { | |
47 | |
48 /* | |
49 * It might be a bit nicer if this were declared as | |
50 * a subclass of LinkedQueue, or a sibling class of | |
51 * a common abstract class. It shares much of the | |
52 * basic design and bookkeeping fields. But too | |
53 * many details differ to make this worth doing. | |
54 */ | |
55 | |
56 | |
57 | |
58 /** | |
59 * Dummy header node of list. The first actual node, if it exists, is always | |
60 * at head_.next. After each take, the old first node becomes the head. | |
61 **/ | |
62 protected LinkedNode head_; | |
63 | |
64 /** | |
65 * The last node of list. Put() appends to list, so modifies last_ | |
66 **/ | |
67 protected LinkedNode last_; | |
68 | |
69 | |
70 /** | |
71 * Helper monitor. Ensures that only one put at a time executes. | |
72 **/ | |
73 | |
74 protected final Object putGuard_ = new Object(); | |
75 | |
76 /** | |
77 * Helper monitor. Protects and provides wait queue for takes | |
78 **/ | |
79 | |
80 protected final Object takeGuard_ = new Object(); | |
81 | |
82 | |
83 /** Number of elements allowed **/ | |
84 protected int capacity_; | |
85 | |
86 | |
87 /** | |
88 * One side of a split permit count. | |
89 * The counts represent permits to do a put. (The queue is full when zero). | |
90 * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length. | |
91 * (The length is never separately recorded, so this cannot be | |
92 * checked explicitly.) | |
93 * To minimize contention between puts and takes, the | |
94 * put side uses up all of its permits before transfering them from | |
95 * the take side. The take side just increments the count upon each take. | |
96 * Thus, most puts and take can run independently of each other unless | |
97 * the queue is empty or full. | |
98 * Initial value is queue capacity. | |
99 **/ | |
100 | |
101 protected int putSidePutPermits_; | |
102 | |
103 /** Number of takes since last reconcile **/ | |
104 protected int takeSidePutPermits_ = 0; | |
105 | |
106 | |
107 /** | |
108 * Create a queue with the given capacity | |
109 * @exception IllegalArgumentException if capacity less or equal to zero | |
110 **/ | |
111 public BoundedLinkedQueue(int capacity) { | |
112 if (capacity <= 0) throw new IllegalArgumentException(); | |
113 capacity_ = capacity; | |
114 putSidePutPermits_ = capacity; | |
115 head_ = new LinkedNode(null); | |
116 last_ = head_; | |
117 } | |
118 | |
119 /** | |
120 * Create a queue with the current default capacity | |
121 **/ | |
122 | |
123 public BoundedLinkedQueue() { | |
124 this(DefaultChannelCapacity.get()); | |
125 } | |
126 | |
127 /** | |
128 * Move put permits from take side to put side; | |
129 * return the number of put side permits that are available. | |
130 * Call only under synch on puGuard_ AND this. | |
131 **/ | |
132 protected final int reconcilePutPermits() { | |
133 putSidePutPermits_ += takeSidePutPermits_; | |
134 takeSidePutPermits_ = 0; | |
135 return putSidePutPermits_; | |
136 } | |
137 | |
138 | |
139 /** Return the current capacity of this queue **/ | |
140 public synchronized int capacity() { return capacity_; } | |
141 | |
142 | |
143 /** | |
144 * Return the number of elements in the queue. | |
145 * This is only a snapshot value, that may be in the midst | |
146 * of changing. The returned value will be unreliable in the presence of | |
147 * active puts and takes, and should only be used as a heuristic | |
148 * estimate, for example for resource monitoring purposes. | |
149 **/ | |
150 public synchronized int size() { | |
151 /* | |
152 This should ideally synch on putGuard_, but | |
153 doing so would cause it to block waiting for an in-progress | |
154 put, which might be stuck. So we instead use whatever | |
155 value of putSidePutPermits_ that we happen to read. | |
156 */ | |
157 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_); | |
158 } | |
159 | |
160 | |
161 /** | |
162 * Reset the capacity of this queue. | |
163 * If the new capacity is less than the old capacity, | |
164 * existing elements are NOT removed, but | |
165 * incoming puts will not proceed until the number of elements | |
166 * is less than the new capacity. | |
167 * @exception IllegalArgumentException if capacity less or equal to zero | |
168 **/ | |
169 | |
170 public void setCapacity(int newCapacity) { | |
171 if (newCapacity <= 0) throw new IllegalArgumentException(); | |
172 synchronized (putGuard_) { | |
173 synchronized(this) { | |
174 takeSidePutPermits_ += (newCapacity - capacity_); | |
175 capacity_ = newCapacity; | |
176 | |
177 // Force immediate reconcilation. | |
178 reconcilePutPermits(); | |
179 notifyAll(); | |
180 } | |
181 } | |
182 } | |
183 | |
184 | |
185 /** Main mechanics for take/poll **/ | |
186 protected synchronized Object extract() { | |
187 synchronized(head_) { | |
188 Object x = null; | |
189 LinkedNode first = head_.next; | |
190 if (first != null) { | |
191 x = first.value; | |
192 first.value = null; | |
193 head_ = first; | |
194 ++takeSidePutPermits_; | |
195 notify(); | |
196 } | |
197 return x; | |
198 } | |
199 } | |
200 | |
201 public Object peek() { | |
202 synchronized(head_) { | |
203 LinkedNode first = head_.next; | |
204 if (first != null) | |
205 return first.value; | |
206 else | |
207 return null; | |
208 } | |
209 } | |
210 | |
211 public Object take() throws InterruptedException { | |
212 if (Thread.interrupted()) throw new InterruptedException(); | |
213 Object x = extract(); | |
214 if (x != null) | |
215 return x; | |
216 else { | |
217 synchronized(takeGuard_) { | |
218 try { | |
219 for (;;) { | |
220 x = extract(); | |
221 if (x != null) { | |
222 return x; | |
223 } | |
224 else { | |
225 takeGuard_.wait(); | |
226 } | |
227 } | |
228 } | |
229 catch(InterruptedException ex) { | |
230 takeGuard_.notify(); | |
231 throw ex; | |
232 } | |
233 } | |
234 } | |
235 } | |
236 | |
237 public Object poll(long msecs) throws InterruptedException { | |
238 if (Thread.interrupted()) throw new InterruptedException(); | |
239 Object x = extract(); | |
240 if (x != null) | |
241 return x; | |
242 else { | |
243 synchronized(takeGuard_) { | |
244 try { | |
245 long waitTime = msecs; | |
246 long start = (msecs <= 0)? 0: System.currentTimeMillis(); | |
247 for (;;) { | |
248 x = extract(); | |
249 if (x != null || waitTime <= 0) { | |
250 return x; | |
251 } | |
252 else { | |
253 takeGuard_.wait(waitTime); | |
254 waitTime = msecs - (System.currentTimeMillis() - start); | |
255 } | |
256 } | |
257 } | |
258 catch(InterruptedException ex) { | |
259 takeGuard_.notify(); | |
260 throw ex; | |
261 } | |
262 } | |
263 } | |
264 } | |
265 | |
266 /** Notify a waiting take if needed **/ | |
267 protected final void allowTake() { | |
268 synchronized(takeGuard_) { | |
269 takeGuard_.notify(); | |
270 } | |
271 } | |
272 | |
273 | |
274 /** | |
275 * Create and insert a node. | |
276 * Call only under synch on putGuard_ | |
277 **/ | |
278 protected void insert(Object x) { | |
279 --putSidePutPermits_; | |
280 LinkedNode p = new LinkedNode(x); | |
281 synchronized(last_) { | |
282 last_.next = p; | |
283 last_ = p; | |
284 } | |
285 } | |
286 | |
287 | |
288 /* | |
289 put and offer(ms) differ only in policy before insert/allowTake | |
290 */ | |
291 | |
292 public void put(Object x) throws InterruptedException { | |
293 if (x == null) throw new IllegalArgumentException(); | |
294 if (Thread.interrupted()) throw new InterruptedException(); | |
295 | |
296 synchronized(putGuard_) { | |
297 | |
298 if (putSidePutPermits_ <= 0) { // wait for permit. | |
299 synchronized(this) { | |
300 if (reconcilePutPermits() <= 0) { | |
301 try { | |
302 for(;;) { | |
303 wait(); | |
304 if (reconcilePutPermits() > 0) { | |
305 break; | |
306 } | |
307 } | |
308 } | |
309 catch (InterruptedException ex) { | |
310 notify(); | |
311 throw ex; | |
312 } | |
313 } | |
314 } | |
315 } | |
316 insert(x); | |
317 } | |
318 // call outside of lock to loosen put/take coupling | |
319 allowTake(); | |
320 } | |
321 | |
322 public boolean offer(Object x, long msecs) throws InterruptedException { | |
323 if (x == null) throw new IllegalArgumentException(); | |
324 if (Thread.interrupted()) throw new InterruptedException(); | |
325 | |
326 synchronized(putGuard_) { | |
327 | |
328 if (putSidePutPermits_ <= 0) { | |
329 synchronized(this) { | |
330 if (reconcilePutPermits() <= 0) { | |
331 if (msecs <= 0) | |
332 return false; | |
333 else { | |
334 try { | |
335 long waitTime = msecs; | |
336 long start = System.currentTimeMillis(); | |
337 | |
338 for(;;) { | |
339 wait(waitTime); | |
340 if (reconcilePutPermits() > 0) { | |
341 break; | |
342 } | |
343 else { | |
344 waitTime = msecs - (System.currentTimeMillis() - start); | |
345 if (waitTime <= 0) { | |
346 return false; | |
347 } | |
348 } | |
349 } | |
350 } | |
351 catch (InterruptedException ex) { | |
352 notify(); | |
353 throw ex; | |
354 } | |
355 } | |
356 } | |
357 } | |
358 } | |
359 | |
360 insert(x); | |
361 } | |
362 | |
363 allowTake(); | |
364 return true; | |
365 } | |
366 | |
367 public boolean isEmpty() { | |
368 synchronized(head_) { | |
369 return head_.next == null; | |
370 } | |
371 } | |
372 | |
373 } |