diff src/EDU/oswego/cs/dl/util/concurrent/BoundedBuffer.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/EDU/oswego/cs/dl/util/concurrent/BoundedBuffer.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,186 @@
+/*
+  File: BoundedBuffer.java
+
+  Originally written by Doug Lea and released into the public domain.
+  This may be used for any purposes whatsoever without acknowledgment.
+  Thanks for the assistance and support of Sun Microsystems Labs,
+  and everyone contributing, testing, and using this code.
+
+  History:
+  Date       Who                What
+  11Jun1998  dl               Create public version
+  17Jul1998  dl               Simplified by eliminating wait counts
+  25aug1998  dl               added peek
+   5May1999  dl               replace % with conditional (slightly faster)
+*/
+
+package EDU.oswego.cs.dl.util.concurrent;
+
+/**
+ * Efficient array-based bounded buffer class.
+ * Adapted from CPJ, chapter 8, which describes design.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
+ **/
+
+public class BoundedBuffer implements BoundedChannel {
+
+  protected final Object[]  array_;      // the elements
+
+  protected int takePtr_ = 0;            // circular indices
+  protected int putPtr_ = 0;       
+
+  protected int usedSlots_ = 0;          // length
+  protected int emptySlots_;             // capacity - length
+
+  /**
+   * Helper monitor to handle puts. 
+   **/
+  protected final Object putMonitor_ = new Object();
+
+  /**
+   * Create a BoundedBuffer with the given capacity.
+   * @exception IllegalArgumentException if capacity less or equal to zero
+   **/
+  public BoundedBuffer(int capacity) throws IllegalArgumentException {
+    if (capacity <= 0) throw new IllegalArgumentException();
+    array_ = new Object[capacity];
+    emptySlots_ = capacity;
+  }
+
+  /**
+   * Create a buffer with the current default capacity
+   **/
+
+  public BoundedBuffer() { 
+    this(DefaultChannelCapacity.get()); 
+  }
+
+  /** 
+   * Return the number of elements in the buffer.
+   * This is only a snapshot value, that may change
+   * immediately after returning.
+   **/
+  public synchronized int size() { return usedSlots_; }
+
+  public int capacity() { return array_.length; }
+
+  protected void incEmptySlots() {
+    synchronized(putMonitor_) {
+      ++emptySlots_;
+      putMonitor_.notify();
+    }
+  }
+
+  protected synchronized void incUsedSlots() {
+    ++usedSlots_;
+    notify();
+  }
+
+  protected final void insert(Object x) { // mechanics of put
+    --emptySlots_;
+    array_[putPtr_] = x;
+    if (++putPtr_ >= array_.length) putPtr_ = 0;
+  }
+
+  protected final Object extract() { // mechanics of take
+    --usedSlots_;
+    Object old = array_[takePtr_];
+    array_[takePtr_] = null;
+    if (++takePtr_ >= array_.length) takePtr_ = 0;
+    return old;
+  }
+
+  public Object peek() {
+    synchronized(this) {
+      if (usedSlots_ > 0)
+        return array_[takePtr_];
+      else
+        return null;
+    }
+  }
+
+
+  public void put(Object x) throws InterruptedException {
+    if (x == null) throw new IllegalArgumentException();
+    if (Thread.interrupted()) throw new InterruptedException();
+
+    synchronized(putMonitor_) {
+      while (emptySlots_ <= 0) {
+	try { putMonitor_.wait(); }
+        catch (InterruptedException ex) {
+          putMonitor_.notify();
+          throw ex;
+        }
+      }
+      insert(x);
+    }
+    incUsedSlots();
+  }
+
+  public boolean offer(Object x, long msecs) throws InterruptedException {
+    if (x == null) throw new IllegalArgumentException();
+    if (Thread.interrupted()) throw new InterruptedException();
+
+    synchronized(putMonitor_) {
+      long start = (msecs <= 0)? 0 : System.currentTimeMillis();
+      long waitTime = msecs;
+      while (emptySlots_ <= 0) {
+        if (waitTime <= 0) return false;
+	try { putMonitor_.wait(waitTime); }
+        catch (InterruptedException ex) {
+          putMonitor_.notify();
+          throw ex;
+        }
+        waitTime = msecs - (System.currentTimeMillis() - start);
+      }
+      insert(x);
+    }
+    incUsedSlots();
+    return true;
+  }
+
+
+
+  public  Object take() throws InterruptedException { 
+    if (Thread.interrupted()) throw new InterruptedException();
+    Object old = null; 
+    synchronized(this) { 
+      while (usedSlots_ <= 0) {
+        try { wait(); }
+        catch (InterruptedException ex) {
+          notify();
+          throw ex; 
+        }
+      }
+      old = extract();
+    }
+    incEmptySlots();
+    return old;
+  }
+
+  public  Object poll(long msecs) throws InterruptedException { 
+    if (Thread.interrupted()) throw new InterruptedException();
+    Object old = null; 
+    synchronized(this) { 
+      long start = (msecs <= 0)? 0 : System.currentTimeMillis();
+      long waitTime = msecs;
+      
+      while (usedSlots_ <= 0) {
+        if (waitTime <= 0) return null;
+        try { wait(waitTime); }
+        catch (InterruptedException ex) {
+          notify();
+          throw ex; 
+        }
+        waitTime = msecs - (System.currentTimeMillis() - start);
+
+      }
+      old = extract();
+    }
+    incEmptySlots();
+    return old;
+  }
+
+}
+
+