diff src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunner.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/EDU/oswego/cs/dl/util/concurrent/FJTaskRunner.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,974 @@
+/*
+  File: FJTaskRunner.java
+
+  Originally written by Doug Lea and released into the public domain.
+  This may be used for any purposes whatsoever without acknowledgment.
+  Thanks for the assistance and support of Sun Microsystems Labs,
+  and everyone contributing, testing, and using this code.
+
+  History:
+  Date       Who                What
+  7Jan1999   dl                 First public release
+  13Jan1999  dl                 correct a stat counter update; 
+                                ensure inactive status on run termination;
+                                misc minor cleaup
+  14Jan1999  dl                 Use random starting point in scan;
+                                variable renamings.
+  18Jan1999  dl                 Runloop allowed to die on task exception;
+                                remove useless timed join
+  22Jan1999  dl                 Rework scan to allow use of priorities.
+  6Feb1999   dl                 Documentation updates.
+  7Mar1999   dl                 Add array-based coInvoke
+  31Mar1999  dl                 Revise scan to remove need for NullTasks
+  27Apr1999  dl                 Renamed
+  23oct1999  dl                 Earlier detect of interrupt in scanWhileIdling
+  24nov1999  dl                 Now works on JVMs that do not properly
+                                implement read-after-write of 2 volatiles.
+*/
+
+package EDU.oswego.cs.dl.util.concurrent;
+
+import java.util.Random;
+
+/**
+ * Specialized Thread subclass for running FJTasks.
+ * <p>
+ * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
+ * Double-ended queues support stack-based operations
+ * push and pop, as well as queue-based operations put and take.
+ * Normally, threads run their own tasks. But they
+ * may also steal tasks from each others DEQs.
+ * <p>
+ * The algorithms are minor variants of those used
+ * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
+ * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
+ * to a lesser extent 
+ * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
+ * but are adapted to work in Java.
+ * <p>
+ * The two most important capabilities are:
+ * <ul>
+ *  <li> Fork a FJTask: 
+ *  <pre>
+ *  Push task onto DEQ
+ *  </pre>
+ *  <li> Get a task to run (for example within taskYield)
+ *  <pre>
+ *  If DEQ is not empty, 
+ *     Pop a task and run it.
+ *  Else if any other DEQ is not empty, 
+ *     Take ("steal") a task from it and run it.
+ *  Else if the entry queue for our group is not empty,
+ *     Take a task from it and run it.
+ *  Else if current thread is otherwise idling
+ *     If all threads are idling
+ *        Wait for a task to be put on group entry queue
+ *  Else
+ *      Yield or Sleep for a while, and then retry
+ *  </pre>
+ * </ul>
+ * The push, pop, and put are designed to only ever called by the
+ * current thread, and take (steal) is only ever called by
+ * other threads.
+ * All other operations are composites and variants of these,
+ * plus a few miscellaneous bookkeeping methods.
+ * <p>
+ * Implementations of the underlying representations and operations
+ * are geared for use on JVMs operating on multiple CPUs (although
+ * they should of course work fine on single CPUs as well).
+ * <p>
+ * A possible snapshot of a FJTaskRunner's DEQ is:
+ * <pre>
+ *     0     1     2     3     4     5     6    ...
+ *  +-----+-----+-----+-----+-----+-----+-----+--
+ *  |     |  t  |  t  |  t  |  t  |     |     | ...  deq array
+ *  +-----+-----+-----+-----+-----+-----+-----+--
+ *           ^                       ^
+ *          base                    top 
+ *   (incremented                     (incremented 
+ *       on take,                         on push    
+ *    decremented                     decremented
+ *       on put)                          on pop)
+ * </pre>
+ * <p>
+ * FJTasks are held in elements of the DEQ. 
+ * They are maintained in a bounded array that
+ * works similarly to a circular bounded buffer. To ensure
+ * visibility of stolen FJTasks across threads, the array elements
+ * must be <code>volatile</code>. 
+ * Using volatile rather than synchronizing suffices here since
+ * each task accessed by a thread is either one that it
+ * created or one that has never seen before. Thus we cannot
+ * encounter any staleness problems executing run methods,
+ * although FJTask programmers must be still sure to either synch or use
+ * volatile for shared data within their run methods.
+ * <p>
+ * However, since there is no way
+ * to declare an array of volatiles in Java, the DEQ elements actually
+ * hold VolatileTaskRef objects, each of which in turn holds a
+ * volatile reference to a FJTask. 
+ * Even with the double-indirection overhead of 
+ * volatile refs, using an array for the DEQ works out
+ * better than linking them since fewer shared
+ * memory locations need to be
+ * touched or modified by the threads while using the DEQ.
+ * Further, the double indirection may alleviate cache-line
+ * sharing effects (which cannot otherwise be directly dealt with in Java).
+ * <p>
+ * The indices for the <code>base</code> and <code>top</code> of the DEQ
+ * are declared as volatile. The main contention point with
+ * multiple FJTaskRunner threads occurs when one thread is trying
+ * to pop its own stack while another is trying to steal from it.
+ * This is handled via a specialization of Dekker's algorithm,
+ * in which the popping thread pre-decrements <code>top</code>,
+ * and then checks it against <code>base</code>. 
+ * To be conservative in the face of JVMs that only partially
+ * honor the specification for volatile, the pop proceeds
+ * without synchronization only if there are apparently enough
+ * items for both a simultaneous pop and take to succeed.
+ * It otherwise enters a 
+ * synchronized lock to check if the DEQ is actually empty,
+ * if so failing. The stealing thread
+ * does almost the opposite, but is set up to be less likely
+ * to win in cases of contention: Steals always run under synchronized
+ * locks in order to avoid conflicts with other ongoing steals.
+ * They pre-increment <code>base</code>, and then check against
+ * <code>top</code>. They back out (resetting the base index 
+ * and failing to steal) if the
+ * DEQ is empty or is about to become empty by an ongoing pop.
+ * <p>
+ * A push operation can normally run concurrently with a steal.
+ * A push enters a synch lock only if the DEQ appears full so must
+ * either be resized or have indices adjusted due to wrap-around
+ * of the bounded DEQ. The put operation always requires synchronization.
+ * <p>
+ * When a FJTaskRunner thread has no tasks of its own to run, 
+ * it tries to be a good citizen. 
+ * Threads run at lower priority while scanning for work.
+ * <p>
+ * If the task is currently waiting
+ * via yield, the thread alternates scans (starting at a randomly 
+ * chosen victim) with Thread.yields. This is
+ * well-behaved so long as the JVM handles Thread.yield in a
+ * sensible fashion. (It need not. Thread.yield is so underspecified
+ * that it is legal for a JVM to treat it as a no-op.) This also
+ * keeps things well-behaved even if we are running on a uniprocessor
+ * JVM using a simple cooperative threading model.
+ * <p>
+ * If a thread needing work is
+ * is otherwise idle (which occurs only in the main runloop), and
+ * there are no available tasks to steal or poll, it
+ * instead enters into a sleep-based (actually timed wait(msec))
+ * phase in which it progressively sleeps for longer durations
+ * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
+ * currently 100ms) between scans. 
+ * If all threads in the group
+ * are idling, they further progress to a hard wait phase, suspending
+ * until a new task is entered into the FJTaskRunnerGroup entry queue.
+ * A sleeping FJTaskRunner thread may be awakened by a new
+ * task being put into the group entry queue or by another FJTaskRunner
+ * becoming active, but not merely by some DEQ becoming non-empty.
+ * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
+ * in cases where all but one worker thread start sleeping
+ * even though there will eventually be work produced
+ * by a thread that is taking a long time to place tasks in DEQ.
+ * These sleep mechanics are handled in the FJTaskRunnerGroup class.
+ * <p>
+ * Composite operations such as taskJoin include heavy
+ * manual inlining of the most time-critical operations
+ * (mainly FJTask.invoke). 
+ * This opens up a few opportunities for further hand-optimizations. 
+ * Until Java compilers get a lot smarter, these tweaks
+ * improve performance significantly enough for task-intensive 
+ * programs to be worth the poorer maintainability and code duplication.
+ * <p>
+ * Because they are so fragile and performance-sensitive, nearly
+ * all methods are declared as final. However, nearly all fields
+ * and methods are also declared as protected, so it is possible,
+ * with much care, to extend functionality in subclasses. (Normally
+ * you would also need to subclass FJTaskRunnerGroup.)
+ * <p>
+ * None of the normal java.lang.Thread class methods should ever be called
+ * on FJTaskRunners. For this reason, it might have been nicer to
+ * declare FJTaskRunner as a Runnable to run within a Thread. However,
+ * this would have complicated many minor logistics. And since
+ * no FJTaskRunner methods should normally be called from outside the
+ * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
+ * usage.
+ * <p>
+ * You might think that layering this kind of framework on top of
+ * Java threads, which are already several levels removed from raw CPU
+ * scheduling on most systems, would lead to very poor performance. 
+ * But on the platforms
+ * tested, the performance is quite good.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * @see FJTask
+ * @see FJTaskRunnerGroup
+ **/
+
+public class FJTaskRunner extends Thread {
+  
+  /** The group of which this FJTaskRunner is a member **/
+  protected final FJTaskRunnerGroup group;
+
+  /**
+   *  Constructor called only during FJTaskRunnerGroup initialization
+   **/
+
+  protected FJTaskRunner(FJTaskRunnerGroup g) { 
+    group = g;
+    victimRNG = new Random(System.identityHashCode(this));
+    runPriority = getPriority();
+    setDaemon(true);
+  }
+
+  /**
+   * Return the FJTaskRunnerGroup of which this thread is a member
+   **/
+  
+  protected final FJTaskRunnerGroup getGroup() { return group; }
+
+
+  /* ------------ DEQ Representation ------------------- */
+
+
+  /**
+   * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
+   * elements. The DEQ is grown if necessary, but default value is
+   * normally much more than sufficient unless  there are
+   * user programming errors or questionable operations generating
+   * large numbers of Tasks without running them.
+   * Capacities must be a power of two. 
+   **/
+
+  protected static final int INITIAL_CAPACITY = 4096; 
+
+  /**
+   * The maximum supported DEQ capacity.
+   * When exceeded, FJTaskRunner operations throw Errors
+   **/
+
+  protected static final int MAX_CAPACITY = 1 << 30;
+
+  /**
+   * An object holding a single volatile reference to a FJTask.
+   **/
+  
+  protected final static class VolatileTaskRef {
+    /** The reference **/
+    protected volatile FJTask ref;
+
+    /** Set the reference **/
+    protected final void put(FJTask r) { ref = r; }
+    /** Return the reference **/
+    protected final FJTask get()     { return ref; }
+    /** Return the reference and clear it **/
+    protected final FJTask take()    { FJTask r = ref; ref = null; return r;  }
+
+    /**
+     * Initialization utility for constructing arrays. 
+     * Make an array of given capacity and fill it with
+     * VolatileTaskRefs.
+     **/
+    protected static VolatileTaskRef[] newArray(int cap) {
+      VolatileTaskRef[] a = new VolatileTaskRef[cap];
+      for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
+      return a;
+    }
+
+  }
+
+  /**
+   * The DEQ array.
+   **/
+    
+  protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
+
+  /** Current size of the task DEQ **/
+  protected int deqSize() { return deq.length; }
+
+  /** 
+   * Current top of DEQ. Generally acts just like a stack pointer in an 
+   * array-based stack, except that it circularly wraps around the
+   * array, as in an array-based queue. The value is NOT
+   * always kept within <code>0 ... deq.length</code> though. 
+   * The current top element is always at <code>top & (deq.length-1)</code>.
+   * To avoid integer overflow, top is reset down 
+   * within bounds whenever it is noticed to be out out bounds;
+   * at worst when it is at <code>2 * deq.length</code>.
+   **/
+  protected volatile int top = 0;
+
+
+  /** 
+   * Current base of DEQ. Acts like a take-pointer in an
+   * array-based bounded queue. Same bounds and usage as top.
+   **/
+
+  protected volatile int base = 0;
+
+
+  /**
+   * An extra object to synchronize on in order to
+   * achieve a memory barrier.
+   **/
+
+  protected final Object barrier = new Object();
+
+  /* ------------ Other BookKeeping ------------------- */
+
+  /**
+   * Record whether current thread may be processing a task
+   * (i.e., has been started and is not in an idle wait).
+   * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
+   * stored here for simplicity.
+   **/
+
+  protected boolean active = false;
+
+  /** Random starting point generator for scan() **/
+  protected final Random victimRNG;
+
+
+  /** Priority to use while scanning for work **/
+  protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;
+
+  /** Priority to use while running tasks **/
+  protected int runPriority;
+
+  /**
+   * Set the priority to use while scanning.
+   * We do not bother synchronizing access, since
+   * by the time the value is needed, both this FJTaskRunner 
+   * and its FJTaskRunnerGroup will
+   * necessarily have performed enough synchronization
+   * to avoid staleness problems of any consequence.
+   **/
+  protected void setScanPriority(int pri) { scanPriority = pri; }
+
+
+  /**
+   * Set the priority to use while running tasks.
+   * Same usage and rationale as setScanPriority.
+   **/
+  protected void setRunPriority(int pri) {  runPriority = pri; }
+
+  /**
+   * Compile-time constant for statistics gathering.
+   * Even when set, reported values may not be accurate
+   * since all are read and written without synchronization.
+   **/
+
+
+
+  static final boolean COLLECT_STATS = true;
+  // static final boolean COLLECT_STATS = false;
+
+
+  // for stat collection
+
+  /** Total number of tasks run **/
+  protected int runs = 0;
+
+  /** Total number of queues scanned for work **/
+  protected int scans = 0;
+
+  /** Total number of tasks obtained via scan **/
+  protected int steals = 0;
+
+
+
+
+  /* ------------ DEQ operations ------------------- */
+
+
+  /**
+   * Push a task onto DEQ.
+   * Called ONLY by current thread.
+   **/
+
+  protected final void push(final FJTask r) {
+    int t = top;
+
+    /*
+      This test catches both overflows and index wraps.  It doesn't
+      really matter if base value is in the midst of changing in take. 
+      As long as deq length is < 2^30, we are guaranteed to catch wrap in
+      time since base can only be incremented at most length times
+      between pushes (or puts). 
+    */
+
+    if (t < (base & (deq.length-1)) + deq.length) {
+
+      deq[t & (deq.length-1)].put(r);
+      top = t + 1;
+    }
+
+    else  // isolate slow case to increase chances push is inlined
+      slowPush(r); // check overflow and retry
+  }
+
+
+  /**
+   * Handle slow case for push
+   **/
+
+  protected synchronized void slowPush(final FJTask r) {
+    checkOverflow();
+    push(r); // just recurse -- this one is sure to succeed.
+  }
+
+
+  /**
+   * Enqueue task at base of DEQ.
+   * Called ONLY by current thread.
+   * This method is currently not called from class FJTask. It could be used
+   * as a faster way to do FJTask.start, but most users would
+   * find the semantics too confusing and unpredictable.
+   **/
+
+  protected final synchronized void put(final FJTask r) {
+    for (;;) {
+      int b = base - 1;
+      if (top < b + deq.length) {
+        
+        int newBase = b & (deq.length-1);
+        deq[newBase].put(r);
+        base = newBase;
+        
+        if (b != newBase) { // Adjust for index underflow
+          int newTop = top & (deq.length-1);
+          if (newTop < newBase) newTop += deq.length;
+          top = newTop;
+        }
+        return;
+      }
+      else {
+        checkOverflow();
+        // ... and retry
+      }
+    }
+  }
+
+  /**
+   * Return a popped task, or null if DEQ is empty.
+   * Called ONLY by current thread.
+   * <p>
+   * This is not usually called directly but is
+   * instead inlined in callers. This version differs from the
+   * cilk algorithm in that pop does not fully back down and
+   * retry in the case of potential conflict with take. It simply
+   * rechecks under synch lock. This gives a preference
+   * for threads to run their own tasks, which seems to
+   * reduce flailing a bit when there are few tasks to run.
+   **/
+
+  protected final FJTask pop() {
+    /* 
+       Decrement top, to force a contending take to back down.
+    */
+
+    int t = --top;      
+
+    /*
+      To avoid problems with JVMs that do not properly implement
+      read-after-write of a pair of volatiles, we conservatively
+      grab without lock only if the DEQ appears to have at least two
+      elements, thus guaranteeing that both a pop and take will succeed,
+      even if the pre-increment in take is not seen by current thread.
+      Otherwise we recheck under synch.
+    */
+
+    if (base + 1 < t) 
+      return deq[t & (deq.length-1)].take();
+    else
+      return confirmPop(t);
+
+  }
+
+
+  /**
+   * Check under synch lock if DEQ is really empty when doing pop. 
+   * Return task if not empty, else null.
+   **/
+
+  protected final synchronized FJTask confirmPop(int provisionalTop) {
+    if (base <= provisionalTop) 
+      return deq[provisionalTop & (deq.length-1)].take();
+    else {    // was empty
+      /*
+        Reset DEQ indices to zero whenever it is empty.
+        This both avoids unnecessary calls to checkOverflow
+        in push, and helps keep the DEQ from accumulating garbage
+      */
+
+      top = base = 0;
+      return null;
+    }
+  }
+
+
+  /** 
+   * Take a task from the base of the DEQ.
+   * Always called by other threads via scan()
+   **/
+
+  
+  protected final synchronized FJTask take() {
+
+    /*
+      Increment base in order to suppress a contending pop
+    */
+    
+    int b = base++;     
+    
+    if (b < top) 
+      return confirmTake(b);
+    else {
+      // back out
+      base = b; 
+      return null;
+    }
+  }
+
+
+  /**
+   * double-check a potential take
+   **/
+  
+  protected FJTask confirmTake(int oldBase) {
+
+    /*
+      Use a second (guaranteed uncontended) synch
+      to serve as a barrier in case JVM does not
+      properly process read-after-write of 2 volatiles
+    */
+
+    synchronized(barrier) {
+      if (oldBase < top) {
+        /*
+          We cannot call deq[oldBase].take here because of possible races when
+          nulling out versus concurrent push operations.  Resulting
+          accumulated garbage is swept out periodically in
+          checkOverflow, or more typically, just by keeping indices
+          zero-based when found to be empty in pop, which keeps active
+          region small and constantly overwritten. 
+        */
+        
+        return deq[oldBase & (deq.length-1)].get();
+      }
+      else {
+        base = oldBase;
+        return null;
+      }
+    }
+  }
+
+
+  /**
+   * Adjust top and base, and grow DEQ if necessary.
+   * Called only while DEQ synch lock being held.
+   * We don't expect this to be called very often. In most
+   * programs using FJTasks, it is never called.
+   **/
+
+  protected void checkOverflow() { 
+    int t = top;
+    int b = base;
+    
+    if (t - b < deq.length-1) { // check if just need an index reset
+      
+      int newBase = b & (deq.length-1);
+      int newTop  = top & (deq.length-1);
+      if (newTop < newBase) newTop += deq.length;
+      top = newTop;
+      base = newBase;
+      
+      /* 
+         Null out refs to stolen tasks. 
+         This is the only time we can safely do it.
+      */
+      
+      int i = newBase;
+      while (i != newTop && deq[i].ref != null) {
+        deq[i].ref = null;
+        i = (i - 1) & (deq.length-1);
+      }
+      
+    }
+    else { // grow by doubling array
+      
+      int newTop = t - b;
+      int oldcap = deq.length;
+      int newcap = oldcap * 2;
+      
+      if (newcap >= MAX_CAPACITY)
+        throw new Error("FJTask queue maximum capacity exceeded");
+      
+      VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
+      
+      // copy in bottom half of new deq with refs from old deq
+      for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
+      
+      // fill top half of new deq with new refs
+      for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
+      
+      deq = newdeq;
+      base = 0;
+      top = newTop;
+    }
+  }
+
+
+  /* ------------ Scheduling  ------------------- */
+
+
+  /**
+   * Do all but the pop() part of yield or join, by
+   * traversing all DEQs in our group looking for a task to
+   * steal. If none, it checks the entry queue. 
+   * <p>
+   * Since there are no good, portable alternatives,
+   * we rely here on a mixture of Thread.yield and priorities
+   * to reduce wasted spinning, even though these are
+   * not well defined. We are hoping here that the JVM
+   * does something sensible.
+   * @param waitingFor if non-null, the current task being joined
+   **/
+
+  protected void scan(final FJTask waitingFor) {
+
+    FJTask task = null;
+
+    // to delay lowering priority until first failure to steal
+    boolean lowered = false;
+    
+    /*
+      Circularly traverse from a random start index. 
+      
+      This differs slightly from cilk version that uses a random index
+      for each attempted steal.
+      Exhaustive scanning might impede analytic tractablity of 
+      the scheduling policy, but makes it much easier to deal with
+      startup and shutdown.
+    */
+    
+    FJTaskRunner[] ts = group.getArray();
+    int idx = victimRNG.nextInt(ts.length);
+    
+    for (int i = 0; i < ts.length; ++i) {
+      
+      FJTaskRunner t = ts[idx];
+      if (++idx >= ts.length) idx = 0; // circularly traverse
+      
+      if (t != null && t != this) {
+        
+        if (waitingFor != null && waitingFor.isDone()) {
+          break;
+        }
+        else {
+          if (COLLECT_STATS) ++scans;
+          task = t.take();
+          if (task != null) {
+            if (COLLECT_STATS) ++steals;
+            break;
+          }
+          else if (isInterrupted()) {
+            break;
+          }
+          else if (!lowered) { // if this is first fail, lower priority
+            lowered = true;
+            setPriority(scanPriority);
+          }
+          else {           // otherwise we are at low priority; just yield
+            yield();
+          }
+        }
+      }
+      
+    } 
+
+    if (task == null) {
+      if (COLLECT_STATS) ++scans;
+      task = group.pollEntryQueue();
+      if (COLLECT_STATS) if (task != null) ++steals;
+    }
+    
+    if (lowered) setPriority(runPriority);
+    
+    if (task != null && !task.isDone()) {
+      if (COLLECT_STATS) ++runs;
+      task.run(); 
+      task.setDone(); 
+    }
+
+  }
+
+  /**
+   * Same as scan, but called when current thread is idling.
+   * It repeatedly scans other threads for tasks,
+   * sleeping while none are available. 
+   * <p>
+   * This differs from scan mainly in that
+   * since there is no reason to return to recheck any
+   * condition, we iterate until a task is found, backing
+   * off via sleeps if necessary.
+   **/
+
+  protected void scanWhileIdling() {
+    FJTask task = null;
+    
+    boolean lowered = false;
+    long iters = 0;
+    
+    FJTaskRunner[] ts = group.getArray();
+    int idx = victimRNG.nextInt(ts.length);
+    
+    do {
+      for (int i = 0; i < ts.length; ++i) {
+        
+        FJTaskRunner t = ts[idx];
+        if (++idx >= ts.length) idx = 0; // circularly traverse
+        
+        if (t != null && t != this) {
+          if (COLLECT_STATS) ++scans;
+          
+          task = t.take();
+          if (task != null) {
+            if (COLLECT_STATS) ++steals;
+            if (lowered) setPriority(runPriority);
+            group.setActive(this);
+            break;
+          }
+        }
+      } 
+      
+      if (task == null) {
+        if (isInterrupted()) 
+          return;
+        
+        if (COLLECT_STATS) ++scans;
+        task = group.pollEntryQueue();
+        
+        if (task != null) {
+          if (COLLECT_STATS) ++steals;
+          if (lowered) setPriority(runPriority);
+          group.setActive(this);
+        }
+        else {
+          ++iters;
+          //  Check here for yield vs sleep to avoid entering group synch lock
+          if (iters >= group.SCANS_PER_SLEEP) {
+            group.checkActive(this, iters);
+            if (isInterrupted())
+              return;
+          }
+          else if (!lowered) {
+            lowered = true;
+            setPriority(scanPriority);
+          }
+          else {
+            yield();
+          }
+        }
+      }
+    } while (task == null);
+
+
+    if (!task.isDone()) {
+      if (COLLECT_STATS) ++runs;
+      task.run(); 
+      task.setDone(); 
+    }
+    
+  }
+
+  /* ------------  composite operations ------------------- */
+
+    
+  /**
+   * Main runloop
+   **/
+
+  public void run() {
+    try{ 
+      while (!interrupted()) {
+        
+        FJTask task = pop();
+        if (task != null) {
+          if (!task.isDone()) {
+            // inline FJTask.invoke
+            if (COLLECT_STATS) ++runs;
+            task.run(); 
+            task.setDone(); 
+          }
+        }
+        else
+          scanWhileIdling();
+      }
+    }
+    finally {
+      group.setInactive(this);
+    }
+  }
+
+  /**
+   * Execute a task in this thread. Generally called when current task
+   * cannot otherwise continue.
+   **/
+
+    
+  protected final void taskYield() {
+    FJTask task = pop();
+    if (task != null) {
+      if (!task.isDone()) {
+        if (COLLECT_STATS) ++runs;
+        task.run(); 
+        task.setDone(); 
+      }
+    }
+    else
+      scan(null);
+  }
+
+
+  /**
+   * Process tasks until w is done.
+   * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
+   **/
+
+  protected final void taskJoin(final FJTask w) {
+
+    while (!w.isDone()) { 
+
+      FJTask task = pop();
+      if (task != null) {
+        if (!task.isDone()) {
+          if (COLLECT_STATS) ++runs;
+          task.run(); 
+          task.setDone(); 
+          if (task == w) return; // fast exit if we just ran w
+        }
+      }
+      else
+        scan(w);
+    }
+  }
+
+  /**
+   * A specialized expansion of
+   * <code> w.fork(); invoke(v); w.join(); </code>
+   **/
+
+
+  protected final void coInvoke(final FJTask w, final FJTask v) {
+
+    // inline  push
+
+    int t = top;
+    if (t < (base & (deq.length-1)) + deq.length) {
+
+      deq[t & (deq.length-1)].put(w);
+      top = t + 1;
+
+      // inline  invoke
+
+      if (!v.isDone()) { 
+        if (COLLECT_STATS) ++runs; 
+        v.run(); 
+        v.setDone(); 
+      }
+      
+      // inline  taskJoin
+      
+      while (!w.isDone()) {
+        FJTask task  = pop();
+        if (task != null) {
+          if (!task.isDone()) {
+            if (COLLECT_STATS) ++runs;
+            task.run(); 
+            task.setDone(); 
+            if (task == w) return; // fast exit if we just ran w
+          }
+        }
+        else
+          scan(w);
+      }
+    }
+
+    else      // handle non-inlinable cases
+      slowCoInvoke(w, v);
+  }
+
+
+  /**
+   * Backup to handle noninlinable cases of coInvoke
+   **/
+
+  protected void slowCoInvoke(final FJTask w, final FJTask v) {
+    push(w); // let push deal with overflow
+    FJTask.invoke(v);
+    taskJoin(w);
+  }
+
+
+  /**
+   * Array-based version of coInvoke
+   **/
+
+  protected final void coInvoke(FJTask[] tasks) {
+    int nforks = tasks.length - 1;
+
+    // inline bulk push of all but one task
+
+    int t = top;
+
+    if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
+      for (int i = 0; i < nforks; ++i) {
+        deq[t++ & (deq.length-1)].put(tasks[i]);
+        top = t;
+      }
+
+      // inline invoke of one task
+      FJTask v = tasks[nforks];
+      if (!v.isDone()) { 
+        if (COLLECT_STATS) ++runs; 
+        v.run(); 
+        v.setDone(); 
+      }
+      
+      // inline  taskJoins
+      
+      for (int i = 0; i < nforks; ++i) { 
+        FJTask w = tasks[i];
+        while (!w.isDone()) {
+
+          FJTask task = pop();
+          if (task != null) {
+            if (!task.isDone()) {
+              if (COLLECT_STATS) ++runs;
+              task.run(); 
+              task.setDone(); 
+            }
+          }
+          else
+            scan(w);
+        }
+      }
+    }
+
+    else  // handle non-inlinable cases
+      slowCoInvoke(tasks);
+  }
+
+  /**
+   * Backup to handle atypical or noninlinable cases of coInvoke
+   **/
+
+  protected void slowCoInvoke(FJTask[] tasks) {
+    for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
+    for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
+  }
+
+}
+