comparison src/EDU/oswego/cs/dl/util/concurrent/BoundedBuffer.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /*
2 File: BoundedBuffer.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 17Jul1998 dl Simplified by eliminating wait counts
13 25aug1998 dl added peek
14 5May1999 dl replace % with conditional (slightly faster)
15 */
16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20 * Efficient array-based bounded buffer class.
21 * Adapted from CPJ, chapter 8, which describes design.
22 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
23 **/
24
25 public class BoundedBuffer implements BoundedChannel {
26
27 protected final Object[] array_; // the elements
28
29 protected int takePtr_ = 0; // circular indices
30 protected int putPtr_ = 0;
31
32 protected int usedSlots_ = 0; // length
33 protected int emptySlots_; // capacity - length
34
35 /**
36 * Helper monitor to handle puts.
37 **/
38 protected final Object putMonitor_ = new Object();
39
40 /**
41 * Create a BoundedBuffer with the given capacity.
42 * @exception IllegalArgumentException if capacity less or equal to zero
43 **/
44 public BoundedBuffer(int capacity) throws IllegalArgumentException {
45 if (capacity <= 0) throw new IllegalArgumentException();
46 array_ = new Object[capacity];
47 emptySlots_ = capacity;
48 }
49
50 /**
51 * Create a buffer with the current default capacity
52 **/
53
54 public BoundedBuffer() {
55 this(DefaultChannelCapacity.get());
56 }
57
58 /**
59 * Return the number of elements in the buffer.
60 * This is only a snapshot value, that may change
61 * immediately after returning.
62 **/
63 public synchronized int size() { return usedSlots_; }
64
65 public int capacity() { return array_.length; }
66
67 protected void incEmptySlots() {
68 synchronized(putMonitor_) {
69 ++emptySlots_;
70 putMonitor_.notify();
71 }
72 }
73
74 protected synchronized void incUsedSlots() {
75 ++usedSlots_;
76 notify();
77 }
78
79 protected final void insert(Object x) { // mechanics of put
80 --emptySlots_;
81 array_[putPtr_] = x;
82 if (++putPtr_ >= array_.length) putPtr_ = 0;
83 }
84
85 protected final Object extract() { // mechanics of take
86 --usedSlots_;
87 Object old = array_[takePtr_];
88 array_[takePtr_] = null;
89 if (++takePtr_ >= array_.length) takePtr_ = 0;
90 return old;
91 }
92
93 public Object peek() {
94 synchronized(this) {
95 if (usedSlots_ > 0)
96 return array_[takePtr_];
97 else
98 return null;
99 }
100 }
101
102
103 public void put(Object x) throws InterruptedException {
104 if (x == null) throw new IllegalArgumentException();
105 if (Thread.interrupted()) throw new InterruptedException();
106
107 synchronized(putMonitor_) {
108 while (emptySlots_ <= 0) {
109 try { putMonitor_.wait(); }
110 catch (InterruptedException ex) {
111 putMonitor_.notify();
112 throw ex;
113 }
114 }
115 insert(x);
116 }
117 incUsedSlots();
118 }
119
120 public boolean offer(Object x, long msecs) throws InterruptedException {
121 if (x == null) throw new IllegalArgumentException();
122 if (Thread.interrupted()) throw new InterruptedException();
123
124 synchronized(putMonitor_) {
125 long start = (msecs <= 0)? 0 : System.currentTimeMillis();
126 long waitTime = msecs;
127 while (emptySlots_ <= 0) {
128 if (waitTime <= 0) return false;
129 try { putMonitor_.wait(waitTime); }
130 catch (InterruptedException ex) {
131 putMonitor_.notify();
132 throw ex;
133 }
134 waitTime = msecs - (System.currentTimeMillis() - start);
135 }
136 insert(x);
137 }
138 incUsedSlots();
139 return true;
140 }
141
142
143
144 public Object take() throws InterruptedException {
145 if (Thread.interrupted()) throw new InterruptedException();
146 Object old = null;
147 synchronized(this) {
148 while (usedSlots_ <= 0) {
149 try { wait(); }
150 catch (InterruptedException ex) {
151 notify();
152 throw ex;
153 }
154 }
155 old = extract();
156 }
157 incEmptySlots();
158 return old;
159 }
160
161 public Object poll(long msecs) throws InterruptedException {
162 if (Thread.interrupted()) throw new InterruptedException();
163 Object old = null;
164 synchronized(this) {
165 long start = (msecs <= 0)? 0 : System.currentTimeMillis();
166 long waitTime = msecs;
167
168 while (usedSlots_ <= 0) {
169 if (waitTime <= 0) return null;
170 try { wait(waitTime); }
171 catch (InterruptedException ex) {
172 notify();
173 throw ex;
174 }
175 waitTime = msecs - (System.currentTimeMillis() - start);
176
177 }
178 old = extract();
179 }
180 incEmptySlots();
181 return old;
182 }
183
184 }
185
186