diff src/EDU/oswego/cs/dl/util/concurrent/CyclicBarrier.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/EDU/oswego/cs/dl/util/concurrent/CyclicBarrier.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,284 @@
+/*
+  File: CyclicBarrier.java
+
+  Originally written by Doug Lea and released into the public domain.
+  This may be used for any purposes whatsoever without acknowledgment.
+  Thanks for the assistance and support of Sun Microsystems Labs,
+  and everyone contributing, testing, and using this code.
+
+  History:
+  Date       Who                What
+  11Jul1998  dl               Create public version
+  28Aug1998  dl               minor code simplification
+*/
+
+package EDU.oswego.cs.dl.util.concurrent;
+
+/**
+ * A cyclic barrier is a reasonable choice for a barrier in contexts
+ * involving a fixed sized group of threads that
+ * must occasionally wait for each other. 
+ * (A Rendezvous better handles applications in which
+ * any number of threads meet, n-at-a-time.)
+ * <p>
+ * CyclicBarriers use an all-or-none breakage model
+ * for failed synchronization attempts: If threads
+ * leave a barrier point prematurely because of timeout
+ * or interruption, others will also leave abnormally
+ * (via BrokenBarrierException), until
+ * the barrier is <code>restart</code>ed. This is usually
+ * the simplest and best strategy for sharing knowledge
+ * about failures among cooperating threads in the most
+ * common usages contexts of Barriers.
+ * This implementation  has the property that interruptions
+ * among newly arriving threads can cause as-yet-unresumed
+ * threads from a previous barrier cycle to return out
+ * as broken. This transmits breakage
+ * as early as possible, but with the possible byproduct that
+ * only some threads returning out of a barrier will realize
+ * that it is newly broken. (Others will not realize this until a
+ * future cycle.) (The Rendezvous class has a more uniform, but
+ * sometimes less desirable policy.)
+ * <p>
+ * Barriers support an optional Runnable command
+ * that is run once per barrier point.
+ * <p>
+ * <b>Sample usage</b> Here is a code sketch of 
+ *  a  barrier in a parallel decomposition design.
+ * <pre>
+ * class Solver {
+ *   final int N;
+ *   final float[][] data;
+ *   final CyclicBarrier barrier;
+ *   
+ *   class Worker implements Runnable {
+ *      int myRow;
+ *      Worker(int row) { myRow = row; }
+ *      public void run() {
+ *         while (!done()) {
+ *            processRow(myRow);
+ *
+ *            try {
+ *              barrier.barrier(); 
+ *            }
+ *            catch (InterruptedException ex) { return; }
+ *            catch (BrokenBarrierException ex) { return; }
+ *         }
+ *      }
+ *   }
+ *
+ *   public Solver(float[][] matrix) {
+ *     data = matrix;
+ *     N = matrix.length;
+ *     barrier = new CyclicBarrier(N);
+ *     barrier.setBarrierCommand(new Runnable() {
+ *       public void run() { mergeRows(...); }
+ *     });
+ *     for (int i = 0; i < N; ++i) {
+ *       new Thread(new Worker(i)).start();
+ *     waitUntilDone();
+ *    }
+ * }
+ * </pre>
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+
+ **/
+public class CyclicBarrier implements Barrier {
+
+  protected final int parties_;
+  protected boolean broken_ = false;
+  protected Runnable barrierCommand_ = null;
+  protected int count_; // number of parties still waiting
+  protected int resets_ = 0; // incremented on each release
+
+  /** 
+   * Create a CyclicBarrier for the indicated number of parties,
+   * and no command to run at each barrier.
+   * @exception IllegalArgumentException if parties less than or equal to zero.
+   **/
+
+  public CyclicBarrier(int parties) { this(parties, null); }
+
+  /** 
+   * Create a CyclicBarrier for the indicated number of parties.
+   * and the given command to run at each barrier point.
+   * @exception IllegalArgumentException if parties less than or equal to zero.
+   **/
+
+  public CyclicBarrier(int parties, Runnable command) { 
+    if (parties <= 0) throw new IllegalArgumentException();
+    parties_ = parties; 
+    count_ = parties;
+    barrierCommand_ = command;
+  }
+
+  /**
+   * Set the command to run at the point at which all threads reach the
+   * barrier. This command is run exactly once, by the thread
+   * that trips the barrier. The command is not run if the barrier is
+   * broken.
+   * @param command the command to run. If null, no command is run.
+   * @return the previous command
+   **/
+
+  public synchronized Runnable setBarrierCommand(Runnable command) {
+    Runnable old = barrierCommand_;
+    barrierCommand_ = command;
+    return old;
+  }
+
+  public synchronized boolean broken() { return broken_; }
+
+  /**
+   * Reset to initial state. Clears both the broken status
+   * and any record of waiting threads, and releases all
+   * currently waiting threads with indeterminate return status.
+   * This method is intended only for use in recovery actions
+   * in which it is somehow known
+   * that no thread could possibly be relying on the
+   * the synchronization properties of this barrier.
+   **/
+
+  public synchronized void restart() { 
+    broken_ = false; 
+    ++resets_;
+    count_ = parties_;
+    notifyAll();
+  }
+  
+ 
+  public int parties() { return parties_; }
+
+  /**
+   * Enter barrier and wait for the other parties()-1 threads.
+   * @return the arrival index: the number of other parties 
+   * that were still waiting
+   * upon entry. This is a unique value from zero to parties()-1.
+   * If it is zero, then the current
+   * thread was the last party to hit barrier point
+   * and so was responsible for releasing the others. 
+   * @exception BrokenBarrierException if any other thread
+   * in any previous or current barrier 
+   * since either creation or the last <code>restart</code>
+   * operation left the barrier
+   * prematurely due to interruption or time-out. (If so,
+   * the <code>broken</code> status is also set.)
+   * Threads that are noticied to have been
+   * interrupted <em>after</em> being released are not considered
+   * to have broken the barrier.
+   * In all cases, the interruption
+   * status of the current thread is preserved, so can be tested
+   * by checking <code>Thread.interrupted</code>. 
+   * @exception InterruptedException if this thread was interrupted
+   * during the barrier, and was the one causing breakage. 
+   * If so, <code>broken</code> status is also set.
+   **/
+
+  public int barrier() throws InterruptedException, BrokenBarrierException {
+    return doBarrier(false, 0);
+  }
+
+  /**
+   * Enter barrier and wait at most msecs for the other parties()-1 threads.
+   * @return if not timed out, the arrival index: the number of other parties 
+   * that were still waiting
+   * upon entry. This is a unique value from zero to parties()-1.
+   * If it is zero, then the current
+   * thread was the last party to hit barrier point
+   * and so was responsible for releasing the others. 
+   * @exception BrokenBarrierException 
+   * if any other thread
+   * in any previous or current barrier 
+   * since either creation or the last <code>restart</code>
+   * operation left the barrier
+   * prematurely due to interruption or time-out. (If so,
+   * the <code>broken</code> status is also set.) 
+   * Threads that are noticed to have been
+   * interrupted <em>after</em> being released are not considered
+   * to have broken the barrier.
+   * In all cases, the interruption
+   * status of the current thread is preserved, so can be tested
+   * by checking <code>Thread.interrupted</code>. 
+   * @exception InterruptedException if this thread was interrupted
+   * during the barrier. If so, <code>broken</code> status is also set.
+   * @exception TimeoutException if this thread timed out waiting for
+   *  the barrier. If the timeout occured while already in the
+   * barrier, <code>broken</code> status is also set.
+   **/
+
+  public int attemptBarrier(long msecs) 
+    throws InterruptedException, TimeoutException, BrokenBarrierException {
+    return doBarrier(true, msecs);
+  }
+
+  protected synchronized int doBarrier(boolean timed, long msecs) 
+    throws InterruptedException, TimeoutException, BrokenBarrierException  { 
+    
+    int index = --count_;
+
+    if (broken_) {
+      throw new BrokenBarrierException(index);
+    }
+    else if (Thread.interrupted()) {
+      broken_ = true;
+      notifyAll();
+      throw new InterruptedException();
+    }
+    else if (index == 0) {  // tripped
+      count_ = parties_;
+      ++resets_;
+      notifyAll();
+      try {
+        if (barrierCommand_ != null)
+          barrierCommand_.run();
+        return 0;
+      }
+      catch (RuntimeException ex) {
+        broken_ = true;
+        return 0;
+      }
+    }
+    else if (timed && msecs <= 0) {
+      broken_ = true;
+      notifyAll();
+      throw new TimeoutException(msecs);
+    }
+    else {                   // wait until next reset
+      int r = resets_;      
+      long startTime = (timed)? System.currentTimeMillis() : 0;
+      long waitTime = msecs;
+      for (;;) {
+        try {
+          wait(waitTime);
+        }
+        catch (InterruptedException ex) {
+          // Only claim that broken if interrupted before reset
+          if (resets_ == r) { 
+            broken_ = true;
+            notifyAll();
+            throw ex;
+          }
+          else {
+            Thread.currentThread().interrupt(); // propagate
+          }
+        }
+
+        if (broken_) 
+          throw new BrokenBarrierException(index);
+
+        else if (r != resets_)
+          return index;
+
+        else if (timed) {
+          waitTime = msecs - (System.currentTimeMillis() - startTime);
+          if  (waitTime <= 0) {
+            broken_ = true;
+            notifyAll();
+            throw new TimeoutException(msecs);
+          }
+        }
+      }
+    }
+  }
+
+}