comparison src/EDU/oswego/cs/dl/util/concurrent/misc/CVBuffer.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1
2 package EDU.oswego.cs.dl.util.concurrent.misc;
3 import EDU.oswego.cs.dl.util.concurrent.*;
4
5
6 public class CVBuffer implements BoundedChannel {
7 private final Mutex mutex;
8 private final CondVar notFull;
9 private final CondVar notEmpty;
10 private int count = 0;
11 private int takePtr = 0;
12 private int putPtr = 0;
13 private final Object[] array;
14
15 public CVBuffer(int cap) {
16 array = new Object[cap];
17 mutex = new Mutex();
18 notFull = new CondVar(mutex);
19 notEmpty = new CondVar(mutex);
20 }
21
22 public CVBuffer() {
23 this(DefaultChannelCapacity.get());
24 }
25
26 public int capacity() { return array.length; }
27
28 public void put(Object x) throws InterruptedException {
29 mutex.acquire();
30 try {
31 while (count == array.length) {
32 notFull.await();
33 }
34 array[putPtr] = x;
35 putPtr = (putPtr + 1) % array.length;
36 ++count;
37 notEmpty.signal();
38 }
39 finally {
40 mutex.release();
41 }
42 }
43
44 public Object take() throws InterruptedException {
45 Object x = null;
46 mutex.acquire();
47 try {
48 while (count == 0) {
49 notEmpty.await();
50 }
51 x = array[takePtr];
52 array[takePtr] = null;
53 takePtr = (takePtr + 1) % array.length;
54 --count;
55 notFull.signal();
56 }
57 finally {
58 mutex.release();
59 }
60 return x;
61 }
62
63 public boolean offer(Object x, long msecs) throws InterruptedException {
64 mutex.acquire();
65 try {
66 if (count == array.length) {
67 notFull.timedwait(msecs);
68 if (count == array.length)
69 return false;
70 }
71 array[putPtr] = x;
72 putPtr = (putPtr + 1) % array.length;
73 ++count;
74 notEmpty.signal();
75 return true;
76 }
77 finally {
78 mutex.release();
79 }
80 }
81
82 public Object poll(long msecs) throws InterruptedException {
83 Object x = null;
84 mutex.acquire();
85 try {
86 if (count == 0) {
87 notEmpty.timedwait(msecs);
88 if (count == 0)
89 return null;
90 }
91 x = array[takePtr];
92 array[takePtr] = null;
93 takePtr = (takePtr + 1) % array.length;
94 --count;
95 notFull.signal();
96 }
97 finally {
98 mutex.release();
99 }
100 return x;
101 }
102
103 public Object peek() {
104 try {
105 mutex.acquire();
106 try {
107 if (count == 0)
108 return null;
109 else
110 return array[takePtr];
111 }
112 finally {
113 mutex.release();
114 }
115 }
116 catch (InterruptedException ex) {
117 Thread.currentThread().interrupt();
118 return null;
119 }
120 }
121
122 }
123