diff src/org/dancres/blitz/txn/batch/ConcurrentWriteBatcher.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children 4580bb12db30
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/txn/batch/ConcurrentWriteBatcher.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,165 @@
+package org.dancres.blitz.txn.batch;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.ArrayList;
+
+import java.util.logging.Level;
+
+import org.prevayler.Prevayler;
+import org.prevayler.PrevalentSystem;
+import org.prevayler.Command;
+
+import org.prevayler.implementation.SnapshotPrevayler;
+import org.prevayler.implementation.Snapshotter;
+
+/**
+   <p>Batches commands issued in the same time period together in an attempt
+   to improve log throughput via Prevayler.</p>
+
+   <p>Batch-writing can reduce the number of forced flushes to disk whilst
+   increasing the amount of data written with each flush.  This has a
+   positive effect on throughput under concurrent load.</p>
+   
+   <p>Time is specified in ms, first thread into barrier waits this amount
+   of time for other writers.  Other writers entering the barrier are
+   now blocked until the first entrant commits all writes to log.  When first
+   thread awakes, all writes are done as a group followed by a single sync.</p>
+
+   <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency
+   allowing for queuing of the next batch of log entries whilst the current
+   bunch is being written.</p>
+ */
+public class ConcurrentWriteBatcher implements SnapshotPrevayler {
+    private SnapshotPrevayler thePrevayler;
+
+    private boolean amFirst = true;
+
+    private ArrayList theWrites = new ArrayList();
+
+    // Might be able to buff up to 60ms which gives average of 30 but
+    // we'll see.
+    private long theWindowTime = 20;
+
+    public ConcurrentWriteBatcher(SnapshotPrevayler aPrevayler,
+        long aWindowTime) {
+        
+        thePrevayler = aPrevayler;
+        theWindowTime = aWindowTime;
+    }
+
+    /**
+     * Returns the underlying PrevalentSystem.
+     */
+    public PrevalentSystem system() {
+        return thePrevayler.system();
+    }
+
+    public Serializable executeCommand(Command aCommand) throws Exception {
+        return write(aCommand);
+    }
+
+    public Serializable executeCommand(Command aCommand, boolean sync)
+        throws Exception {
+        return write(aCommand);
+    }
+
+    private Serializable write(Command aComm) throws Exception {
+        WriteRequest myReq = new WriteRequest(aComm);
+
+        boolean wasFirst = false;
+
+        WriteRequest[] myRequests  = null;
+
+        synchronized(this) {
+            if (amFirst) {
+                theWrites.add(myReq);
+                amFirst = false;
+                wasFirst = true;
+
+                try {
+                    wait(theWindowTime);
+                } catch (InterruptedException anIE) {
+                }
+
+                myRequests = new WriteRequest[theWrites.size()];
+                myRequests = (WriteRequest[]) theWrites.toArray(myRequests);
+
+                theWrites.clear();
+                amFirst = true;
+
+            } else {
+                theWrites.add(myReq);
+            }
+        }
+
+        if (wasFirst) {
+            flushAll(myRequests);
+        }
+
+        return myReq.getResult();
+    }
+
+	public Snapshotter takeSnapshot() throws IOException {
+        return thePrevayler.takeSnapshot();
+    }
+
+    private void flushAll(WriteRequest[] aRequests) {
+        int myLast = aRequests.length - 1;
+
+        if (WriteBatcher.theLogger.isLoggable(Level.FINE))
+            WriteBatcher.theLogger.log(Level.FINE,
+                                       "Flushing " +
+                                       aRequests.length + " to log");
+
+        // Do all writes, last with sync
+        for (int i = 0; i < aRequests.length; i++) {
+            WriteRequest myReq = aRequests[i];
+
+            myReq.execute(thePrevayler, (i == myLast));                
+        }
+    }
+
+    private static class WriteRequest {
+        private Command theCommand;
+
+        private Exception theException;
+        private Serializable theResult;
+
+        private boolean isDone;
+
+        WriteRequest(Command aCommand) {
+            theCommand = aCommand;
+        }
+
+        void execute(Prevayler aPrev, boolean doSync) {
+            try {
+                theResult = aPrev.executeCommand(theCommand, doSync);
+            } catch (Exception anE) {
+                theException = anE;
+            } finally {
+                synchronized(this) {
+                    isDone = true;
+                    notify();
+                }
+            }
+        }
+
+        Serializable getResult() throws Exception{
+            synchronized(this) {
+                while (!isDone) {
+                    try {
+                        wait();
+                    } catch (InterruptedException anIE) {
+                    }
+                }
+
+                if (theException != null)
+                    throw theException;
+                else
+                    return theResult;
+            }
+        }
+    }
+}