Mercurial > hg > blitz_condensed
comparison src/EDU/oswego/cs/dl/util/concurrent/BoundedBuffer.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 /* | |
2 File: BoundedBuffer.java | |
3 | |
4 Originally written by Doug Lea and released into the public domain. | |
5 This may be used for any purposes whatsoever without acknowledgment. | |
6 Thanks for the assistance and support of Sun Microsystems Labs, | |
7 and everyone contributing, testing, and using this code. | |
8 | |
9 History: | |
10 Date Who What | |
11 11Jun1998 dl Create public version | |
12 17Jul1998 dl Simplified by eliminating wait counts | |
13 25aug1998 dl added peek | |
14 5May1999 dl replace % with conditional (slightly faster) | |
15 */ | |
16 | |
17 package EDU.oswego.cs.dl.util.concurrent; | |
18 | |
19 /** | |
20 * Efficient array-based bounded buffer class. | |
21 * Adapted from CPJ, chapter 8, which describes design. | |
22 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> | |
23 **/ | |
24 | |
25 public class BoundedBuffer implements BoundedChannel { | |
26 | |
27 protected final Object[] array_; // the elements | |
28 | |
29 protected int takePtr_ = 0; // circular indices | |
30 protected int putPtr_ = 0; | |
31 | |
32 protected int usedSlots_ = 0; // length | |
33 protected int emptySlots_; // capacity - length | |
34 | |
35 /** | |
36 * Helper monitor to handle puts. | |
37 **/ | |
38 protected final Object putMonitor_ = new Object(); | |
39 | |
40 /** | |
41 * Create a BoundedBuffer with the given capacity. | |
42 * @exception IllegalArgumentException if capacity less or equal to zero | |
43 **/ | |
44 public BoundedBuffer(int capacity) throws IllegalArgumentException { | |
45 if (capacity <= 0) throw new IllegalArgumentException(); | |
46 array_ = new Object[capacity]; | |
47 emptySlots_ = capacity; | |
48 } | |
49 | |
50 /** | |
51 * Create a buffer with the current default capacity | |
52 **/ | |
53 | |
54 public BoundedBuffer() { | |
55 this(DefaultChannelCapacity.get()); | |
56 } | |
57 | |
58 /** | |
59 * Return the number of elements in the buffer. | |
60 * This is only a snapshot value, that may change | |
61 * immediately after returning. | |
62 **/ | |
63 public synchronized int size() { return usedSlots_; } | |
64 | |
65 public int capacity() { return array_.length; } | |
66 | |
67 protected void incEmptySlots() { | |
68 synchronized(putMonitor_) { | |
69 ++emptySlots_; | |
70 putMonitor_.notify(); | |
71 } | |
72 } | |
73 | |
74 protected synchronized void incUsedSlots() { | |
75 ++usedSlots_; | |
76 notify(); | |
77 } | |
78 | |
79 protected final void insert(Object x) { // mechanics of put | |
80 --emptySlots_; | |
81 array_[putPtr_] = x; | |
82 if (++putPtr_ >= array_.length) putPtr_ = 0; | |
83 } | |
84 | |
85 protected final Object extract() { // mechanics of take | |
86 --usedSlots_; | |
87 Object old = array_[takePtr_]; | |
88 array_[takePtr_] = null; | |
89 if (++takePtr_ >= array_.length) takePtr_ = 0; | |
90 return old; | |
91 } | |
92 | |
93 public Object peek() { | |
94 synchronized(this) { | |
95 if (usedSlots_ > 0) | |
96 return array_[takePtr_]; | |
97 else | |
98 return null; | |
99 } | |
100 } | |
101 | |
102 | |
103 public void put(Object x) throws InterruptedException { | |
104 if (x == null) throw new IllegalArgumentException(); | |
105 if (Thread.interrupted()) throw new InterruptedException(); | |
106 | |
107 synchronized(putMonitor_) { | |
108 while (emptySlots_ <= 0) { | |
109 try { putMonitor_.wait(); } | |
110 catch (InterruptedException ex) { | |
111 putMonitor_.notify(); | |
112 throw ex; | |
113 } | |
114 } | |
115 insert(x); | |
116 } | |
117 incUsedSlots(); | |
118 } | |
119 | |
120 public boolean offer(Object x, long msecs) throws InterruptedException { | |
121 if (x == null) throw new IllegalArgumentException(); | |
122 if (Thread.interrupted()) throw new InterruptedException(); | |
123 | |
124 synchronized(putMonitor_) { | |
125 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); | |
126 long waitTime = msecs; | |
127 while (emptySlots_ <= 0) { | |
128 if (waitTime <= 0) return false; | |
129 try { putMonitor_.wait(waitTime); } | |
130 catch (InterruptedException ex) { | |
131 putMonitor_.notify(); | |
132 throw ex; | |
133 } | |
134 waitTime = msecs - (System.currentTimeMillis() - start); | |
135 } | |
136 insert(x); | |
137 } | |
138 incUsedSlots(); | |
139 return true; | |
140 } | |
141 | |
142 | |
143 | |
144 public Object take() throws InterruptedException { | |
145 if (Thread.interrupted()) throw new InterruptedException(); | |
146 Object old = null; | |
147 synchronized(this) { | |
148 while (usedSlots_ <= 0) { | |
149 try { wait(); } | |
150 catch (InterruptedException ex) { | |
151 notify(); | |
152 throw ex; | |
153 } | |
154 } | |
155 old = extract(); | |
156 } | |
157 incEmptySlots(); | |
158 return old; | |
159 } | |
160 | |
161 public Object poll(long msecs) throws InterruptedException { | |
162 if (Thread.interrupted()) throw new InterruptedException(); | |
163 Object old = null; | |
164 synchronized(this) { | |
165 long start = (msecs <= 0)? 0 : System.currentTimeMillis(); | |
166 long waitTime = msecs; | |
167 | |
168 while (usedSlots_ <= 0) { | |
169 if (waitTime <= 0) return null; | |
170 try { wait(waitTime); } | |
171 catch (InterruptedException ex) { | |
172 notify(); | |
173 throw ex; | |
174 } | |
175 waitTime = msecs - (System.currentTimeMillis() - start); | |
176 | |
177 } | |
178 old = extract(); | |
179 } | |
180 incEmptySlots(); | |
181 return old; | |
182 } | |
183 | |
184 } | |
185 | |
186 |