comparison src/org/dancres/blitz/txn/batch/ConcurrentWriteBatcher.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children 4580bb12db30
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.txn.batch;
2
3 import java.io.IOException;
4 import java.io.Serializable;
5
6 import java.util.ArrayList;
7
8 import java.util.logging.Level;
9
10 import org.prevayler.Prevayler;
11 import org.prevayler.PrevalentSystem;
12 import org.prevayler.Command;
13
14 import org.prevayler.implementation.SnapshotPrevayler;
15 import org.prevayler.implementation.Snapshotter;
16
17 /**
18 <p>Batches commands issued in the same time period together in an attempt
19 to improve log throughput via Prevayler.</p>
20
21 <p>Batch-writing can reduce the number of forced flushes to disk whilst
22 increasing the amount of data written with each flush. This has a
23 positive effect on throughput under concurrent load.</p>
24
25 <p>Time is specified in ms, first thread into barrier waits this amount
26 of time for other writers. Other writers entering the barrier are
27 now blocked until the first entrant commits all writes to log. When first
28 thread awakes, all writes are done as a group followed by a single sync.</p>
29
30 <p>Unlike <code>WriteBatcher</code>, this version supports more concurrency
31 allowing for queuing of the next batch of log entries whilst the current
32 bunch is being written.</p>
33 */
34 public class ConcurrentWriteBatcher implements SnapshotPrevayler {
35 private SnapshotPrevayler thePrevayler;
36
37 private boolean amFirst = true;
38
39 private ArrayList theWrites = new ArrayList();
40
41 // Might be able to buff up to 60ms which gives average of 30 but
42 // we'll see.
43 private long theWindowTime = 20;
44
45 public ConcurrentWriteBatcher(SnapshotPrevayler aPrevayler,
46 long aWindowTime) {
47
48 thePrevayler = aPrevayler;
49 theWindowTime = aWindowTime;
50 }
51
52 /**
53 * Returns the underlying PrevalentSystem.
54 */
55 public PrevalentSystem system() {
56 return thePrevayler.system();
57 }
58
59 public Serializable executeCommand(Command aCommand) throws Exception {
60 return write(aCommand);
61 }
62
63 public Serializable executeCommand(Command aCommand, boolean sync)
64 throws Exception {
65 return write(aCommand);
66 }
67
68 private Serializable write(Command aComm) throws Exception {
69 WriteRequest myReq = new WriteRequest(aComm);
70
71 boolean wasFirst = false;
72
73 WriteRequest[] myRequests = null;
74
75 synchronized(this) {
76 if (amFirst) {
77 theWrites.add(myReq);
78 amFirst = false;
79 wasFirst = true;
80
81 try {
82 wait(theWindowTime);
83 } catch (InterruptedException anIE) {
84 }
85
86 myRequests = new WriteRequest[theWrites.size()];
87 myRequests = (WriteRequest[]) theWrites.toArray(myRequests);
88
89 theWrites.clear();
90 amFirst = true;
91
92 } else {
93 theWrites.add(myReq);
94 }
95 }
96
97 if (wasFirst) {
98 flushAll(myRequests);
99 }
100
101 return myReq.getResult();
102 }
103
104 public Snapshotter takeSnapshot() throws IOException {
105 return thePrevayler.takeSnapshot();
106 }
107
108 private void flushAll(WriteRequest[] aRequests) {
109 int myLast = aRequests.length - 1;
110
111 if (WriteBatcher.theLogger.isLoggable(Level.FINE))
112 WriteBatcher.theLogger.log(Level.FINE,
113 "Flushing " +
114 aRequests.length + " to log");
115
116 // Do all writes, last with sync
117 for (int i = 0; i < aRequests.length; i++) {
118 WriteRequest myReq = aRequests[i];
119
120 myReq.execute(thePrevayler, (i == myLast));
121 }
122 }
123
124 private static class WriteRequest {
125 private Command theCommand;
126
127 private Exception theException;
128 private Serializable theResult;
129
130 private boolean isDone;
131
132 WriteRequest(Command aCommand) {
133 theCommand = aCommand;
134 }
135
136 void execute(Prevayler aPrev, boolean doSync) {
137 try {
138 theResult = aPrev.executeCommand(theCommand, doSync);
139 } catch (Exception anE) {
140 theException = anE;
141 } finally {
142 synchronized(this) {
143 isDone = true;
144 notify();
145 }
146 }
147 }
148
149 Serializable getResult() throws Exception{
150 synchronized(this) {
151 while (!isDone) {
152 try {
153 wait();
154 } catch (InterruptedException anIE) {
155 }
156 }
157
158 if (theException != null)
159 throw theException;
160 else
161 return theResult;
162 }
163 }
164 }
165 }