Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/LinkedQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: LinkedQueue.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jun1998 dl Create public version | |
12 25aug1998 dl added peek | |
13 10dec1998 dl added isEmpty | |
14 10oct1999 dl lock on node object to ensure visibility | |
15 */ | |
16 | |
17 package EDU.oswego.cs.dl.util.concurrent; | |
18 | |
19 /** | |
20 * A linked list based channel implementation. | |
21 * The algorithm avoids contention between puts | |
22 * and takes when the queue is not empty. | |
23 * Normally a put and a take can proceed simultaneously. | |
24 * (Although it does not allow multiple concurrent puts or takes.) | |
25 * This class tends to perform more efficently than | |
26 * other Channel implementations in producer/consumer | |
27 * applications. | |
28 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] | |
29 **/ | |
30 | |
31 public class LinkedQueue implements Channel { | |
32 | |
33 | |
34 /** | |
35 * Dummy header node of list. The first actual node, if it exists, is always | |
36 * at head_.next. After each take, the old first node becomes the head. | |
37 **/ | |
38 protected LinkedNode head_; | |
39 | |
40 /** | |
41 * Helper monitor for managing access to last node. | |
42 **/ | |
43 protected final Object putLock_ = new Object(); | |
44 | |
45 /** | |
46 * The last node of list. Put() appends to list, so modifies last_ | |
47 **/ | |
48 protected LinkedNode last_; | |
49 | |
50 /** | |
51 * The number of threads waiting for a take. | |
52 * Notifications are provided in put only if greater than zero. | |
53 * The bookkeeping is worth it here since in reasonably balanced | |
54 * usages, the notifications will hardly ever be necessary, so | |
55 * the call overhead to notify can be eliminated. | |
56 **/ | |
57 protected int waitingForTake_ = 0; | |
58 | |
59 public LinkedQueue() { | |
60 head_ = new LinkedNode(null); | |
61 last_ = head_; | |
62 } | |
63 | |
64 /** Main mechanics for put/offer **/ | |
65 protected void insert(Object x) { | |
66 synchronized(putLock_) { | |
67 LinkedNode p = new LinkedNode(x); | |
68 synchronized(last_) { | |
69 last_.next = p; | |
70 last_ = p; | |
71 } | |
72 if (waitingForTake_ > 0) | |
73 putLock_.notify(); | |
74 } | |
75 } | |
76 | |
77 /** Main mechanics for take/poll **/ | |
78 protected synchronized Object extract() { | |
79 synchronized(head_) { | |
80 Object x = null; | |
81 LinkedNode first = head_.next; | |
82 if (first != null) { | |
83 x = first.value; | |
84 first.value = null; | |
85 head_ = first; | |
86 } | |
87 return x; | |
88 } | |
89 } | |
90 | |
91 | |
92 public void put(Object x) throws InterruptedException { | |
93 if (x == null) throw new IllegalArgumentException(); | |
94 if (Thread.interrupted()) throw new InterruptedException(); | |
95 insert(x); | |
96 } | |
97 | |
98 public boolean offer(Object x, long msecs) throws InterruptedException { | |
99 if (x == null) throw new IllegalArgumentException(); | |
100 if (Thread.interrupted()) throw new InterruptedException(); | |
101 insert(x); | |
102 return true; | |
103 } | |
104 | |
105 public Object take() throws InterruptedException { | |
106 if (Thread.interrupted()) throw new InterruptedException(); | |
107 // try to extract. If fail, then enter wait-based retry loop | |
108 Object x = extract(); | |
109 if (x != null) | |
110 return x; | |
111 else { | |
112 synchronized(putLock_) { | |
113 try { | |
114 ++waitingForTake_; | |
115 for (;;) { | |
116 x = extract(); | |
117 if (x != null) { | |
118 --waitingForTake_; | |
119 return x; | |
120 } | |
121 else { | |
122 putLock_.wait(); | |
123 } | |
124 } | |
125 } | |
126 catch(InterruptedException ex) { | |
127 --waitingForTake_; | |
128 putLock_.notify(); | |
129 throw ex; | |
130 } | |
131 } | |
132 } | |
133 } | |
134 | |
135 public Object peek() { | |
136 synchronized(head_) { | |
137 LinkedNode first = head_.next; | |
138 if (first != null) | |
139 return first.value; | |
140 else | |
141 return null; | |
142 } | |
143 } | |
144 | |
145 | |
146 public boolean isEmpty() { | |
147 synchronized(head_) { | |
148 return head_.next == null; | |
149 } | |
150 } | |
151 | |
152 public Object poll(long msecs) throws InterruptedException { | |
153 if (Thread.interrupted()) throw new InterruptedException(); | |
154 Object x = extract(); | |
155 if (x != null) | |
156 return x; | |
157 else { | |
158 synchronized(putLock_) { | |
159 try { | |
160 long waitTime = msecs; | |
161 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); | |
162 ++waitingForTake_; | |
163 for (;;) { | |
164 x = extract(); | |
165 if (x != null || waitTime <= 0) { | |
166 --waitingForTake_; | |
167 return x; | |
168 } | |
169 else { | |
170 putLock_.wait(waitTime); | |
171 waitTime = msecs - (System.currentTimeMillis() - start); | |
172 } | |
173 } | |
174 } | |
175 catch(InterruptedException ex) { | |
176 --waitingForTake_; | |
177 putLock_.notify(); | |
178 throw ex; | |
179 } | |
180 } | |
181 } | |
182 } | |
183 } | |
184 | |
185 |