Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/txn/batch/OptimisticBatcher.java @ 32:a77f0a9ed93c 2.1.1
Add support for optimistic log batching.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 12 Jun 2010 10:42:31 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
31:243c74d599bf | 32:a77f0a9ed93c |
---|---|
1 package org.dancres.blitz.txn.batch; | |
2 | |
3 import org.prevayler.Command; | |
4 import org.prevayler.PrevalentSystem; | |
5 import org.prevayler.implementation.PrevaylerCore; | |
6 import org.prevayler.implementation.SnapshotPrevayler; | |
7 import org.prevayler.implementation.Snapshotter; | |
8 | |
9 import java.io.IOException; | |
10 import java.io.Serializable; | |
11 import java.util.ArrayList; | |
12 import java.util.Iterator; | |
13 import java.util.concurrent.atomic.AtomicInteger; | |
14 | |
15 public class OptimisticBatcher implements SnapshotPrevayler { | |
16 private PrevaylerCore _prevayler; | |
17 private boolean _writing = false; | |
18 private ArrayList<WriteRequest> _writes = new ArrayList<WriteRequest>(); | |
19 | |
20 // private AtomicInteger _inCount = new AtomicInteger(); | |
21 // private AtomicInteger _outCount = new AtomicInteger(); | |
22 | |
23 public OptimisticBatcher(PrevaylerCore aPrevayler) { | |
24 _prevayler = aPrevayler; | |
25 } | |
26 | |
27 public PrevalentSystem system() { | |
28 return _prevayler.system(); | |
29 } | |
30 | |
31 public Serializable executeCommand(Command aCommand) throws Exception { | |
32 return write(aCommand, true); | |
33 } | |
34 | |
35 public Serializable executeCommand(Command aCommand, boolean sync) | |
36 throws Exception { | |
37 return write(aCommand, sync); | |
38 } | |
39 | |
40 private Serializable write(Command aComm, boolean sync) throws Exception { | |
41 boolean someoneWriting = false; | |
42 WriteRequest myReq = null; | |
43 | |
44 synchronized(this) { | |
45 someoneWriting = _writing; | |
46 | |
47 // If someone is already writing, we add to their queue of work | |
48 // | |
49 if (_writing) { | |
50 myReq = new WriteRequest(aComm); | |
51 _writes.add(myReq); | |
52 } else { | |
53 _writing = true; | |
54 } | |
55 } | |
56 | |
57 // If we are waiting on someone's queue | |
58 // | |
59 if (someoneWriting) { | |
60 // If we want to wait until the log is flushed | |
61 // | |
62 if (sync) | |
63 myReq.await(); | |
64 | |
65 return aComm.execute(_prevayler.system()); | |
66 } else { | |
67 // We are handling the write queue, write our stuff now | |
68 // | |
69 _prevayler.logCommand(aComm, false); | |
70 | |
71 ArrayList<WriteRequest> myAllWrites = new ArrayList<WriteRequest>(); | |
72 | |
73 // While there other writes scoop them up and write them | |
74 // | |
75 ArrayList<WriteRequest> myBuffer = new ArrayList<WriteRequest>(); | |
76 | |
77 while (haveWrites()) { | |
78 synchronized(this) { | |
79 myBuffer.clear(); | |
80 myBuffer.addAll(_writes); | |
81 _writes.clear(); | |
82 } | |
83 | |
84 Iterator<WriteRequest> myWrites = myBuffer.iterator(); | |
85 while (myWrites.hasNext()) | |
86 _prevayler.logCommand(myWrites.next().getCommand(), false); | |
87 | |
88 myAllWrites.addAll(myBuffer); | |
89 } | |
90 | |
91 // Now dispatch execution of all logged commands - execute our own first | |
92 // | |
93 try { | |
94 return aComm.execute(_prevayler.system()); | |
95 } finally { | |
96 Iterator<WriteRequest> myTargets = myAllWrites.iterator(); | |
97 while (myTargets.hasNext()) { | |
98 // _outCount.incrementAndGet(); | |
99 myTargets.next().dispatch(); | |
100 } | |
101 | |
102 // System.out.println("Logger: incount = " + _inCount + " outcount = " + _outCount); | |
103 } | |
104 } | |
105 } | |
106 | |
107 private boolean haveWrites() throws Exception { | |
108 synchronized(this) { | |
109 if (_writes.size() > 0) | |
110 return true; | |
111 else { | |
112 _writing = false; | |
113 _prevayler.flush(); | |
114 | |
115 return false; | |
116 } | |
117 } | |
118 } | |
119 | |
120 public Snapshotter takeSnapshot() throws IOException { | |
121 return _prevayler.takeSnapshot(); | |
122 } | |
123 | |
124 private class WriteRequest { | |
125 private Command _comm; | |
126 private Object _lock = new Object(); | |
127 private boolean _exit = false; | |
128 | |
129 WriteRequest(Command aComm) { | |
130 // _inCount.incrementAndGet(); | |
131 _comm = aComm; | |
132 } | |
133 | |
134 Command getCommand() { | |
135 return _comm; | |
136 } | |
137 | |
138 void dispatch() { | |
139 synchronized(_lock) { | |
140 _exit = true; | |
141 _lock.notify(); | |
142 } | |
143 } | |
144 | |
145 void await() { | |
146 synchronized(_lock) { | |
147 while (! _exit) { | |
148 try { | |
149 _lock.wait(); | |
150 } catch (InterruptedException anIE) { | |
151 | |
152 } | |
153 } | |
154 } | |
155 } | |
156 } | |
157 } |