comparison src/org/dancres/blitz/txn/batch/ConcurrentWriteBatcher.java @ 17:4580bb12db30

Separate command execution from command logging.
author Dan Creswell <dan.creswell@gmail.com>
date Sun, 05 Jul 2009 16:26:25 +0100
parents 3dc0c5604566
children
comparison
equal deleted inserted replaced
16:46ac1a45718a 17:4580bb12db30
5 5
6 import java.util.ArrayList; 6 import java.util.ArrayList;
7 7
8 import java.util.logging.Level; 8 import java.util.logging.Level;
9 9
10 import org.prevayler.Prevayler;
11 import org.prevayler.PrevalentSystem; 10 import org.prevayler.PrevalentSystem;
12 import org.prevayler.Command; 11 import org.prevayler.Command;
13 12
14 import org.prevayler.implementation.SnapshotPrevayler; 13 import org.prevayler.implementation.SnapshotPrevayler;
14 import org.prevayler.implementation.PrevaylerCore;
15 import org.prevayler.implementation.Snapshotter; 15 import org.prevayler.implementation.Snapshotter;
16 16
17 /** 17 /**
18 <p>Batches commands issued in the same time period together in an attempt 18 <p>Batches commands issued in the same time period together in an attempt
19 to improve log throughput via Prevayler.</p> 19 to improve log throughput via Prevayler.</p>
30 <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency 30 <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency
31 allowing for queuing of the next batch of log entries whilst the current 31 allowing for queuing of the next batch of log entries whilst the current
32 bunch is being written.</p> 32 bunch is being written.</p>
33 */ 33 */
34 public class ConcurrentWriteBatcher implements SnapshotPrevayler { 34 public class ConcurrentWriteBatcher implements SnapshotPrevayler {
35 private SnapshotPrevayler thePrevayler; 35 private PrevaylerCore thePrevayler;
36 36
37 private boolean amFirst = true; 37 private boolean amFirst = true;
38 38
39 private ArrayList theWrites = new ArrayList(); 39 private ArrayList theWrites = new ArrayList();
40 40
41 // Might be able to buff up to 60ms which gives average of 30 but 41 // Might be able to buff up to 60ms which gives average of 30 but
42 // we'll see. 42 // we'll see.
43 private long theWindowTime = 20; 43 private long theWindowTimeMs = 20;
44 private int theWindowTimeNs = 0;
44 45
45 public ConcurrentWriteBatcher(SnapshotPrevayler aPrevayler, 46
46 long aWindowTime) { 47 public ConcurrentWriteBatcher(PrevaylerCore aPrevayler,
48 long aWindowTimeMs, int aWindowTimeNs) {
47 49
48 thePrevayler = aPrevayler; 50 thePrevayler = aPrevayler;
49 theWindowTime = aWindowTime; 51 theWindowTimeMs = aWindowTimeMs;
52 theWindowTimeNs = aWindowTimeNs;
50 } 53 }
51 54
52 /** 55 /**
53 * Returns the underlying PrevalentSystem. 56 * Returns the underlying PrevalentSystem.
54 */ 57 */
64 throws Exception { 67 throws Exception {
65 return write(aCommand); 68 return write(aCommand);
66 } 69 }
67 70
68 private Serializable write(Command aComm) throws Exception { 71 private Serializable write(Command aComm) throws Exception {
69 WriteRequest myReq = new WriteRequest(aComm); 72 WriteRequest myReq = new WriteRequest(aComm, system());
70 73
71 boolean wasFirst = false; 74 boolean wasFirst = false;
72 75
73 WriteRequest[] myRequests = null; 76 WriteRequest[] myRequests = null;
74 77
77 theWrites.add(myReq); 80 theWrites.add(myReq);
78 amFirst = false; 81 amFirst = false;
79 wasFirst = true; 82 wasFirst = true;
80 83
81 try { 84 try {
82 wait(theWindowTime); 85 wait(theWindowTimeMs, theWindowTimeNs);
83 } catch (InterruptedException anIE) { 86 } catch (InterruptedException anIE) {
84 } 87 }
85 88
86 myRequests = new WriteRequest[theWrites.size()]; 89 myRequests = new WriteRequest[theWrites.size()];
87 myRequests = (WriteRequest[]) theWrites.toArray(myRequests); 90 myRequests = (WriteRequest[]) theWrites.toArray(myRequests);
99 } 102 }
100 103
101 return myReq.getResult(); 104 return myReq.getResult();
102 } 105 }
103 106
104 public Snapshotter takeSnapshot() throws IOException { 107 public Snapshotter takeSnapshot() throws IOException {
105 return thePrevayler.takeSnapshot(); 108 return thePrevayler.takeSnapshot();
106 } 109 }
107 110
108 private void flushAll(WriteRequest[] aRequests) { 111 private void flushAll(WriteRequest[] aRequests) {
109 int myLast = aRequests.length - 1; 112 int myLast = aRequests.length - 1;
120 myReq.execute(thePrevayler, (i == myLast)); 123 myReq.execute(thePrevayler, (i == myLast));
121 } 124 }
122 } 125 }
123 126
124 private static class WriteRequest { 127 private static class WriteRequest {
128 private PrevalentSystem theSystem;
125 private Command theCommand; 129 private Command theCommand;
126 130
127 private Exception theException; 131 private Exception theException;
128 private Serializable theResult;
129 132
130 private boolean isDone; 133 private boolean isDone;
131 134
132 WriteRequest(Command aCommand) { 135 WriteRequest(Command aCommand, PrevalentSystem aSystem) {
133 theCommand = aCommand; 136 theCommand = aCommand;
137 theSystem = aSystem;
134 } 138 }
135 139
136 void execute(Prevayler aPrev, boolean doSync) { 140 void execute(PrevaylerCore aPrev, boolean doSync) {
137 try { 141 try {
138 theResult = aPrev.executeCommand(theCommand, doSync); 142 aPrev.logCommand(theCommand, doSync);
139 } catch (Exception anE) { 143 } catch (Exception anE) {
140 theException = anE; 144 theException = anE;
141 } finally { 145 } finally {
142 synchronized(this) { 146 synchronized(this) {
143 isDone = true; 147 isDone = true;
152 try { 156 try {
153 wait(); 157 wait();
154 } catch (InterruptedException anIE) { 158 } catch (InterruptedException anIE) {
155 } 159 }
156 } 160 }
161 }
157 162
158 if (theException != null) 163 if (theException != null)
159 throw theException; 164 throw theException;
160 else 165 else {
161 return theResult; 166 Serializable myResult = theCommand.execute(theSystem);
167 return myResult;
162 } 168 }
163 } 169 }
164 } 170 }
165 } 171 }