Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/task/Tasks.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/task/Tasks.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,147 @@ +package org.dancres.blitz.task; + +import java.util.Iterator; +import java.util.HashMap; + +import java.util.logging.Logger; +import java.util.logging.Level; + +import net.jini.config.ConfigurationException; + +import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; +import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; +import EDU.oswego.cs.dl.util.concurrent.Channel; + +import org.dancres.blitz.ActiveObject; +import org.dancres.blitz.ActiveObjectRegistry; +import org.dancres.blitz.Logging; +import org.dancres.blitz.stats.StatsBoard; + +import org.dancres.blitz.config.ConfigurationFactory; +import org.dancres.blitz.util.QueueStatGenerator; + +/** + Various operations within the space must be handled in background. + This class encapsulates a pool of threads which execute operations + (Task instances) as they are queued. <P> + + It might, at first, appear to make sense to have separate pools for + event notification, blocked call wakeups etc. in the belief that one can + better control the balance of dispatch of, for example, notifies in + comparison with searches. <P> + + In reality, this won't work as each pool has a set of threads all with the + same priority. i.e. They share whatever CPU is available in a manner + driven by the number of tasks they must perform. Thus, if one wishes + to truly balance, say, notification rate against search wakeups, one + must assign differing priorities to these <I>tasks</I> as opposed to + <I>threads</I> to ensure CPU consumption is bounded and that, whichever + tasks have priority, get to use the CPU first. + */ +public class Tasks implements ActiveObject { + private static Logger theLogger = + Logging.newLogger("org.dancres.blitz.task.Tasks"); + + private static final String DEFAULT_QUEUE = "DefaultTask"; + + private static Tasks theTasks = new Tasks(); + + private static int MAX_TASK_THREADS; + + private static int TASK_QUEUE_BOUND; + + static { + try { + MAX_TASK_THREADS = ((Integer) + ConfigurationFactory.getEntry("maxTaskThreads", + int.class, + new Integer(10))).intValue(); + TASK_QUEUE_BOUND = ((Integer) + ConfigurationFactory.getEntry("taskQueueBound", + int.class, + new Integer(0))).intValue(); + + theLogger.log(Level.INFO, "Maximum task threads: " + + MAX_TASK_THREADS); + theLogger.log(Level.INFO, "Task queue bound: " + + TASK_QUEUE_BOUND); + + } catch (ConfigurationException aCE) { + theLogger.log(Level.SEVERE, "Failed to load config", aCE); + } + } + + private HashMap theExecutors = new HashMap(); + + private Tasks() { + ActiveObjectRegistry.add(this); + } + + public static void queue(Task aTask) throws InterruptedException { + queue(DEFAULT_QUEUE, aTask); + } + + public static void queue(String aQueue, + Task aTask) throws InterruptedException { + theTasks.execute(aQueue, aTask); + } + + private void execute(String aQueue, Task aTask) + throws InterruptedException { + + getExecutor(aQueue).execute(aTask); + } + + public void begin() { + } + + public void halt() { + synchronized(theExecutors) { + Iterator myExecs = theExecutors.values().iterator(); + + while (myExecs.hasNext()) { + PooledExecutor myExec = (PooledExecutor) myExecs.next(); + myExec.shutdownNow(); + } + } + } + + private PooledExecutor getExecutor(String aName) { + PooledExecutor myExec; + + synchronized(theExecutors) { + myExec = (PooledExecutor) theExecutors.get(aName); + + if (myExec == null) { + + BoundedLinkedQueue myQueue; + + if (TASK_QUEUE_BOUND == 0) { + theLogger.log(Level.INFO, + "Creating task pool with no bounds"); + + + myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE); + myExec = new PooledExecutor(myQueue, MAX_TASK_THREADS); + } else { + theLogger.log(Level.INFO, + "Creating task pool with bounds: " + + TASK_QUEUE_BOUND); + + myQueue = new BoundedLinkedQueue(TASK_QUEUE_BOUND); + myExec = + new PooledExecutor(myQueue, MAX_TASK_THREADS); + } + + myExec.setMinimumPoolSize(MAX_TASK_THREADS); + // myExec.waitWhenBlocked(); + + StatsBoard.get().add(new QueueStatGenerator(aName, myQueue)); + theExecutors.put(aName, myExec); + } + + return myExec; + } + } +}