Mercurial > hg > blitz_stable
comparison src/org/dancres/blitz/txn/batch/ConcurrentWriteBatcher.java @ 18:4580bb12db30
Separate command execution from command logging.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sun, 05 Jul 2009 16:26:25 +0100 |
parents | 3dc0c5604566 |
children |
comparison
equal
deleted
inserted
replaced
17:46ac1a45718a | 18:4580bb12db30 |
---|---|
5 | 5 |
6 import java.util.ArrayList; | 6 import java.util.ArrayList; |
7 | 7 |
8 import java.util.logging.Level; | 8 import java.util.logging.Level; |
9 | 9 |
10 import org.prevayler.Prevayler; | |
11 import org.prevayler.PrevalentSystem; | 10 import org.prevayler.PrevalentSystem; |
12 import org.prevayler.Command; | 11 import org.prevayler.Command; |
13 | 12 |
14 import org.prevayler.implementation.SnapshotPrevayler; | 13 import org.prevayler.implementation.SnapshotPrevayler; |
14 import org.prevayler.implementation.PrevaylerCore; | |
15 import org.prevayler.implementation.Snapshotter; | 15 import org.prevayler.implementation.Snapshotter; |
16 | 16 |
17 /** | 17 /** |
18 <p>Batches commands issued in the same time period together in an attempt | 18 <p>Batches commands issued in the same time period together in an attempt |
19 to improve log throughput via Prevayler.</p> | 19 to improve log throughput via Prevayler.</p> |
30 <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency | 30 <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency |
31 allowing for queuing of the next batch of log entries whilst the current | 31 allowing for queuing of the next batch of log entries whilst the current |
32 bunch is being written.</p> | 32 bunch is being written.</p> |
33 */ | 33 */ |
34 public class ConcurrentWriteBatcher implements SnapshotPrevayler { | 34 public class ConcurrentWriteBatcher implements SnapshotPrevayler { |
35 private SnapshotPrevayler thePrevayler; | 35 private PrevaylerCore thePrevayler; |
36 | 36 |
37 private boolean amFirst = true; | 37 private boolean amFirst = true; |
38 | 38 |
39 private ArrayList theWrites = new ArrayList(); | 39 private ArrayList theWrites = new ArrayList(); |
40 | 40 |
41 // Might be able to buff up to 60ms which gives average of 30 but | 41 // Might be able to buff up to 60ms which gives average of 30 but |
42 // we'll see. | 42 // we'll see. |
43 private long theWindowTime = 20; | 43 private long theWindowTimeMs = 20; |
44 private int theWindowTimeNs = 0; | |
44 | 45 |
45 public ConcurrentWriteBatcher(SnapshotPrevayler aPrevayler, | 46 |
46 long aWindowTime) { | 47 public ConcurrentWriteBatcher(PrevaylerCore aPrevayler, |
48 long aWindowTimeMs, int aWindowTimeNs) { | |
47 | 49 |
48 thePrevayler = aPrevayler; | 50 thePrevayler = aPrevayler; |
49 theWindowTime = aWindowTime; | 51 theWindowTimeMs = aWindowTimeMs; |
52 theWindowTimeNs = aWindowTimeNs; | |
50 } | 53 } |
51 | 54 |
52 /** | 55 /** |
53 * Returns the underlying PrevalentSystem. | 56 * Returns the underlying PrevalentSystem. |
54 */ | 57 */ |
64 throws Exception { | 67 throws Exception { |
65 return write(aCommand); | 68 return write(aCommand); |
66 } | 69 } |
67 | 70 |
68 private Serializable write(Command aComm) throws Exception { | 71 private Serializable write(Command aComm) throws Exception { |
69 WriteRequest myReq = new WriteRequest(aComm); | 72 WriteRequest myReq = new WriteRequest(aComm, system()); |
70 | 73 |
71 boolean wasFirst = false; | 74 boolean wasFirst = false; |
72 | 75 |
73 WriteRequest[] myRequests = null; | 76 WriteRequest[] myRequests = null; |
74 | 77 |
77 theWrites.add(myReq); | 80 theWrites.add(myReq); |
78 amFirst = false; | 81 amFirst = false; |
79 wasFirst = true; | 82 wasFirst = true; |
80 | 83 |
81 try { | 84 try { |
82 wait(theWindowTime); | 85 wait(theWindowTimeMs, theWindowTimeNs); |
83 } catch (InterruptedException anIE) { | 86 } catch (InterruptedException anIE) { |
84 } | 87 } |
85 | 88 |
86 myRequests = new WriteRequest[theWrites.size()]; | 89 myRequests = new WriteRequest[theWrites.size()]; |
87 myRequests = (WriteRequest[]) theWrites.toArray(myRequests); | 90 myRequests = (WriteRequest[]) theWrites.toArray(myRequests); |
99 } | 102 } |
100 | 103 |
101 return myReq.getResult(); | 104 return myReq.getResult(); |
102 } | 105 } |
103 | 106 |
104 public Snapshotter takeSnapshot() throws IOException { | 107 public Snapshotter takeSnapshot() throws IOException { |
105 return thePrevayler.takeSnapshot(); | 108 return thePrevayler.takeSnapshot(); |
106 } | 109 } |
107 | 110 |
108 private void flushAll(WriteRequest[] aRequests) { | 111 private void flushAll(WriteRequest[] aRequests) { |
109 int myLast = aRequests.length - 1; | 112 int myLast = aRequests.length - 1; |
120 myReq.execute(thePrevayler, (i == myLast)); | 123 myReq.execute(thePrevayler, (i == myLast)); |
121 } | 124 } |
122 } | 125 } |
123 | 126 |
124 private static class WriteRequest { | 127 private static class WriteRequest { |
128 private PrevalentSystem theSystem; | |
125 private Command theCommand; | 129 private Command theCommand; |
126 | 130 |
127 private Exception theException; | 131 private Exception theException; |
128 private Serializable theResult; | |
129 | 132 |
130 private boolean isDone; | 133 private boolean isDone; |
131 | 134 |
132 WriteRequest(Command aCommand) { | 135 WriteRequest(Command aCommand, PrevalentSystem aSystem) { |
133 theCommand = aCommand; | 136 theCommand = aCommand; |
137 theSystem = aSystem; | |
134 } | 138 } |
135 | 139 |
136 void execute(Prevayler aPrev, boolean doSync) { | 140 void execute(PrevaylerCore aPrev, boolean doSync) { |
137 try { | 141 try { |
138 theResult = aPrev.executeCommand(theCommand, doSync); | 142 aPrev.logCommand(theCommand, doSync); |
139 } catch (Exception anE) { | 143 } catch (Exception anE) { |
140 theException = anE; | 144 theException = anE; |
141 } finally { | 145 } finally { |
142 synchronized(this) { | 146 synchronized(this) { |
143 isDone = true; | 147 isDone = true; |
152 try { | 156 try { |
153 wait(); | 157 wait(); |
154 } catch (InterruptedException anIE) { | 158 } catch (InterruptedException anIE) { |
155 } | 159 } |
156 } | 160 } |
161 } | |
157 | 162 |
158 if (theException != null) | 163 if (theException != null) |
159 throw theException; | 164 throw theException; |
160 else | 165 else { |
161 return theResult; | 166 Serializable myResult = theCommand.execute(theSystem); |
167 return myResult; | |
162 } | 168 } |
163 } | 169 } |
164 } | 170 } |
165 } | 171 } |