comparison src/EDU/oswego/cs/dl/util/concurrent/LinkedQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: LinkedQueue.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 25aug1998 dl added peek
13 10dec1998 dl added isEmpty
14 10oct1999 dl lock on node object to ensure visibility
15 */
16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20 * A linked list based channel implementation.
21 * The algorithm avoids contention between puts
22 * and takes when the queue is not empty.
23 * Normally a put and a take can proceed simultaneously.
24 * (Although it does not allow multiple concurrent puts or takes.)
25 * This class tends to perform more efficiently than
26 * other Channel implementations in producer/consumer
27 * applications.
28 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
29 **/
30
31 public class LinkedQueue implements Channel {
32
33
34 /**
35 * Dummy header node of list. The first actual node, if it exists, is always
36 * at head_.next. After each take, the old first node becomes the head.
37 **/
38 protected LinkedNode head_;
39
40 /**
41 * Helper monitor for managing access to last node.
42 **/
43 protected final Object putLock_ = new Object();
44
45 /**
46 * The last node of list. Put() appends to list, so modifies last_
47 **/
48 protected LinkedNode last_;
49
50 /**
51 * The number of threads waiting for a take.
52 * Notifications are provided in put only if greater than zero.
53 * The bookkeeping is worth it here since in reasonably balanced
54 * usages, the notifications will hardly ever be necessary, so
55 * the call overhead to notify can be eliminated.
56 **/
57 protected int waitingForTake_ = 0;
58
59 public LinkedQueue() {
60 head_ = new LinkedNode(null);
61 last_ = head_;
62 }
63
64 /** Main mechanics for put/offer **/
65 protected void insert(Object x) {
66 synchronized(putLock_) {
67 LinkedNode p = new LinkedNode(x);
68 synchronized(last_) {
69 last_.next = p;
70 last_ = p;
71 }
72 if (waitingForTake_ > 0)
73 putLock_.notify();
74 }
75 }
76
77 /** Main mechanics for take/poll **/
78 protected synchronized Object extract() {
79 synchronized(head_) {
80 Object x = null;
81 LinkedNode first = head_.next;
82 if (first != null) {
83 x = first.value;
84 first.value = null;
85 head_ = first;
86 }
87 return x;
88 }
89 }
90
91
92 public void put(Object x) throws InterruptedException {
93 if (x == null) throw new IllegalArgumentException();
94 if (Thread.interrupted()) throw new InterruptedException();
95 insert(x);
96 }
97
98 public boolean offer(Object x, long msecs) throws InterruptedException {
99 if (x == null) throw new IllegalArgumentException();
100 if (Thread.interrupted()) throw new InterruptedException();
101 insert(x);
102 return true;
103 }
104
105 public Object take() throws InterruptedException {
106 if (Thread.interrupted()) throw new InterruptedException();
107 // try to extract. If fail, then enter wait-based retry loop
108 Object x = extract();
109 if (x != null)
110 return x;
111 else {
112 synchronized(putLock_) {
113 try {
114 ++waitingForTake_;
115 for (;;) {
116 x = extract();
117 if (x != null) {
118 --waitingForTake_;
119 return x;
120 }
121 else {
122 putLock_.wait();
123 }
124 }
125 }
126 catch(InterruptedException ex) {
127 --waitingForTake_;
128 putLock_.notify();
129 throw ex;
130 }
131 }
132 }
133 }
134
135 public Object peek() {
136 synchronized(head_) {
137 LinkedNode first = head_.next;
138 if (first != null)
139 return first.value;
140 else
141 return null;
142 }
143 }
144
145
146 public boolean isEmpty() {
147 synchronized(head_) {
148 return head_.next == null;
149 }
150 }
151
152 public Object poll(long msecs) throws InterruptedException {
153 if (Thread.interrupted()) throw new InterruptedException();
154 Object x = extract();
155 if (x != null)
156 return x;
157 else {
158 synchronized(putLock_) {
159 try {
160 long waitTime = msecs;
161 long start = (msecs <= 0)? 0 : System.currentTimeMillis();
162 ++waitingForTake_;
163 for (;;) {
164 x = extract();
165 if (x != null || waitTime <= 0) {
166 --waitingForTake_;
167 return x;
168 }
169 else {
170 putLock_.wait(waitTime);
171 waitTime = msecs - (System.currentTimeMillis() - start);
172 }
173 }
174 }
175 catch(InterruptedException ex) {
176 --waitingForTake_;
177 putLock_.notify();
178 throw ex;
179 }
180 }
181 }
182 }
183 }
184
185