diff src/EDU/oswego/cs/dl/util/concurrent/SemaphoreControlledChannel.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/EDU/oswego/cs/dl/util/concurrent/SemaphoreControlledChannel.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,160 @@
+/*
+  File: SemaphoreControlledChannel.java
+
+  Originally written by Doug Lea and released into the public domain.
+  This may be used for any purposes whatsoever without acknowledgment.
+  Thanks for the assistance and support of Sun Microsystems Labs,
+  and everyone contributing, testing, and using this code.
+
+  History:
+  Date       Who                What
+  16Jun1998  dl               Create public version
+   5Aug1998  dl               replaced int counters with longs
+  08dec2001  dl               reflective constructor now uses longs too.
+*/
+
+package EDU.oswego.cs.dl.util.concurrent;
+import java.lang.reflect.*;
+
+/**
+ * Abstract class for channels that use Semaphores to
+ * control puts and takes.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ **/
+
+public abstract class SemaphoreControlledChannel implements BoundedChannel {
+  protected final Semaphore putGuard_;
+  protected final Semaphore takeGuard_;
+  protected int capacity_;
+
+  /**
+   * Create a channel with the given capacity and default
+   * semaphore implementation
+   * @exception IllegalArgumentException if capacity less or equal to zero
+   **/
+
+  public SemaphoreControlledChannel(int capacity) 
+   throws IllegalArgumentException {
+    if (capacity <= 0) throw new IllegalArgumentException();
+    capacity_ = capacity;
+    putGuard_ = new Semaphore(capacity);
+    takeGuard_ = new Semaphore(0);
+  }
+
+
+  /**
+   * Create a channel with the given capacity and 
+   * semaphore implementations instantiated from the supplied class
+   * @exception IllegalArgumentException if capacity less or equal to zero.
+   * @exception NoSuchMethodException If class does not have constructor 
+   * that intializes permits
+   * @exception SecurityException if constructor information 
+   * not accessible
+   * @exception InstantiationException if semaphore class is abstract
+   * @exception IllegalAccessException if constructor cannot be called
+   * @exception InvocationTargetException if semaphore constructor throws an
+   * exception
+   **/
+  public SemaphoreControlledChannel(int capacity, Class semaphoreClass) 
+   throws IllegalArgumentException, 
+          NoSuchMethodException, 
+          SecurityException, 
+          InstantiationException, 
+          IllegalAccessException, 
+          InvocationTargetException {
+    if (capacity <= 0) throw new IllegalArgumentException();
+    capacity_ = capacity;
+    Class[] longarg = { Long.TYPE };
+    Constructor ctor = semaphoreClass.getDeclaredConstructor(longarg);
+    Long[] cap = { new Long(capacity) };
+    putGuard_ = (Semaphore)(ctor.newInstance(cap));
+    Long[] zero = { new Long(0) };
+    takeGuard_ = (Semaphore)(ctor.newInstance(zero));
+  }
+
+
+
+  public int  capacity() { return capacity_; }
+
+  /** 
+   * Return the number of elements in the buffer.
+   * This is only a snapshot value, that may change
+   * immediately after returning.
+   **/
+
+  public int size() { return (int)(takeGuard_.permits());  }
+
+  /**
+   * Internal mechanics of put.
+   **/
+  protected abstract void insert(Object x);
+
+  /**
+   * Internal mechanics of take.
+   **/
+  protected abstract Object extract();
+
+  public void put(Object x) throws InterruptedException {
+    if (x == null) throw new IllegalArgumentException();
+    if (Thread.interrupted()) throw new InterruptedException();
+    putGuard_.acquire();
+    try {
+      insert(x);
+      takeGuard_.release();
+    }
+    catch (ClassCastException ex) {
+      putGuard_.release();
+      throw ex;
+    }
+  }
+
+  public boolean offer(Object x, long msecs) throws InterruptedException {
+    if (x == null) throw new IllegalArgumentException();
+    if (Thread.interrupted()) throw new InterruptedException();
+    if (!putGuard_.attempt(msecs)) 
+      return false;
+    else {
+      try {
+        insert(x);
+        takeGuard_.release();
+        return true;
+      }
+      catch (ClassCastException ex) {
+        putGuard_.release();
+        throw ex;
+      }
+    }
+  }
+
+  public Object take() throws InterruptedException {
+    if (Thread.interrupted()) throw new InterruptedException();
+    takeGuard_.acquire();
+    try {
+      Object x = extract();
+      putGuard_.release();
+      return x;
+    }
+    catch (ClassCastException ex) {
+      takeGuard_.release();
+      throw ex;
+    }
+  }
+
+  public Object poll(long msecs) throws InterruptedException {
+    if (Thread.interrupted()) throw new InterruptedException();
+    if (!takeGuard_.attempt(msecs))
+      return null;
+    else {
+      try {
+        Object x = extract();
+        putGuard_.release();
+        return x;
+      }
+      catch (ClassCastException ex) {
+        takeGuard_.release();
+        throw ex;
+      }
+    }
+  }
+
+}