view src/org/dancres/blitz/disk/WriteDaemon.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line source

package org.dancres.blitz.disk;

import java.util.logging.*;

import net.jini.config.ConfigurationException;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;

import org.dancres.blitz.Logging;

import org.dancres.blitz.ActiveObject;
import org.dancres.blitz.ActiveObjectRegistry;

import org.dancres.blitz.config.ConfigurationFactory;

import org.dancres.blitz.task.Tasks;
import org.dancres.blitz.task.Task;

import org.dancres.blitz.stats.StatGenerator;
import org.dancres.blitz.stats.Stat;
import org.dancres.blitz.stats.StatsBoard;
import org.dancres.blitz.stats.IOStat;

/**
   <p> Certain elements of Blitz (such as ArcCache) require all writes to be
   asynchronous to avoid deadlock situations and reduce time spent waiting for
   I/O completion by a client thread.  WriteDaemon provides the asynchronous
   I/O infrastructure to satisfy such requirements. </p>

   <p> We stop when Disk is stop'd which allows for proper sync'ing </p>

   <p>We can only have one thread for writing at this moment because:
   <ol>
   <li>WriteBuffer assumes serialized updating of images.  If two threads
   start performing jobs on the same UID, then there's currently scope for
   a collision.  The problem is compounded by the fact that we can't simply
   requeue the job because that will break checkpointing/sync'ing</li>
   <li>Sync'ing assumes that there's only one thread which will execute the
   appropriate callback placed in the queue once all prior Jobs have been
   dispatched.  This ordering dependency is what complicates the WriteBuffer
   issue above.</li>
   <li>There's an ordering issue - we must write before we decide to then
   delete something that is on disk.</li>
   </ol>
   </p>

   @see org.dancres.blitz.arc.ArcCache
*/
public class WriteDaemon implements StatGenerator {
    static Logger theLogger =
        Logging.newLogger("org.dancres.blitz.disk.WriteDaemon");

    private static int MAX_WRITE_THREADS;
    private static int THREAD_KEEPALIVE;
    private static int DESIRED_PENDING_WRITES;
    private static int THROTTLE_PENDING_WRITES;
    private static long THROTTLE_PAUSE;

    static {
        try {
            MAX_WRITE_THREADS = ((Integer)
                ConfigurationFactory.getEntry("maxWriteThreads", 
                                              int.class,
                                              new Integer(5))).intValue();
            THREAD_KEEPALIVE = ((Integer)
                ConfigurationFactory.getEntry("threadKeepalive", 
                                              int.class,
                                              new Integer(60000))).intValue();
            DESIRED_PENDING_WRITES = ((Integer)
                ConfigurationFactory.getEntry("desiredPendingWrites", 
                                              int.class,
                                              new Integer(20))).intValue();

            THROTTLE_PENDING_WRITES = ((Integer)
                ConfigurationFactory.getEntry("throttlePendingWrites", 
                                              int.class,
                                              new Integer(Integer.MAX_VALUE))).intValue();
            THROTTLE_PAUSE = ((Long)
                ConfigurationFactory.getEntry("throttlePause", 
                                              long.class,
                                              new Long(50))).longValue();
        } catch (ConfigurationException aCE) {
        }
    }

    private static WriteDaemon theDaemon = new WriteDaemon();

    private int thePendingCount;
    private LinkedQueue thePendingUpdates = new LinkedQueue();

    private LinkedQueue theAsyncUpdates;
    private PooledExecutor theWriters;

    private LinkedQueue theCompletions;
    private PooledExecutor theCompleters;

    private IOStats theIOStats = new IOStats();

    private long theStatId = StatGenerator.UNSET_ID;
    private int theThrottleCount = 0;

    private WriteDaemon() {
        theAsyncUpdates = new LinkedQueue();
        theCompletions = new LinkedQueue();

        theWriters = new PooledExecutor(theAsyncUpdates, MAX_WRITE_THREADS);
        theCompleters = new PooledExecutor(theCompletions, 1);

        theLogger.log(Level.INFO, "Async keepalive: " + THREAD_KEEPALIVE);
        theLogger.log(Level.INFO, "Pending write size: " +
                      DESIRED_PENDING_WRITES);
        theLogger.log(Level.INFO, "Throttle write size: " + 
                      THROTTLE_PENDING_WRITES);
        theLogger.log(Level.INFO, "Throttle pause: " + 
                      THROTTLE_PAUSE);

        theWriters.setKeepAliveTime(THREAD_KEEPALIVE);
        theCompleters.setKeepAliveTime(THREAD_KEEPALIVE);

        StatsBoard.get().add(this);
    }

