view src/org/dancres/blitz/task/Tasks.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line source

package org.dancres.blitz.task;

import java.util.Iterator;
import java.util.HashMap;

import java.util.logging.Logger;
import java.util.logging.Level;

import net.jini.config.ConfigurationException;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.Channel;

import org.dancres.blitz.ActiveObject;
import org.dancres.blitz.ActiveObjectRegistry;
import org.dancres.blitz.Logging;
import org.dancres.blitz.stats.StatsBoard;

import org.dancres.blitz.config.ConfigurationFactory;
import org.dancres.blitz.util.QueueStatGenerator;

/**
   Various operations within the space must be handled in background.
   This class encapsulates pools of threads which execute operations
   (Task instances) as they are queued.  One pool is created lazily per
   named queue the first time a task is submitted to that name. <P>

   It might, at first, appear to make sense to have separate pools for
   event notification, blocked call wakeups etc. in the belief that one can
   better control the balance of dispatch of, for example, notifies in
   comparison with searches.  <P>

   In reality, this won't work as each pool has a set of threads all with the
   same priority.  i.e.  They share whatever CPU is available in a manner
   driven by the number of tasks they must perform.  Thus, if one wishes
   to truly balance, say, notification rate against search wakeups, one
   must assign differing priorities to these <I>tasks</I> as opposed to
   <I>threads</I> to ensure CPU consumption is bounded and that, whichever
   tasks have priority, get to use the CPU first. <P>

   Configuration entries read at class initialisation:
   <UL>
   <LI><code>maxTaskThreads</code> (default 10) - used as both the maximum
   and minimum size of every pool.  Values below 1 are replaced by the
   default.</LI>
   <LI><code>taskQueueBound</code> (default 0) - capacity of each pool's
   queue.  A value of 0 or less means "no bound".</LI>
   </UL>
 */
public class Tasks implements ActiveObject {
    private static final Logger theLogger =
        Logging.newLogger("org.dancres.blitz.task.Tasks");
    
    /** Name of the queue used by {@link #queue(Task)}. */
    private static final String DEFAULT_QUEUE = "DefaultTask";

    /** Thread count used when none (or an unusable one) is configured. */
    private static final int DEFAULT_MAX_TASK_THREADS = 10;

    /** Queue bound used when none is configured; 0 means unbounded. */
    private static final int DEFAULT_TASK_QUEUE_BOUND = 0;

    /*
      The singleton is created (and registered with ActiveObjectRegistry)
      before the configuration below is loaded.  The constructor does not
      read any of the configured values so the ordering is harmless.
     */
    private static final Tasks theTasks = new Tasks();

    /*
      Both settings start out at their defaults so that a failure to load
      the configuration still leaves usable values.  Previously a failure
      left MAX_TASK_THREADS at 0, producing pools with no threads at all.
     */
    private static int MAX_TASK_THREADS = DEFAULT_MAX_TASK_THREADS;

    private static int TASK_QUEUE_BOUND = DEFAULT_TASK_QUEUE_BOUND;

    static {
        try {
            MAX_TASK_THREADS = ((Integer)
                ConfigurationFactory.getEntry("maxTaskThreads", 
                                              int.class,
                                              new Integer(DEFAULT_MAX_TASK_THREADS))).intValue();
            TASK_QUEUE_BOUND = ((Integer)
                ConfigurationFactory.getEntry("taskQueueBound", 
                                              int.class,
                                              new Integer(DEFAULT_TASK_QUEUE_BOUND))).intValue();
        } catch (ConfigurationException aCE) {
            // Carry on with whatever values we have (defaults for anything
            // that was not successfully read).
            theLogger.log(Level.SEVERE, "Failed to load config", aCE);
        }

        // A pool needs at least one thread to make progress.
        // NOTE(review): per the util.concurrent docs a negative value is
        // rejected by setMinimumPoolSize and zero yields a pool that never
        // runs anything - confirm against the library version in use.
        if (MAX_TASK_THREADS < 1) {
            theLogger.log(Level.WARNING, "Invalid maxTaskThreads: " +
                          MAX_TASK_THREADS + ", using default: " +
                          DEFAULT_MAX_TASK_THREADS);
            MAX_TASK_THREADS = DEFAULT_MAX_TASK_THREADS;
        }

        theLogger.log(Level.INFO, "Maximum task threads: " +
                      MAX_TASK_THREADS);
        theLogger.log(Level.INFO, "Task queue bound: " +
                      TASK_QUEUE_BOUND);
    }

