diff src/EDU/oswego/cs/dl/util/concurrent/BoundedLinkedQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/EDU/oswego/cs/dl/util/concurrent/BoundedLinkedQueue.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,373 @@
+/*
+  File: BoundedLinkedQueue.java
+
+  Originally written by Doug Lea and released into the public domain.
+  This may be used for any purposes whatsoever without acknowledgment.
+  Thanks for the assistance and support of Sun Microsystems Labs,
+  and everyone contributing, testing, and using this code.
+
+  History:
+  Date       Who                What
+  11Jun1998  dl               Create public version
+  17Jul1998  dl               Simplified by eliminating wait counts
+  25aug1998  dl               added peek
+  10oct1999  dl               lock on node object to ensure visibility
+  27jan2000  dl               setCapacity forces immediate permit reconcile
+*/
+
+package EDU.oswego.cs.dl.util.concurrent;
+
+/**
+ * A bounded variant of 
+ * LinkedQueue 
+ * class. This class may be
+ * preferable to 
+ * BoundedBuffer 
+ * because it allows a bit more
+ * concurency among puts and takes,  because it does not
+ * pre-allocate fixed storage for elements, and allows 
+ * capacity to be dynamically reset.
+ * On the other hand, since it allocates a node object
+ * on each put, it can be slow on systems with slow
+ * allocation and GC.
+ * Also, it may be
+ * preferable to 
+ * LinkedQueue 
+ * when you need to limit
+ * the capacity to prevent resource exhaustion. This protection
+ * normally does not hurt much performance-wise: When the
+ * queue is not empty or full, most puts and
+ * takes are still usually able to execute concurrently.
+ * @see LinkedQueue 
+ * @see BoundedBuffer 
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
+ **/
+
+public class BoundedLinkedQueue implements BoundedChannel {
+
+  /*
+   * It might be a bit nicer if this were declared as
+   * a subclass of LinkedQueue, or a sibling class of
+   * a common abstract class. It shares much of the
+   * basic design and bookkeeping fields. But too 
+   * many details differ to make this worth doing.
+   */
+
+
+
+  /** 
+   * Dummy header node of list. The first actual node, if it exists, is always 
+   * at head_.next. After each take, the old first node becomes the head.
+   **/
+  protected LinkedNode head_;
+
+  /** 
+   * The last node of list. Put() appends to list, so modifies last_
+   **/
+  protected LinkedNode last_;
+
+
+  /**
+   * Helper monitor. Ensures that only one put at a time executes.
+   **/
+
+  protected final Object putGuard_ = new Object();
+
+  /**
+   * Helper monitor. Protects and provides wait queue for takes
+   **/
+
+  protected final Object takeGuard_ = new Object();
+
+
+  /** Number of elements allowed **/
+  protected int capacity_;
+
+  
+  /**
+   * One side of a split permit count. 
+   * The counts represent permits to do a put. (The queue is full when zero).
+   * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
+   * (The length is never separately recorded, so this cannot be
+   * checked explicitly.)
+   * To minimize contention between puts and takes, the
+   * put side uses up all of its permits before transfering them from
+   * the take side. The take side just increments the count upon each take.
+   * Thus, most puts and take can run independently of each other unless
+   * the queue is empty or full. 
+   * Initial value is queue capacity.
+   **/
+
+  protected int putSidePutPermits_; 
+
+  /** Number of takes since last reconcile **/
+  protected int takeSidePutPermits_ = 0;
+
+
+  /**
+   * Create a queue with the given capacity
+   * @exception IllegalArgumentException if capacity less or equal to zero
+   **/
+  public BoundedLinkedQueue(int capacity) {
+    if (capacity <= 0) throw new IllegalArgumentException();
+    capacity_ = capacity;
+    putSidePutPermits_ = capacity;
+    head_ =  new LinkedNode(null); 
+    last_ = head_;
+  }
+
+  /**
+   * Create a queue with the current default capacity
+   **/
+
+  public BoundedLinkedQueue() { 
+    this(DefaultChannelCapacity.get()); 
+  }
+
+  /**
+   * Move put permits from take side to put side; 
+   * return the number of put side permits that are available.
+   * Call only under synch on puGuard_ AND this.
+   **/
+  protected final int reconcilePutPermits() {
+    putSidePutPermits_ += takeSidePutPermits_;
+    takeSidePutPermits_ = 0;
+    return putSidePutPermits_;
+  }
+
+
+  /** Return the current capacity of this queue **/
+  public synchronized int capacity() { return capacity_; }
+
+
+  /** 
+   * Return the number of elements in the queue.
+   * This is only a snapshot value, that may be in the midst 
+   * of changing. The returned value will be unreliable in the presence of
+   * active puts and takes, and should only be used as a heuristic
+   * estimate, for example for resource monitoring purposes.
+   **/
+  public synchronized int size() {
+    /*
+      This should ideally synch on putGuard_, but
+      doing so would cause it to block waiting for an in-progress
+      put, which might be stuck. So we instead use whatever
+      value of putSidePutPermits_ that we happen to read.
+    */
+    return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
+  }
+
+
+  /**
+   * Reset the capacity of this queue.
+   * If the new capacity is less than the old capacity,
+   * existing elements are NOT removed, but
+   * incoming puts will not proceed until the number of elements
+   * is less than the new capacity.
+   * @exception IllegalArgumentException if capacity less or equal to zero
+   **/
+
+  public void setCapacity(int newCapacity) {
+    if (newCapacity <= 0) throw new IllegalArgumentException();
+    synchronized (putGuard_) {
+      synchronized(this) {
+        takeSidePutPermits_ += (newCapacity - capacity_);
+        capacity_ = newCapacity;
+        
+        // Force immediate reconcilation.
+        reconcilePutPermits();
+        notifyAll();
+      }
+    }
+  }
+
+
+  /** Main mechanics for take/poll **/
+  protected synchronized Object extract() {
+    synchronized(head_) {
+      Object x = null;
+      LinkedNode first = head_.next;
+      if (first != null) {
+        x = first.value;
+        first.value = null;
+        head_ = first; 
+        ++takeSidePutPermits_;
+        notify();
+      }
+      return x;
+    }
+  }
+
+  public Object peek() {
+    synchronized(head_) {
+      LinkedNode first = head_.next;
+      if (first != null) 
+        return first.value;
+      else
+        return null;
+    }
+  }
+
+  public Object take() throws InterruptedException {
+    if (Thread.interrupted()) throw new InterruptedException();
+    Object x = extract();
+    if (x != null) 
+      return x;
+    else {
+      synchronized(takeGuard_) {
+        try {
+          for (;;) {
+            x = extract();
+            if (x != null) {
+              return x;
+            }
+            else {
+              takeGuard_.wait(); 
+            }
+          }
+        }
+        catch(InterruptedException ex) {
+          takeGuard_.notify();
+          throw ex; 
+        }
+      }
+    }
+  }
+
+  public Object poll(long msecs) throws InterruptedException {
+    if (Thread.interrupted()) throw new InterruptedException();
+    Object x = extract();
+    if (x != null) 
+      return x;
+    else {
+      synchronized(takeGuard_) {
+        try {
+          long waitTime = msecs;
+          long start = (msecs <= 0)? 0: System.currentTimeMillis();
+          for (;;) {
+            x = extract();
+            if (x != null || waitTime <= 0) {
+              return x;
+            }
+            else {
+              takeGuard_.wait(waitTime); 
+              waitTime = msecs - (System.currentTimeMillis() - start);
+            }
+          }
+        }
+        catch(InterruptedException ex) {
+          takeGuard_.notify();
+          throw ex; 
+        }
+      }
+    }
+  }
+
+  /** Notify a waiting take if needed **/
+  protected final void allowTake() {
+    synchronized(takeGuard_) {
+      takeGuard_.notify();
+    }
+  }
+
+
+  /**
+   * Create and insert a node.
+   * Call only under synch on putGuard_
+   **/
+  protected void insert(Object x) { 
+    --putSidePutPermits_;
+    LinkedNode p = new LinkedNode(x);
+    synchronized(last_) {
+      last_.next = p;
+      last_ = p;
+    }
+  }
+
+
+  /* 
+     put and offer(ms) differ only in policy before insert/allowTake
+  */
+
+  public void put(Object x) throws InterruptedException {
+    if (x == null) throw new IllegalArgumentException();
+    if (Thread.interrupted()) throw new InterruptedException();
+
+    synchronized(putGuard_) {
+
+      if (putSidePutPermits_ <= 0) { // wait for permit. 
+        synchronized(this) {
+          if (reconcilePutPermits() <= 0) {
+            try {
+              for(;;) {
+                wait();
+                if (reconcilePutPermits() > 0) {
+                  break;
+                }
+              }
+            }
+            catch (InterruptedException ex) { 
+              notify(); 
+              throw ex; 
+            }
+          }
+        }
+      }
+      insert(x);
+    }
+    // call outside of lock to loosen put/take coupling
+    allowTake();
+  }
+
+  public boolean offer(Object x, long msecs) throws InterruptedException {
+    if (x == null) throw new IllegalArgumentException();
+    if (Thread.interrupted()) throw new InterruptedException();
+
+    synchronized(putGuard_) {
+
+      if (putSidePutPermits_ <= 0) {
+        synchronized(this) {
+          if (reconcilePutPermits() <= 0) {
+            if (msecs <= 0)
+              return false;
+            else {
+              try {
+                long waitTime = msecs;
+                long start = System.currentTimeMillis();
+                
+                for(;;) {
+                  wait(waitTime);
+                  if (reconcilePutPermits() > 0) {
+                    break;
+                  }
+                  else {
+                    waitTime = msecs - (System.currentTimeMillis() - start);
+                    if (waitTime <= 0) {
+                      return false;
+                    }
+                  }
+                }
+              }
+              catch (InterruptedException ex) { 
+                notify(); 
+                throw ex; 
+              }
+            }
+          }
+        }
+      }
+
+      insert(x);
+    }
+
+    allowTake();
+    return true;
+  }
+
+  public boolean isEmpty() {
+    synchronized(head_) {
+      return head_.next == null;
+    }
+  }    
+    
+}