Mercurial > hg > blitz_condensed
view src/org/dancres/blitz/disk/WriteDaemon.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line source
package org.dancres.blitz.disk; import java.util.logging.*; import net.jini.config.ConfigurationException; import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; import org.dancres.blitz.Logging; import org.dancres.blitz.ActiveObject; import org.dancres.blitz.ActiveObjectRegistry; import org.dancres.blitz.config.ConfigurationFactory; import org.dancres.blitz.task.Tasks; import org.dancres.blitz.task.Task; import org.dancres.blitz.stats.StatGenerator; import org.dancres.blitz.stats.Stat; import org.dancres.blitz.stats.StatsBoard; import org.dancres.blitz.stats.IOStat; /** <p> Certain elements of Blitz (such as ArcCache) require all writes to be asynchronous to avoid deadlock situations and reduce time spent waiting for I/O completion by a client thread. WriteDaemon provides the asynchronous I/O infrastructure to satisfy such requirements. </p> <p> We stop when Disk is stop'd which allows for proper sync'ing </p> <p>We can only have one thread for writing at this moment because: <ol> <li>WriteBuffer assumes serialized updating of images. If two threads start performing jobs on the same UID, then there's currently scope for a collision. The problem is compounded by the fact that we can't simply requeue the job because that will break checkpointing/sync'ing</li> <li>Sync'ing assumes that there's only one thread which will execute the appropriate callback placed in the queue once all prior Jobs have been dispatched. This ordering dependency is what complicates the WriteBuffer issue above.</li> <li>There's an ordering issue - we must write before we decide to then delete something that is on disk.</li> </ol> </p> @see org.dancres.blitz.arc.ArcCache */ public class WriteDaemon implements StatGenerator { static Logger theLogger = Logging.newLogger("org.dancres.blitz.disk.WriteDaemon"); private static int MAX_WRITE_THREADS; private static int THREAD_KEEPALIVE; private static int DESIRED_PENDING_WRITES; private static int THROTTLE_PENDING_WRITES; private static long THROTTLE_PAUSE; static { try { MAX_WRITE_THREADS = ((Integer) ConfigurationFactory.getEntry("maxWriteThreads", int.class, new Integer(5))).intValue(); THREAD_KEEPALIVE = ((Integer) ConfigurationFactory.getEntry("threadKeepalive", int.class, new Integer(60000))).intValue(); DESIRED_PENDING_WRITES = ((Integer) ConfigurationFactory.getEntry("desiredPendingWrites", int.class, new Integer(20))).intValue(); THROTTLE_PENDING_WRITES = ((Integer) ConfigurationFactory.getEntry("throttlePendingWrites", int.class, new Integer(Integer.MAX_VALUE))).intValue(); THROTTLE_PAUSE = ((Long) ConfigurationFactory.getEntry("throttlePause", long.class, new Long(50))).longValue(); } catch (ConfigurationException aCE) { } } private static WriteDaemon theDaemon = new WriteDaemon(); private int thePendingCount; private LinkedQueue thePendingUpdates = new LinkedQueue(); private LinkedQueue theAsyncUpdates; private PooledExecutor theWriters; private LinkedQueue theCompletions; private PooledExecutor theCompleters; private IOStats theIOStats = new IOStats(); private long theStatId = StatGenerator.UNSET_ID; private int theThrottleCount = 0; private WriteDaemon() { theAsyncUpdates = new LinkedQueue(); theCompletions = new LinkedQueue(); theWriters = new PooledExecutor(theAsyncUpdates, MAX_WRITE_THREADS); theCompleters = new PooledExecutor(theCompletions, 1); theLogger.log(Level.INFO, "Async keepalive: " + THREAD_KEEPALIVE); theLogger.log(Level.INFO, "Pending write size: " + DESIRED_PENDING_WRITES); theLogger.log(Level.INFO, "Throttle write size: " + THROTTLE_PENDING_WRITES); theLogger.log(Level.INFO, "Throttle pause: " + THROTTLE_PAUSE); theWriters.setKeepAliveTime(THREAD_KEEPALIVE); theCompleters.setKeepAliveTime(THREAD_KEEPALIVE); StatsBoard.get().add(this); } public static WriteDaemon get() { return theDaemon; } /** Queue a task for execution by the WriteDaemon thread. */ public void queue(Runnable anUpdate) { // If the write queue is getting too large, start stalling if (theIOStats.getQueueSize() > THROTTLE_PENDING_WRITES) { ++theThrottleCount; theLogger.log(Level.WARNING, "Write queue overflowing - THROTTLING"); try { Thread.sleep(THROTTLE_PAUSE); } catch (InterruptedException anIE) { theLogger.log(Level.SEVERE, "Throttle broken!"); } } synchronized(this) { try { thePendingUpdates.put(new OutputTracker(anUpdate)); ++thePendingCount; if (thePendingCount >= DESIRED_PENDING_WRITES) { pushImpl(); } } catch (InterruptedException anIE) { theLogger.log(Level.SEVERE, "Failed to queue update", anIE); } } theIOStats.incAsyncInCount(); } /** Should only be called from within a sync block. */ private void pushImpl() { Object myTask; try { while ((myTask = thePendingUpdates.poll(0)) != null) { theWriters.execute((Runnable) myTask); } } catch (InterruptedException anIE) { } thePendingCount = 0; } /** Force the queue to be processed */ void push() { synchronized(this) { pushImpl(); } } /** <p>Force the updates in the queue to disk. On completion, invoke the passed task. Note this task is processed asynchronously outside of the WriteDaemon thread. This allows the WriteDaemon to continue processing updates whilst the completion task runs.</p> <p>If you want a task to be performed synchronously by the WriteDaemon after other requests, <code>queue</code> the task and invoke <code>push</code>.</p> */ void push(Task aCompletionTask) { if (aCompletionTask == null) throw new IllegalArgumentException(); else { synchronized(this) { /* We ensure the passed task doesn't execute until the queue has been emptied by putting a barrier task at the end of the queue. Thus, when we force the queue, this barrier will only execute once all preceeding updates have been performed. */ queue(new Scheduler(aCompletionTask)); pushImpl(); } } } void halt() { theLogger.log(Level.INFO, "WriteDaemon doing halt"); theWriters.shutdownAfterProcessingCurrentlyQueuedTasks(); theCompleters.shutdownAfterProcessingCurrentlyQueuedTasks(); theLogger.log(Level.INFO, "WriteDaemon done halt"); } /** We do not run the completion task in-line. This is potentially time consuming and we have more important things to do (write updates) so we palm the completion task off to a separate thread pool when we've done the necessary work. */ private class Scheduler implements Runnable { private Task theTask; Scheduler(Task aCompletionTask) { theTask = aCompletionTask; } public void run() { try { theCompleters.execute(theTask); } catch (InterruptedException anIE) { theLogger.log(Level.SEVERE, "Failed to scheduler sync completion task", anIE); } } } private class OutputTracker implements Runnable { private Runnable theWrite; OutputTracker(Runnable aRunnable) { theWrite = aRunnable; } public void run() { theWrite.run(); theIOStats.incAsyncOutCount(); } } /** @return the id of the StatGenerator that produced the stat AdministrableStat.UNSET_ID if the id has never been set */ public long getId() { return theStatId; } public void setId(long anId) { theStatId = anId; } public Stat generate() { return new IOStat(theStatId, theIOStats.getTimePerIn(), theIOStats.getTimePerOut(), theIOStats.getInOutRatio(), theIOStats.getQueueSize(), theThrottleCount); } }