Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/disk/WriteDaemon.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/disk/WriteDaemon.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,276 @@ +package org.dancres.blitz.disk; + +import java.util.logging.*; + +import net.jini.config.ConfigurationException; + +import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; + +import org.dancres.blitz.Logging; + +import org.dancres.blitz.ActiveObject; +import org.dancres.blitz.ActiveObjectRegistry; + +import org.dancres.blitz.config.ConfigurationFactory; + +import org.dancres.blitz.task.Tasks; +import org.dancres.blitz.task.Task; + +import org.dancres.blitz.stats.StatGenerator; +import org.dancres.blitz.stats.Stat; +import org.dancres.blitz.stats.StatsBoard; +import org.dancres.blitz.stats.IOStat; + +/** + <p> Certain elements of Blitz (such as ArcCache) require all writes to be + asynchronous to avoid deadlock situations and reduce time spent waiting for + I/O completion by a client thread. WriteDaemon provides the asynchronous + I/O infrastructure to satisfy such requirements. </p> + + <p> We stop when Disk is stop'd which allows for proper sync'ing </p> + + <p>We can only have one thread for writing at this moment because: + <ol> + <li>WriteBuffer assumes serialized updating of images. If two threads + start performing jobs on the same UID, then there's currently scope for + a collision. The problem is compounded by the fact that we can't simply + requeue the job because that will break checkpointing/sync'ing</li> + <li>Sync'ing assumes that there's only one thread which will execute the + appropriate callback placed in the queue once all prior Jobs have been + dispatched. This ordering dependency is what complicates the WriteBuffer + issue above.</li> + <li>There's an ordering issue - we must write before we decide to then + delete something that is on disk.</li> + </ol> + </p> + + @see org.dancres.blitz.arc.ArcCache +*/ +public class WriteDaemon implements StatGenerator { + static Logger theLogger = + Logging.newLogger("org.dancres.blitz.disk.WriteDaemon"); + + private static int MAX_WRITE_THREADS; + private static int THREAD_KEEPALIVE; + private static int DESIRED_PENDING_WRITES; + private static int THROTTLE_PENDING_WRITES; + private static long THROTTLE_PAUSE; + + static { + try { + MAX_WRITE_THREADS = ((Integer) + ConfigurationFactory.getEntry("maxWriteThreads", + int.class, + new Integer(5))).intValue(); + THREAD_KEEPALIVE = ((Integer) + ConfigurationFactory.getEntry("threadKeepalive", + int.class, + new Integer(60000))).intValue(); + DESIRED_PENDING_WRITES = ((Integer) + ConfigurationFactory.getEntry("desiredPendingWrites", + int.class, + new Integer(20))).intValue(); + + THROTTLE_PENDING_WRITES = ((Integer) + ConfigurationFactory.getEntry("throttlePendingWrites", + int.class, + new Integer(Integer.MAX_VALUE))).intValue(); + THROTTLE_PAUSE = ((Long) + ConfigurationFactory.getEntry("throttlePause", + long.class, + new Long(50))).longValue(); + } catch (ConfigurationException aCE) { + } + } + + private static WriteDaemon theDaemon = new WriteDaemon(); + + private int thePendingCount; + private LinkedQueue thePendingUpdates = new LinkedQueue(); + + private LinkedQueue theAsyncUpdates; + private PooledExecutor theWriters; + + private LinkedQueue theCompletions; + private PooledExecutor theCompleters; + + private IOStats theIOStats = new IOStats(); + + private long theStatId = StatGenerator.UNSET_ID; + private int theThrottleCount = 0; + + private WriteDaemon() { + theAsyncUpdates = new LinkedQueue(); + theCompletions = new LinkedQueue(); + + theWriters = new PooledExecutor(theAsyncUpdates, MAX_WRITE_THREADS); + theCompleters = new PooledExecutor(theCompletions, 1); + + theLogger.log(Level.INFO, "Async keepalive: " + THREAD_KEEPALIVE); + theLogger.log(Level.INFO, "Pending write size: " + + DESIRED_PENDING_WRITES); + theLogger.log(Level.INFO, "Throttle write size: " + + THROTTLE_PENDING_WRITES); + theLogger.log(Level.INFO, "Throttle pause: " + + THROTTLE_PAUSE); + + theWriters.setKeepAliveTime(THREAD_KEEPALIVE); + theCompleters.setKeepAliveTime(THREAD_KEEPALIVE); + + StatsBoard.get().add(this); + } + + public static WriteDaemon get() { + return theDaemon; + } + + /** + Queue a task for execution by the WriteDaemon thread. + */ + public void queue(Runnable anUpdate) { + + // If the write queue is getting too large, start stalling + if (theIOStats.getQueueSize() > THROTTLE_PENDING_WRITES) { + ++theThrottleCount; + + theLogger.log(Level.WARNING, + "Write queue overflowing - THROTTLING"); + + try { + Thread.sleep(THROTTLE_PAUSE); + } catch (InterruptedException anIE) { + theLogger.log(Level.SEVERE, "Throttle broken!"); + } + } + + synchronized(this) { + try { + thePendingUpdates.put(new OutputTracker(anUpdate)); + ++thePendingCount; + + if (thePendingCount >= DESIRED_PENDING_WRITES) { + pushImpl(); + } + } catch (InterruptedException anIE) { + theLogger.log(Level.SEVERE, "Failed to queue update", anIE); + } + } + + theIOStats.incAsyncInCount(); + } + + /** + Should only be called from within a sync block. + */ + private void pushImpl() { + Object myTask; + + try { + while ((myTask = thePendingUpdates.poll(0)) != null) { + theWriters.execute((Runnable) myTask); + } + } catch (InterruptedException anIE) { + } + + thePendingCount = 0; + } + + /** + Force the queue to be processed + */ + void push() { + synchronized(this) { + pushImpl(); + } + } + + /** + <p>Force the updates in the queue to disk. On completion, invoke the + passed task. Note this task is processed asynchronously outside + of the WriteDaemon thread. This allows the WriteDaemon to continue + processing updates whilst the completion task runs.</p> + + <p>If you want a task to be performed synchronously by the WriteDaemon + after other requests, <code>queue</code> the task and invoke + <code>push</code>.</p> + */ + void push(Task aCompletionTask) { + if (aCompletionTask == null) + throw new IllegalArgumentException(); + else { + synchronized(this) { + /* + We ensure the passed task doesn't execute until the + queue has been emptied by putting a barrier task at the end + of the queue. Thus, when we force the queue, this barrier + will only execute once all preceeding updates have been + performed. + */ + queue(new Scheduler(aCompletionTask)); + pushImpl(); + } + } + } + + void halt() { + theLogger.log(Level.INFO, "WriteDaemon doing halt"); + theWriters.shutdownAfterProcessingCurrentlyQueuedTasks(); + theCompleters.shutdownAfterProcessingCurrentlyQueuedTasks(); + theLogger.log(Level.INFO, "WriteDaemon done halt"); + } + + /** + We do not run the completion task in-line. This is potentially time + consuming and we have more important things to do (write updates) so + we palm the completion task off to a separate thread pool when we've + done the necessary work. + */ + private class Scheduler implements Runnable { + private Task theTask; + + Scheduler(Task aCompletionTask) { + theTask = aCompletionTask; + } + + public void run() { + try { + theCompleters.execute(theTask); + } catch (InterruptedException anIE) { + theLogger.log(Level.SEVERE, "Failed to scheduler sync completion task", anIE); + } + } + } + + private class OutputTracker implements Runnable { + private Runnable theWrite; + + OutputTracker(Runnable aRunnable) { + theWrite = aRunnable; + } + + public void run() { + theWrite.run(); + + theIOStats.incAsyncOutCount(); + } + } + + /** + @return the id of the StatGenerator that produced the stat + AdministrableStat.UNSET_ID if the id has never been set + */ + public long getId() { + return theStatId; + } + + public void setId(long anId) { + theStatId = anId; + } + + public Stat generate() { + return new IOStat(theStatId, theIOStats.getTimePerIn(), + theIOStats.getTimePerOut(), theIOStats.getInOutRatio(), + theIOStats.getQueueSize(), theThrottleCount); + } +}