diff src/org/dancres/blitz/txn/batch/OptimisticBatcher.java @ 32:a77f0a9ed93c 2.1.1

Add support for optimistic log batching.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 12 Jun 2010 10:42:31 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/txn/batch/OptimisticBatcher.java	Sat Jun 12 10:42:31 2010 +0100
@@ -0,0 +1,157 @@
+package org.dancres.blitz.txn.batch;
+
+import org.prevayler.Command;
+import org.prevayler.PrevalentSystem;
+import org.prevayler.implementation.PrevaylerCore;
+import org.prevayler.implementation.SnapshotPrevayler;
+import org.prevayler.implementation.Snapshotter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class OptimisticBatcher implements SnapshotPrevayler {
+    private PrevaylerCore _prevayler;
+    private boolean _writing = false;
+    private ArrayList<WriteRequest> _writes = new ArrayList<WriteRequest>();
+
+    // private AtomicInteger _inCount = new AtomicInteger();
+    // private AtomicInteger _outCount = new AtomicInteger();
+
+    public OptimisticBatcher(PrevaylerCore aPrevayler) {
+        _prevayler = aPrevayler;
+    }
+
+    public PrevalentSystem system() {
+        return _prevayler.system();
+    }
+
+    public Serializable executeCommand(Command aCommand) throws Exception {
+        return write(aCommand, true);
+    }
+
+    public Serializable executeCommand(Command aCommand, boolean sync)
+        throws Exception {
+        return write(aCommand, sync);
+    }
+
+    private Serializable write(Command aComm, boolean sync) throws Exception {
+        boolean someoneWriting = false;
+        WriteRequest myReq = null;
+
+        synchronized(this) {
+            someoneWriting = _writing;
+
+            // If someone is already writing, we add to their queue of work
+            //
+            if (_writing) {
+                myReq = new WriteRequest(aComm);
+                _writes.add(myReq);
+            } else {
+                _writing = true;
+            }
+        }
+
+        // If we are waiting on someone's queue
+        //
+        if (someoneWriting) {
+            // If we want to wait until the log is flushed
+            //
+            if (sync)
+                myReq.await();
+
+            return aComm.execute(_prevayler.system());
+        } else {
+            // We are handling the write queue, write our stuff now
+            //
+            _prevayler.logCommand(aComm, false);
+
+            ArrayList<WriteRequest> myAllWrites = new ArrayList<WriteRequest>();
+
+            // While there other writes scoop them up and write them
+            //
+            ArrayList<WriteRequest> myBuffer = new ArrayList<WriteRequest>();
+
+            while (haveWrites()) {
+                synchronized(this) {
+                    myBuffer.clear();
+                    myBuffer.addAll(_writes);
+                    _writes.clear();
+                }
+
+                Iterator<WriteRequest> myWrites = myBuffer.iterator();
+                while (myWrites.hasNext())
+                    _prevayler.logCommand(myWrites.next().getCommand(), false);
+
+                myAllWrites.addAll(myBuffer);
+            }
+
+            // Now dispatch execution of all logged commands - execute our own first
+            //
+            try {
+                return aComm.execute(_prevayler.system());
+            } finally {
+                Iterator<WriteRequest> myTargets = myAllWrites.iterator();
+                while (myTargets.hasNext()) {
+                    // _outCount.incrementAndGet();
+                    myTargets.next().dispatch();
+                }
+
+                // System.out.println("Logger: incount = " + _inCount + " outcount = " + _outCount);
+            }
+        }
+    }
+
+    private boolean haveWrites() throws Exception {
+        synchronized(this) {
+            if (_writes.size() > 0)
+                return true;
+            else {
+                _writing = false;
+                _prevayler.flush();
+                
+                return false;
+            }
+        }
+    }
+    
+    public Snapshotter takeSnapshot() throws IOException {
+        return _prevayler.takeSnapshot();
+    }
+
+    private class WriteRequest {
+        private Command _comm;
+        private Object _lock = new Object();
+        private boolean _exit = false;
+
+        WriteRequest(Command aComm) {
+            // _inCount.incrementAndGet();
+            _comm = aComm;
+        }
+
+        Command getCommand() {
+            return _comm;
+        }
+
+        void dispatch() {
+            synchronized(_lock) {
+                _exit = true;
+                _lock.notify();
+            }
+        }
+
+        void await() {
+            synchronized(_lock) {
+                while (! _exit) {
+                    try {
+                        _lock.wait();
+                    } catch (InterruptedException anIE) {
+
+                    }
+                }
+            }
+        }
+    }
+}