comparison src/org/dancres/blitz/txn/batch/WriteBatcher.java @ 17:4580bb12db30

Separate command execution from command logging.
author Dan Creswell <dan.creswell@gmail.com>
date Sun, 05 Jul 2009 16:26:25 +0100
parents 3dc0c5604566
children
comparison
equal deleted inserted replaced
16:46ac1a45718a 17:4580bb12db30
14 14
15 import org.prevayler.implementation.SnapshotPrevayler; 15 import org.prevayler.implementation.SnapshotPrevayler;
16 import org.prevayler.implementation.Snapshotter; 16 import org.prevayler.implementation.Snapshotter;
17 17
18 import org.dancres.blitz.Logging; 18 import org.dancres.blitz.Logging;
19 import org.prevayler.implementation.PrevaylerCore;
19 20
20 /** 21 /**
21 <p>Batches commands issued in the same time period together in an attempt 22 <p>Batches commands issued in the same time period together in an attempt
22 to improve log throughput via Prevayler.</p> 23 to improve log throughput via Prevayler.</p>
23 24
32 */ 33 */
33 public class WriteBatcher implements SnapshotPrevayler { 34 public class WriteBatcher implements SnapshotPrevayler {
34 static Logger theLogger = 35 static Logger theLogger =
35 Logging.newLogger("org.dancres.blitz.txn.LogBatcher"); 36 Logging.newLogger("org.dancres.blitz.txn.LogBatcher");
36 37
37 private SnapshotPrevayler thePrevayler; 38 private PrevaylerCore thePrevayler;
38 39
39 private boolean amFirst = true; 40 private boolean amFirst = true;
40 41
41 private ArrayList theWrites = new ArrayList(); 42 private ArrayList theWrites = new ArrayList();
42 43
43 // Might be able to buff up to 60ms which gives average of 30 but 44 // Might be able to buff up to 60ms which gives average of 30 but
44 // we'll see. 45 // we'll see.
45 private long theWindowTime = 20; 46 private long theWindowTimeMs = 0;
47 private int theWindowTimeNs = 0;
46 48
47 public WriteBatcher(SnapshotPrevayler aPrevayler, long aWindowTime) { 49 public WriteBatcher(PrevaylerCore aPrevayler, long aWindowTimeMs, int aWindowTimeNs) {
48 thePrevayler = aPrevayler; 50 thePrevayler = aPrevayler;
49 theWindowTime = aWindowTime; 51 theWindowTimeMs = aWindowTimeMs;
52 theWindowTimeNs = aWindowTimeNs;
50 } 53 }
51 54
52 /** 55 /**
53 * Returns the underlying PrevalentSystem. 56 * Returns the underlying PrevalentSystem.
54 */ 57 */
64 throws Exception { 67 throws Exception {
65 return write(aCommand); 68 return write(aCommand);
66 } 69 }
67 70
68 private Serializable write(Command aComm) throws Exception { 71 private Serializable write(Command aComm) throws Exception {
69 WriteRequest myReq = new WriteRequest(aComm); 72 WriteRequest myReq = new WriteRequest(system(), aComm);
70 73
71 synchronized(this) { 74 synchronized(this) {
72 if (amFirst) { 75 if (amFirst) {
73 theWrites.add(myReq); 76 theWrites.add(myReq);
74 amFirst = false; 77 amFirst = false;
75 78
76 try { 79 try {
77 wait(theWindowTime); 80 wait(theWindowTimeMs, theWindowTimeNs);
78 } catch (InterruptedException anIE) { 81 } catch (InterruptedException anIE) {
79 } 82 }
80 83
81 flushAll(); 84 flushAll();
82 amFirst = true; 85 amFirst = true;
111 theWrites.clear(); 114 theWrites.clear();
112 } 115 }
113 116
114 private static class WriteRequest { 117 private static class WriteRequest {
115 private Command theCommand; 118 private Command theCommand;
119 private PrevalentSystem theSystem;
116 120
117 private Exception theException; 121 private Exception theException;
118 private Serializable theResult;
119 122
120 private boolean isDone; 123 private boolean isDone;
121 124
122 WriteRequest(Command aCommand) { 125 WriteRequest(PrevalentSystem aSystem, Command aCommand) {
123 theCommand = aCommand; 126 theCommand = aCommand;
127 theSystem = aSystem;
124 } 128 }
125 129
126 void execute(Prevayler aPrev, boolean doSync) { 130 void execute(PrevaylerCore aPrev, boolean doSync) {
127 try { 131 try {
128 theResult = aPrev.executeCommand(theCommand, doSync); 132 aPrev.logCommand(theCommand, doSync);
129 } catch (Exception anE) { 133 } catch (Exception anE) {
130 theException = anE; 134 theException = anE;
131 } finally { 135 } finally {
132 synchronized(this) { 136 synchronized(this) {
133 isDone = true; 137 isDone = true;
146 } 150 }
147 151
148 if (theException != null) 152 if (theException != null)
149 throw theException; 153 throw theException;
150 else 154 else
151 return theResult; 155 return theCommand.execute(theSystem);
152 } 156 }
153 } 157 }
154 } 158 }
155 } 159 }