diff src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunnerGroup.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunnerGroup.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,618 @@
+/*
+  File: FJTaskRunnerGroup.java
+
+  Originally written by Doug Lea and released into the public domain.
+  This may be used for any purposes whatsoever without acknowledgment.
+  Thanks for the assistance and support of Sun Microsystems Labs,
+  and everyone contributing, testing, and using this code.
+
+  History:
+  Date       Who                What
+  7Jan1999   dl                 First public release
+  12Jan1999  dl                 made getActiveCount public; misc minor cleanup.
+  14Jan1999  dl                 Added executeTask
+  20Jan1999  dl                 Allow use of priorities; reformat stats
+  6Feb1999   dl                 Lazy thread starts
+  27Apr1999  dl                 Renamed
+*/
+
+package EDU.oswego.cs.dl.util.concurrent;
+
+/**
+ * A stripped down analog of a ThreadGroup used for
+ * establishing and managing FJTaskRunner threads.
+ * ThreadRunnerGroups serve as the control boundary separating
+ * the general world of normal threads from the specialized world
+ * of FJTasks. 
+ * <p>
+ * By intent, this class does not subclass java.lang.ThreadGroup, and
+ * does not support most methods found in ThreadGroups, since they
+ * would make no sense for FJTaskRunner threads. In fact, the class
+ * does not deal with ThreadGroups at all. If you want to restrict
+ * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
+ * it from within that ThreadGroup.
+ * <p>
+ * The main contextual parameter for a FJTaskRunnerGroup is
+ * the group size, established in the constructor. 
+ * Groups must be of a fixed size.
+ * There is no way to dynamically increase or decrease the number
+ * of threads in an existing group.
+ * <p>
+ * In general, the group size should be equal to the number
+ * of CPUs on the system. (Unfortunately, there is no portable
+ * means of automatically detecting the number of CPUs on a JVM, so there is
+ * no good way to automate defaults.)  In principle, when
+ * FJTasks are used for computation-intensive tasks, having only 
+ * as many threads as CPUs should minimize bookkeeping overhead
+ * and contention, and so maximize throughput. However, because
+ * FJTaskRunners lie atop Java threads, and in turn operating system
+ * thread support and scheduling policies, 
+ * it is very possible that using more threads
+ * than CPUs will improve overall throughput even though it adds
+ * to overhead. This will always be so if FJTasks are I/O bound.
+ * So it may pay to experiment a bit when tuning on particular platforms.
+ * You can also use <code>setRunPriorities</code> to either
+ * increase or decrease the priorities of active threads, which
+ * may interact with group size choice.
+ * <p>
+ * In any case, overestimating group sizes never
+ * seriously degrades performance (at least within reasonable bounds). 
+ * You can also use a value
+ * less than the number of CPUs in order to reserve processing
+ * for unrelated threads. 
+ * <p>
+ * There are two general styles for using a FJTaskRunnerGroup.
+ * You can create one group per entire program execution, for example 
+ * as a static singleton, and use it for all parallel tasks:
+ * <pre>
+ * class Tasks {
+ *   static FJTaskRunnerGroup group;
+ *   public void initialize(int groupsize) {
+ *      group = new FJTaskRunnerGroup(groupSize);
+ *   }
+ *   // ...
+ * }
+ * </pre>
+ * Alternatively, you can make new groups on the fly and use them only for
+ * particular task sets. This is more flexible,,
+ * and leads to more controllable and deterministic execution patterns,
+ * but it encounters greater overhead on startup. Also, to reclaim
+ * system resources, you should
+ * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
+ * using one-shot groups. Otherwise, because FJTaskRunners set 
+ * <code>Thread.isDaemon</code>
+ * status, they will not normally be reclaimed until program termination.
+ * <p>
+ * The main supported methods are <code>execute</code>,
+ * which starts a task processed by FJTaskRunner threads,
+ * and <code>invoke</code>, which starts one and waits for completion.
+ * For example, you might extend the above <code>FJTasks</code>
+ * class to support a task-based computation, say, the
+ * <code>Fib</code> class from the <code>FJTask</code> documentation:
+ * <pre>
+ * class Tasks { // continued
+ *   // ...
+ *   static int fib(int n) {
+ *     try {
+ *       Fib f = new Fib(n);
+ *       group.invoke(f);
+ *       return f.getAnswer();
+ *     }
+ *     catch (InterruptedException ex) {
+ *       throw new Error("Interrupted during computation");
+ *     }
+ *   }
+ * }
+ * </pre>
+ * <p>
+ * Method <code>stats()</code> can be used to monitor performance.
+ * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
+ * the compile-time constant COLLECT_STATS set to false. In this
+ * case, various simple counts reported in stats() are not collected.
+ * On platforms tested,
+ * this leads to such a tiny performance improvement that there is 
+ * very little motivation to bother.
+ *
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * <p>
+ * @see FJTask
+ * @see FJTaskRunner
+ **/
+
+public class FJTaskRunnerGroup implements Executor {
+
+  /** The threads in this group **/
+  protected final FJTaskRunner[] threads;
+
+  /** Group-wide queue for tasks entered via execute() **/
+  protected final LinkedQueue entryQueue = new LinkedQueue();
+
+  /** Number of threads that are not waiting for work **/
+  protected int activeCount = 0;
+
+  /** Number of threads that have been started. Used to avoid
+      unecessary contention during startup of task sets.
+  **/
+  protected int nstarted = 0;
+
+  /**
+   * Compile-time constant. If true, various counts of
+   * runs, waits, etc., are maintained. These are NOT
+   * updated with synchronization, so statistics reports
+   * might not be accurate.
+   **/
+  
+  static final boolean COLLECT_STATS = true;
+  //  static final boolean COLLECT_STATS = false;
+
+  // for stats
+
+  /** The time at which this ThreadRunnerGroup was constructed **/
+  long initTime = 0;
+
+  /** Total number of executes or invokes **/
+  int entries = 0;
+
+  static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
+
+  /** 
+   * Create a FJTaskRunnerGroup with the indicated number
+   * of FJTaskRunner threads. Normally, the best size to use is
+   * the number of CPUs on the system. 
+   * <p>
+   * The threads in a FJTaskRunnerGroup are created with their
+   * isDaemon status set, so do not normally need to be
+   * shut down manually upon program termination.
+   **/
+
+  public FJTaskRunnerGroup(int groupSize) { 
+    threads = new FJTaskRunner[groupSize];
+    initializeThreads();
+    initTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Arrange for execution of the given task
+   * by placing it in a work queue. If the argument
+   * is not of type FJTask, it is embedded in a FJTask via 
+   * <code>FJTask.Wrap</code>.
+   * @exception InterruptedException if current Thread is
+   * currently interrupted 
+   **/
+
+  public void execute(Runnable r) throws InterruptedException {
+    if (r instanceof FJTask) {
+      entryQueue.put((FJTask)r);
+    }
+    else {
+      entryQueue.put(new FJTask.Wrap(r));
+    }
+    signalNewTask();
+  }
+
+
+  /**
+   * Specialized form of execute called only from within FJTasks
+   **/
+  public void executeTask(FJTask t) {
+    try {
+      entryQueue.put(t);
+      signalNewTask();
+    }
+    catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+
+  /**
+   * Start a task and wait it out. Returns when the task completes.
+   * @exception InterruptedException if current Thread is
+   * interrupted before completion of the task.
+   **/
+
+  public void invoke(Runnable r) throws InterruptedException {
+    InvokableFJTask w = new InvokableFJTask(r);
+    entryQueue.put(w);
+    signalNewTask();
+    w.awaitTermination();
+  }
+
+
+  /**
+   * Try to shut down all FJTaskRunner threads in this group
+   * by interrupting them all. This method is designed
+   * to be used during cleanup when it is somehow known
+   * that all threads are idle.
+   * FJTaskRunners only
+   * check for interruption when they are not otherwise
+   * processing a task (and its generated subtasks,
+   * if any), so if any threads are active, shutdown may
+   * take a while, and may lead to unpredictable
+   * task processing.
+   **/
+
+  public void interruptAll() {
+    // paranoically interrupt current thread last if in group.
+    Thread current = Thread.currentThread();
+    boolean stopCurrent = false;
+
+    for (int i = 0; i < threads.length; ++i) {
+      Thread t = threads[i];
+      if (t == current) 
+        stopCurrent = true;
+      else
+        t.interrupt();
+    }
+    if (stopCurrent)
+      current.interrupt();
+  }
+
+
+  /**
+   * Set the priority to use while a FJTaskRunner is
+   * polling for new tasks to perform. Default
+   * is currently Thread.MIN_PRIORITY+1. The value
+   * set may not go into effect immediately, but
+   * will be used at least the next time a thread scans for work.
+   **/
+  public synchronized void setScanPriorities(int pri) {
+    for (int i = 0; i < threads.length; ++i) {
+      FJTaskRunner t = threads[i];
+      t.setScanPriority(pri);
+      if (!t.active) t.setPriority(pri);
+    }
+  }
+
+
+  /**
+   * Set the priority to use while a FJTaskRunner is
+   * actively running tasks. Default
+   * is the priority that was in effect by the thread that
+   * constructed this FJTaskRunnerGroup. Setting this value
+   * while threads are running may momentarily result in
+   * them running at this priority even when idly waiting for work.
+   **/
+  public synchronized void setRunPriorities(int pri) {
+    for (int i = 0; i < threads.length; ++i) {
+      FJTaskRunner t = threads[i];
+      t.setRunPriority(pri);
+      if (t.active) t.setPriority(pri);
+    }
+  }
+
+    
+
+  /** Return the number of FJTaskRunner threads in this group **/
+
+  public int size() { return threads.length; }
+
+
+  /** 
+   * Return the number of threads that are not idly waiting for work.
+   * Beware that even active threads might not be doing any useful
+   * work, but just spinning waiting for other dependent tasks.
+   * Also, since this is just a snapshot value, some tasks
+   * may be in the process of becoming idle.
+   **/
+  public synchronized int getActiveCount() { return activeCount; }
+
+  /**
+   * Prints various snapshot statistics to System.out.
+   * <ul>
+   *   <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
+   *         <em>n</em> from zero to group size - 1):
+   *     <ul>
+   *       <li> A star "*" is printed if the thread is currently active;
+   *            that is, not sleeping while waiting for work. Because
+   *            threads gradually enter sleep modes, an active thread
+   *            may in fact be about to sleep (or wake up).
+   *       <li> <em>Q Cap</em> The current capacity of its task queue.
+   *       <li> <em>Run</em> The total number of tasks that have been run.
+   *       <li> <em>New</em> The number of these tasks that were
+   *               taken from either the entry queue or from other 
+   *               thread queues; that is, the number of tasks run
+   *               that were <em>not</em> forked by the thread itself.
+   *       <li> <em>Scan</em> The number of times other task
+   *               queues or the entry queue were polled for tasks.
+   *     </ul>
+   *   <li> <em>Execute</em> The total number of tasks entered
+   *        (but not necessarily yet run) via execute or invoke.
+   *   <li> <em>Time</em> Time in seconds since construction of this
+   *         FJTaskRunnerGroup.
+   *   <li> <em>Rate</em> The total number of tasks processed
+   *          per second across all threads. This
+   *          may be useful as a simple throughput indicator
+   *          if all processed tasks take approximately the
+   *          same time to run.
+   * </ul>
+   * <p>
+   * Cautions: Some statistics are updated and gathered 
+   * without synchronization,
+   * so may not be accurate. However, reported counts may be considered
+   * as lower bounds of actual values. 
+   * Some values may be zero if classes are compiled
+   * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
+   * classes can be independently compiled with different values of
+   * COLLECT_STATS.) Also, the counts are maintained as ints so could
+   * overflow in exceptionally long-lived applications.
+   * <p>
+   * These statistics can be useful when tuning algorithms or diagnosing
+   * problems. For example:
+   * <ul>
+   *  <li> High numbers of scans may mean that there is insufficient
+   *      parallelism to keep threads busy. However, high scan rates
+   *      are expected if the number
+   *      of Executes is also high or there is a lot of global
+   *      synchronization in the application, and the system is not otherwise
+   *      busy. Threads may scan
+   *      for work hundreds of times upon startup, shutdown, and
+   *      global synch points of task sets.
+   *  <li> Large imbalances in tasks run across different threads might
+   *      just reflect contention with unrelated threads on a system
+   *      (possibly including JVM threads such as GC), but may also
+   *      indicate some systematic bias in how you generate tasks.
+   *  <li> Large task queue capacities may mean that too many tasks are being
+   *     generated before they can be run. 
+   *     Capacities are reported rather than current numbers of tasks
+   *     in queues because they are better indicators of the existence
+   *     of these kinds of possibly-transient problems.
+   *     Queue capacities are
+   *     resized on demand from their initial value of 4096 elements,
+   *     which is much more than sufficient for the kinds of 
+   *     applications that this framework is intended to best support.
+   * </ul>
+   **/
+
+  public void stats() {
+    long time = System.currentTimeMillis() - initTime;
+    double secs = ((double)time) / 1000.0;
+    long totalRuns = 0;
+    long totalScans = 0;
+    long totalSteals = 0;
+
+    System.out.print("Thread" +
+                     "\tQ Cap" +
+                       "\tScans" +
+                       "\tNew" +
+                       "\tRuns" +
+                       "\n");
+
+    for (int i = 0; i < threads.length; ++i) {
+      FJTaskRunner t = threads[i];
+      int truns = t.runs;
+      totalRuns += truns;
+
+      int tscans = t.scans;
+      totalScans += tscans;
+
+      int tsteals = t.steals;
+      totalSteals += tsteals;
+
+      String star = (getActive(t))? "*" : " ";
+
+
+      System.out.print("T" + i + star +
+                       "\t" + t.deqSize() +
+                       "\t" + tscans +
+                       "\t" + tsteals +
+                       "\t" + truns +
+                       "\n");
+    }
+
+    System.out.print("Total" +
+                     "\t    " +
+                     "\t" + totalScans +
+                     "\t" + totalSteals +
+                     "\t" + totalRuns +
+                     "\n");
+
+    System.out.print("Execute: " + entries); 
+    
+    System.out.print("\tTime: " + secs);
+
+    long rps = 0;
+    if (secs != 0) rps = Math.round((double)(totalRuns) / secs);
+
+    System.out.println("\tRate: " + rps);
+  }
+
+
+  /* ------------ Methods called only by FJTaskRunners ------------- */
+
+
+  /**
+   * Return the array of threads in this group. 
+   * Called only by FJTaskRunner.scan().
+   **/
+
+  protected FJTaskRunner[] getArray() { return threads; }
+
+
+  /**
+   * Return a task from entry queue, or null if empty.
+   * Called only by FJTaskRunner.scan().
+   **/
+
+  protected FJTask pollEntryQueue() {
+    try {
+      FJTask t = (FJTask)(entryQueue.poll(0));
+      return t;
+    }
+    catch(InterruptedException ex) { // ignore interrupts
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+
+  /**
+   * Return active status of t.
+   * Per-thread active status can only be accessed and
+   * modified via synchronized method here in the group class.
+   **/
+
+  protected synchronized boolean getActive(FJTaskRunner t) {
+    return t.active;
+  }
+
+
+  /**
+   * Set active status of thread t to true, and notify others
+   * that might be waiting for work. 
+   **/
+
+  protected synchronized void setActive(FJTaskRunner t) {
+    if (!t.active) { 
+      t.active = true;
+      ++activeCount;
+      if (nstarted < threads.length) 
+        threads[nstarted++].start();
+      else
+        notifyAll();
+    }
+  }
+
+  /**
+   * Set active status of thread t to false.
+   **/
+
+  protected synchronized void setInactive(FJTaskRunner t) {
+    if (t.active) { 
+      t.active = false;
+      --activeCount;
+    }
+  }
+
+  /**
+   * The number of times to scan other threads for tasks 
+   * before transitioning to a mode where scans are
+   * interleaved with sleeps (actually timed waits).
+   * Upon transition, sleeps are for duration of
+   * scans / SCANS_PER_SLEEP milliseconds.
+   * <p>
+   * This is not treated as a user-tunable parameter because
+   * good values do not appear to vary much across JVMs or
+   * applications. Its main role is to help avoid some
+   * useless spinning and contention during task startup.
+   **/
+  static final long SCANS_PER_SLEEP = 15;
+
+  /**
+   * The maximum time (in msecs) to sleep when a thread is idle,
+   * yet others are not, so may eventually generate work that
+   * the current thread can steal. This value reflects the maximum time
+   * that a thread may sleep when it possibly should not, because there
+   * are other active threads that might generate work. In practice,
+   * designs in which some threads become stalled because others
+   * are running yet not generating tasks are not likely to work
+   * well in this framework anyway, so the exact value does not matter
+   * too much. However, keeping it in the sub-second range does
+   * help smooth out startup and shutdown effects.
+   **/
+
+  static final long MAX_SLEEP_TIME = 100;
+
+  /**
+   * Set active status of thread t to false, and
+   * then wait until: (a) there is a task in the entry 
+   * queue, or (b) other threads are active, or (c) the current
+   * thread is interrupted. Upon return, it
+   * is not certain that there will be work available.
+   * The thread must itself check. 
+   * <p>
+   * The main underlying reason
+   * for these mechanics is that threads do not
+   * signal each other when they add elements to their queues.
+   * (This would add to task overhead, reduce locality.
+   * and increase contention.)
+   * So we must rely on a tamed form of polling. However, tasks
+   * inserted into the entry queue do result in signals, so
+   * tasks can wait on these if all of them are otherwise idle.
+   **/
+
+  protected synchronized void checkActive(FJTaskRunner t, long scans) {
+
+    setInactive(t);
+
+    try {
+      // if nothing available, do a hard wait
+      if (activeCount == 0 && entryQueue.peek() == null) { 
+        wait();
+      }
+      else { 
+        // If there is possibly some work,
+        // sleep for a while before rechecking 
+
+        long msecs = scans / SCANS_PER_SLEEP;
+        if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
+        int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
+        wait(msecs, nsecs);
+      }
+    }
+    catch (InterruptedException ex) {
+      notify(); // avoid lost notifies on interrupts
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /* ------------ Utility methods  ------------- */
+
+  /**
+   * Start or wake up any threads waiting for work
+   **/
+
+  protected synchronized void signalNewTask() {
+    if (COLLECT_STATS) ++entries;
+    if (nstarted < threads.length) 
+       threads[nstarted++].start();
+    else
+      notify();
+  }
+
+  /**
+   * Create all FJTaskRunner threads in this group.
+   **/
+
+  protected void initializeThreads() {
+    for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
+  }
+
+
+
+
+  /**
+   * Wrap wait/notify mechanics around a task so that
+   * invoke() can wait it out 
+   **/
+  protected static final class InvokableFJTask extends FJTask {
+    protected final Runnable wrapped;
+    protected boolean terminated = false;
+
+    protected InvokableFJTask(Runnable r) { wrapped = r; }
+
+    public void run() {
+      try {
+        if (wrapped instanceof FJTask)
+          FJTask.invoke((FJTask)(wrapped));
+        else
+          wrapped.run();
+      }
+      finally {
+        setTerminated();
+      }
+    }
+
+    protected synchronized void setTerminated() {
+      terminated = true;
+      notifyAll(); 
+    }
+
+    protected synchronized void awaitTermination() throws InterruptedException {
+      while (!terminated) wait();
+    }
+  }
+
+
+}
+