    /**
       Maps queue name (String) to the PooledExecutor servicing it.
       All access is performed whilst synchronized on the map itself.
     */
    private final HashMap theExecutors = new HashMap();

    /** Singleton - registers itself with the ActiveObjectRegistry. */
    private Tasks() {
        ActiveObjectRegistry.add(this);
    }

    /**
       Submit a task to the default queue.

       @param aTask the task to execute in background
       @throws InterruptedException if propagated by the underlying
       executor whilst handing over the task
     */
    public static void queue(Task aTask) throws InterruptedException {
        queue(DEFAULT_QUEUE, aTask);
    }

    /**
       Submit a task to the named queue.  The pool for the queue is created
       on first use.

       @param aQueue name of the queue (and thus pool) to use
       @param aTask the task to execute in background
       @throws InterruptedException if propagated by the underlying
       executor whilst handing over the task
     */
    public static void queue(String aQueue, 
                             Task aTask) throws InterruptedException {
        theTasks.execute(aQueue, aTask);
    }

    /** Hand the task to the executor associated with the named queue. */
    private void execute(String aQueue, Task aTask)
        throws InterruptedException {

        getExecutor(aQueue).execute(aTask);
    }

    /** Nothing to do - pools are created lazily as tasks are queued. */
    public void begin() {
    }

    /**
       Invokes <code>shutdownNow()</code> on every pool created so far.
       The pools remain registered under their names afterwards.
     */
    public void halt() {
        synchronized(theExecutors) {
            Iterator myExecs = theExecutors.values().iterator();

            while (myExecs.hasNext()) {
                PooledExecutor myExec = (PooledExecutor) myExecs.next();
                myExec.shutdownNow();
            }
        }
    }

    /**
       Locate the pool for the named queue, creating, configuring and
       registering it (including a QueueStatGenerator on the StatsBoard)
       if this is the first request for that name.

       @param aName the queue name
       @return the executor servicing the queue, never <code>null</code>
     */
    private PooledExecutor getExecutor(String aName) {
        synchronized(theExecutors) {
            PooledExecutor myExec = (PooledExecutor) theExecutors.get(aName);

            if (myExec == null) {
                int myCapacity;

                // "Unbounded" is realised as the largest possible bound so
                // that the same queue type (and thus stats generator) can
                // be used in both cases.  Negative bounds are treated as
                // unbounded rather than being passed to the queue.
                if (TASK_QUEUE_BOUND <= 0) {
                    theLogger.log(Level.INFO,
                                  "Creating task pool with no bounds");

                    myCapacity = Integer.MAX_VALUE;
                } else {
                    theLogger.log(Level.INFO,
                                  "Creating task pool with bounds: " +
                                  TASK_QUEUE_BOUND);

                    myCapacity = TASK_QUEUE_BOUND;
                }

                BoundedLinkedQueue myQueue =
                    new BoundedLinkedQueue(myCapacity);

                myExec = new PooledExecutor(myQueue, MAX_TASK_THREADS);

                // Minimum == maximum, i.e. a fixed size pool
                myExec.setMinimumPoolSize(MAX_TASK_THREADS);
                // myExec.waitWhenBlocked();

                StatsBoard.get().add(new QueueStatGenerator(aName, myQueue));
                theExecutors.put(aName, myExec);
            }

            return myExec;
        }
    }
}