Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/txn/batch/ConcurrentWriteBatcher.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children | 4580bb12db30 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.txn.batch; | |
2 | |
3 import java.io.IOException; | |
4 import java.io.Serializable; | |
5 | |
6 import java.util.ArrayList; | |
7 | |
8 import java.util.logging.Level; | |
9 | |
10 import org.prevayler.Prevayler; | |
11 import org.prevayler.PrevalentSystem; | |
12 import org.prevayler.Command; | |
13 | |
14 import org.prevayler.implementation.SnapshotPrevayler; | |
15 import org.prevayler.implementation.Snapshotter; | |
16 | |
17 /** | |
18 <p>Batches commands issued in the same time period together in an attempt | |
19 to improve log throughput via Prevayler.</p> | |
20 | |
21 <p>Batch-writing can reduce the number of forced flushes to disk whilst | |
22 increasing the amount of data written with each flush. This has a | |
23 positive effect on throughput under concurrent load.</p> | |
24 | |
25 <p>Time is specified in ms, first thread into barrier waits this amount | |
26 of time for other writers. Other writers entering the barrier are | |
27 now blocked until the first entrant commits all writes to log. When first | |
28 thread awakes, all writes are done as a group followed by a single sync.</p> | |
29 | |
30 <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency | |
31 allowing for queuing of the next batch of log entries whilst the current | |
32 bunch is being written.</p> | |
33 */ | |
34 public class ConcurrentWriteBatcher implements SnapshotPrevayler { | |
35 private SnapshotPrevayler thePrevayler; | |
36 | |
37 private boolean amFirst = true; | |
38 | |
39 private ArrayList theWrites = new ArrayList(); | |
40 | |
41 // Might be able to buff up to 60ms which gives average of 30 but | |
42 // we'll see. | |
43 private long theWindowTime = 20; | |
44 | |
45 public ConcurrentWriteBatcher(SnapshotPrevayler aPrevayler, | |
46 long aWindowTime) { | |
47 | |
48 thePrevayler = aPrevayler; | |
49 theWindowTime = aWindowTime; | |
50 } | |
51 | |
52 /** | |
53 * Returns the underlying PrevalentSystem. | |
54 */ | |
55 public PrevalentSystem system() { | |
56 return thePrevayler.system(); | |
57 } | |
58 | |
59 public Serializable executeCommand(Command aCommand) throws Exception { | |
60 return write(aCommand); | |
61 } | |
62 | |
63 public Serializable executeCommand(Command aCommand, boolean sync) | |
64 throws Exception { | |
65 return write(aCommand); | |
66 } | |
67 | |
68 private Serializable write(Command aComm) throws Exception { | |
69 WriteRequest myReq = new WriteRequest(aComm); | |
70 | |
71 boolean wasFirst = false; | |
72 | |
73 WriteRequest[] myRequests = null; | |
74 | |
75 synchronized(this) { | |
76 if (amFirst) { | |
77 theWrites.add(myReq); | |
78 amFirst = false; | |
79 wasFirst = true; | |
80 | |
81 try { | |
82 wait(theWindowTime); | |
83 } catch (InterruptedException anIE) { | |
84 } | |
85 | |
86 myRequests = new WriteRequest[theWrites.size()]; | |
87 myRequests = (WriteRequest[]) theWrites.toArray(myRequests); | |
88 | |
89 theWrites.clear(); | |
90 amFirst = true; | |
91 | |
92 } else { | |
93 theWrites.add(myReq); | |
94 } | |
95 } | |
96 | |
97 if (wasFirst) { | |
98 flushAll(myRequests); | |
99 } | |
100 | |
101 return myReq.getResult(); | |
102 } | |
103 | |
104 public Snapshotter takeSnapshot() throws IOException { | |
105 return thePrevayler.takeSnapshot(); | |
106 } | |
107 | |
108 private void flushAll(WriteRequest[] aRequests) { | |
109 int myLast = aRequests.length - 1; | |
110 | |
111 if (WriteBatcher.theLogger.isLoggable(Level.FINE)) | |
112 WriteBatcher.theLogger.log(Level.FINE, | |
113 "Flushing " + | |
114 aRequests.length + " to log"); | |
115 | |
116 // Do all writes, last with sync | |
117 for (int i = 0; i < aRequests.length; i++) { | |
118 WriteRequest myReq = aRequests[i]; | |
119 | |
120 myReq.execute(thePrevayler, (i == myLast)); | |
121 } | |
122 } | |
123 | |
124 private static class WriteRequest { | |
125 private Command theCommand; | |
126 | |
127 private Exception theException; | |
128 private Serializable theResult; | |
129 | |
130 private boolean isDone; | |
131 | |
132 WriteRequest(Command aCommand) { | |
133 theCommand = aCommand; | |
134 } | |
135 | |
136 void execute(Prevayler aPrev, boolean doSync) { | |
137 try { | |
138 theResult = aPrev.executeCommand(theCommand, doSync); | |
139 } catch (Exception anE) { | |
140 theException = anE; | |
141 } finally { | |
142 synchronized(this) { | |
143 isDone = true; | |
144 notify(); | |
145 } | |
146 } | |
147 } | |
148 | |
149 Serializable getResult() throws Exception{ | |
150 synchronized(this) { | |
151 while (!isDone) { | |
152 try { | |
153 wait(); | |
154 } catch (InterruptedException anIE) { | |
155 } | |
156 } | |
157 | |
158 if (theException != null) | |
159 throw theException; | |
160 else | |
161 return theResult; | |
162 } | |
163 } | |
164 } | |
165 } |