diff src/org/dancres/blitz/task/Tasks.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/task/Tasks.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,147 @@
+package org.dancres.blitz.task;
+
+import java.util.Iterator;
+import java.util.HashMap;
+
+import java.util.logging.Logger;
+import java.util.logging.Level;
+
+import net.jini.config.ConfigurationException;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+
+import org.dancres.blitz.ActiveObject;
+import org.dancres.blitz.ActiveObjectRegistry;
+import org.dancres.blitz.Logging;
+import org.dancres.blitz.stats.StatsBoard;
+
+import org.dancres.blitz.config.ConfigurationFactory;
+import org.dancres.blitz.util.QueueStatGenerator;
+
+/**
+   Various operations within the space must be handled in background.
+   This class encapsulates a pool of threads which execute operations
+   (Task instances) as they are queued. <P>
+
+   It might, at first, appear to make sense to have separate pools for
+   event notification, blocked call wakeups etc. in the belief that one can
+   better control the balance of dispatch of, for example, notifies in
+   comparison with searches.  <P>
+
+   In reality, this won't work as each pool has a set of threads all with the
+   same priority.  i.e.  They share whatever CPU is available in a manner
+   driven by the number of tasks they must perform.  Thus, if one wishes
+   to truly balance, say, notification rate against search wakeups, one
+   must assign differing priorities to these <I>tasks</I> as opposed to
+   <I>threads</I> to ensure CPU consumption is bounded and that, whichever
+   tasks have priority, get to use the CPU first.
+ */
+public class Tasks implements ActiveObject {
+    private static Logger theLogger =
+        Logging.newLogger("org.dancres.blitz.task.Tasks");
+    
+    private static final String DEFAULT_QUEUE = "DefaultTask";
+
+    private static Tasks theTasks = new Tasks();
+
+    private static int MAX_TASK_THREADS;
+
+    private static int TASK_QUEUE_BOUND;
+
+    static {
+        try {
+            MAX_TASK_THREADS = ((Integer)
+                ConfigurationFactory.getEntry("maxTaskThreads", 
+                                              int.class,
+                                              new Integer(10))).intValue();
+            TASK_QUEUE_BOUND = ((Integer)
+                ConfigurationFactory.getEntry("taskQueueBound", 
+                                              int.class,
+                                              new Integer(0))).intValue();
+
+            theLogger.log(Level.INFO, "Maximum task threads: " +
+                          MAX_TASK_THREADS);
+            theLogger.log(Level.INFO, "Task queue bound: " +
+                          TASK_QUEUE_BOUND);
+
+        } catch (ConfigurationException aCE) {
+            theLogger.log(Level.SEVERE, "Failed to load config", aCE);
+        }
+    }
+
+    private HashMap theExecutors = new HashMap();
+
+    private Tasks() {
+        ActiveObjectRegistry.add(this);
+    }
+
+    public static void queue(Task aTask) throws InterruptedException {
+        queue(DEFAULT_QUEUE, aTask);
+    }
+
+    public static void queue(String aQueue, 
+                             Task aTask) throws InterruptedException {
+        theTasks.execute(aQueue, aTask);
+    }
+
+    private void execute(String aQueue, Task aTask)
+        throws InterruptedException {
+
+        getExecutor(aQueue).execute(aTask);
+    }
+
+    public void begin() {
+    }
+
+    public void halt() {
+        synchronized(theExecutors) {
+            Iterator myExecs = theExecutors.values().iterator();
+
+            while (myExecs.hasNext()) {
+                PooledExecutor myExec = (PooledExecutor) myExecs.next();
+                myExec.shutdownNow();
+            }
+        }
+    }
+
+    private PooledExecutor getExecutor(String aName) {
+        PooledExecutor myExec;
+
+        synchronized(theExecutors) {
+            myExec = (PooledExecutor) theExecutors.get(aName);
+
+            if (myExec == null) {
+
+                BoundedLinkedQueue myQueue;
+
+                if (TASK_QUEUE_BOUND == 0) {
+                    theLogger.log(Level.INFO,
+                                  "Creating task pool with no bounds");
+
+
+                    myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE);
+                    myExec = new PooledExecutor(myQueue, MAX_TASK_THREADS);
+                } else {
+                    theLogger.log(Level.INFO,
+                                  "Creating task pool with bounds: " +
+                                  TASK_QUEUE_BOUND);
+
+                    myQueue = new BoundedLinkedQueue(TASK_QUEUE_BOUND);
+                    myExec =
+                        new PooledExecutor(myQueue, MAX_TASK_THREADS);
+                }
+
+                myExec.setMinimumPoolSize(MAX_TASK_THREADS);
+                // myExec.waitWhenBlocked();
+
+                StatsBoard.get().add(new QueueStatGenerator(aName, myQueue));
+                theExecutors.put(aName, myExec);
+            }
+
+            return myExec;
+        }
+    }
+}