    public static WriteDaemon get() {
        return theDaemon;
    }

    /**
       Queue a task for execution by the WriteDaemon thread.
     */
    public void queue(Runnable anUpdate) {

        // If the write queue is getting too large, start stalling
        if (theIOStats.getQueueSize() > THROTTLE_PENDING_WRITES) {
            ++theThrottleCount;

            theLogger.log(Level.WARNING,
                "Write queue overflowing - THROTTLING");

            try {
                Thread.sleep(THROTTLE_PAUSE);
            } catch (InterruptedException anIE) {
                theLogger.log(Level.SEVERE, "Throttle broken!");
            }
        }

        synchronized(this) {
            try {
                thePendingUpdates.put(new OutputTracker(anUpdate));
                ++thePendingCount;

                if (thePendingCount >= DESIRED_PENDING_WRITES) {
                    pushImpl();
                }
            } catch (InterruptedException anIE) {
                theLogger.log(Level.SEVERE, "Failed to queue update", anIE);
            }
        }

        theIOStats.incAsyncInCount();
    }

    /**
       Should only be called from within a sync block.
     */
    private void pushImpl() {
        Object myTask;

        try {
            while ((myTask = thePendingUpdates.poll(0)) != null) {
                theWriters.execute((Runnable) myTask);
            }
        } catch (InterruptedException anIE) {
        }
        
        thePendingCount = 0;
    }

    /**
       Force the queue to be processed
     */
    void push() {
        synchronized(this) {
            pushImpl();
        }
    }

    /**
       <p>Force the updates in the queue to disk.  On completion, invoke the
       passed task.  Note this task is processed asynchronously outside
       of the WriteDaemon thread.  This allows the WriteDaemon to continue
       processing updates whilst the completion task runs.</p>

       <p>If you want a task to be performed synchronously by the WriteDaemon
       after other requests, <code>queue</code> the task and invoke
       <code>push</code>.</p>
     */
    void push(Task aCompletionTask) {
        if (aCompletionTask == null)
            throw new IllegalArgumentException();
        else {
            synchronized(this) {
                /*
                  We ensure the passed task doesn't execute until the
                  queue has been emptied by putting a barrier task at the end 
                  of the queue.  Thus, when we force the queue, this barrier
                  will only execute once all preceeding updates have been
                  performed.
                 */
                queue(new Scheduler(aCompletionTask));
                pushImpl();
            }
        }
    }

    void halt() {
        theLogger.log(Level.INFO, "WriteDaemon doing halt");
        theWriters.shutdownAfterProcessingCurrentlyQueuedTasks();
        theCompleters.shutdownAfterProcessingCurrentlyQueuedTasks();
        theLogger.log(Level.INFO, "WriteDaemon done halt");
    }

    /**
       We do not run the completion task in-line.  This is potentially time
       consuming and we have more important things to do (write updates) so
       we palm the completion task off to a separate thread pool when we've
       done the necessary work.
     */
    private class Scheduler implements Runnable {
        private Task theTask;

        Scheduler(Task aCompletionTask) {
            theTask = aCompletionTask;
        }

        public void run() {
            try {
                theCompleters.execute(theTask);
            } catch (InterruptedException anIE) {
                theLogger.log(Level.SEVERE, "Failed to scheduler sync completion task", anIE);
            }
        }
    }

    private class OutputTracker implements Runnable {
        private Runnable theWrite;

        OutputTracker(Runnable aRunnable) {
            theWrite = aRunnable;
        }

        public void run() {
            theWrite.run();

            theIOStats.incAsyncOutCount();
        }
    }

    /**
       @return the id of the StatGenerator that produced the stat
       AdministrableStat.UNSET_ID if the id has never been set
     */
    public long getId() {
        return theStatId;
    }

    public void setId(long anId) {
        theStatId = anId;
    }

    public Stat generate() {
        return new IOStat(theStatId, theIOStats.getTimePerIn(),
            theIOStats.getTimePerOut(), theIOStats.getInOutRatio(),
            theIOStats.getQueueSize(), theThrottleCount);
    }
}