Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/txn/batch/WriteBatcher.java @ 17:4580bb12db30
Separate command execution from command logging.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sun, 05 Jul 2009 16:26:25 +0100 |
parents | 3dc0c5604566 |
children |
comparison
legend: equal, deleted, inserted, replaced
16:46ac1a45718a | 17:4580bb12db30 |
---|---|
14 | 14 |
15 import org.prevayler.implementation.SnapshotPrevayler; | 15 import org.prevayler.implementation.SnapshotPrevayler; |
16 import org.prevayler.implementation.Snapshotter; | 16 import org.prevayler.implementation.Snapshotter; |
17 | 17 |
18 import org.dancres.blitz.Logging; | 18 import org.dancres.blitz.Logging; |
19 import org.prevayler.implementation.PrevaylerCore; | |
19 | 20 |
20 /** | 21 /** |
21 <p>Batches commands issued in the same time period together in an attempt | 22 <p>Batches commands issued in the same time period together in an attempt |
22 to improve log throughput via Prevayler.</p> | 23 to improve log throughput via Prevayler.</p> |
23 | 24 |
32 */ | 33 */ |
33 public class WriteBatcher implements SnapshotPrevayler { | 34 public class WriteBatcher implements SnapshotPrevayler { |
34 static Logger theLogger = | 35 static Logger theLogger = |
35 Logging.newLogger("org.dancres.blitz.txn.LogBatcher"); | 36 Logging.newLogger("org.dancres.blitz.txn.LogBatcher"); |
36 | 37 |
37 private SnapshotPrevayler thePrevayler; | 38 private PrevaylerCore thePrevayler; |
38 | 39 |
39 private boolean amFirst = true; | 40 private boolean amFirst = true; |
40 | 41 |
41 private ArrayList theWrites = new ArrayList(); | 42 private ArrayList theWrites = new ArrayList(); |
42 | 43 |
43 // Might be able to buff up to 60ms which gives average of 30 but | 44 // Might be able to buff up to 60ms which gives average of 30 but |
44 // we'll see. | 45 // we'll see. |
45 private long theWindowTime = 20; | 46 private long theWindowTimeMs = 0; |
47 private int theWindowTimeNs = 0; | |
46 | 48 |
47 public WriteBatcher(SnapshotPrevayler aPrevayler, long aWindowTime) { | 49 public WriteBatcher(PrevaylerCore aPrevayler, long aWindowTimeMs, int aWindowTimeNs) { |
48 thePrevayler = aPrevayler; | 50 thePrevayler = aPrevayler; |
49 theWindowTime = aWindowTime; | 51 theWindowTimeMs = aWindowTimeMs; |
52 theWindowTimeNs = aWindowTimeNs; | |
50 } | 53 } |
51 | 54 |
52 /** | 55 /** |
53 * Returns the underlying PrevalentSystem. | 56 * Returns the underlying PrevalentSystem. |
54 */ | 57 */ |
64 throws Exception { | 67 throws Exception { |
65 return write(aCommand); | 68 return write(aCommand); |
66 } | 69 } |
67 | 70 |
68 private Serializable write(Command aComm) throws Exception { | 71 private Serializable write(Command aComm) throws Exception { |
69 WriteRequest myReq = new WriteRequest(aComm); | 72 WriteRequest myReq = new WriteRequest(system(), aComm); |
70 | 73 |
71 synchronized(this) { | 74 synchronized(this) { |
72 if (amFirst) { | 75 if (amFirst) { |
73 theWrites.add(myReq); | 76 theWrites.add(myReq); |
74 amFirst = false; | 77 amFirst = false; |
75 | 78 |
76 try { | 79 try { |
77 wait(theWindowTime); | 80 wait(theWindowTimeMs, theWindowTimeNs); |
78 } catch (InterruptedException anIE) { | 81 } catch (InterruptedException anIE) { |
79 } | 82 } |
80 | 83 |
81 flushAll(); | 84 flushAll(); |
82 amFirst = true; | 85 amFirst = true; |
111 theWrites.clear(); | 114 theWrites.clear(); |
112 } | 115 } |
113 | 116 |
114 private static class WriteRequest { | 117 private static class WriteRequest { |
115 private Command theCommand; | 118 private Command theCommand; |
119 private PrevalentSystem theSystem; | |
116 | 120 |
117 private Exception theException; | 121 private Exception theException; |
118 private Serializable theResult; | |
119 | 122 |
120 private boolean isDone; | 123 private boolean isDone; |
121 | 124 |
122 WriteRequest(Command aCommand) { | 125 WriteRequest(PrevalentSystem aSystem, Command aCommand) { |
123 theCommand = aCommand; | 126 theCommand = aCommand; |
127 theSystem = aSystem; | |
124 } | 128 } |
125 | 129 |
126 void execute(Prevayler aPrev, boolean doSync) { | 130 void execute(PrevaylerCore aPrev, boolean doSync) { |
127 try { | 131 try { |
128 theResult = aPrev.executeCommand(theCommand, doSync); | 132 aPrev.logCommand(theCommand, doSync); |
129 } catch (Exception anE) { | 133 } catch (Exception anE) { |
130 theException = anE; | 134 theException = anE; |
131 } finally { | 135 } finally { |
132 synchronized(this) { | 136 synchronized(this) { |
133 isDone = true; | 137 isDone = true; |
146 } | 150 } |
147 | 151 |
148 if (theException != null) | 152 if (theException != null) |
149 throw theException; | 153 throw theException; |
150 else | 154 else |
151 return theResult; | 155 return theCommand.execute(theSystem); |
152 } | 156 } |
153 } | 157 } |
154 } | 158 } |
155 } | 159 } |