comparison src/org/dancres/blitz/txn/batch/OptimisticBatcher.java @ 32:a77f0a9ed93c 2.1.1

Add support for optimistic log batching.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 12 Jun 2010 10:42:31 +0100
parents
children
comparison
equal deleted inserted replaced
31:243c74d599bf 32:a77f0a9ed93c
1 package org.dancres.blitz.txn.batch;
2
3 import org.prevayler.Command;
4 import org.prevayler.PrevalentSystem;
5 import org.prevayler.implementation.PrevaylerCore;
6 import org.prevayler.implementation.SnapshotPrevayler;
7 import org.prevayler.implementation.Snapshotter;
8
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.util.ArrayList;
12 import java.util.Iterator;
13 import java.util.concurrent.atomic.AtomicInteger;
14
15 public class OptimisticBatcher implements SnapshotPrevayler {
16 private PrevaylerCore _prevayler;
17 private boolean _writing = false;
18 private ArrayList<WriteRequest> _writes = new ArrayList<WriteRequest>();
19
20 // private AtomicInteger _inCount = new AtomicInteger();
21 // private AtomicInteger _outCount = new AtomicInteger();
22
23 public OptimisticBatcher(PrevaylerCore aPrevayler) {
24 _prevayler = aPrevayler;
25 }
26
27 public PrevalentSystem system() {
28 return _prevayler.system();
29 }
30
31 public Serializable executeCommand(Command aCommand) throws Exception {
32 return write(aCommand, true);
33 }
34
35 public Serializable executeCommand(Command aCommand, boolean sync)
36 throws Exception {
37 return write(aCommand, sync);
38 }
39
40 private Serializable write(Command aComm, boolean sync) throws Exception {
41 boolean someoneWriting = false;
42 WriteRequest myReq = null;
43
44 synchronized(this) {
45 someoneWriting = _writing;
46
47 // If someone is already writing, we add to their queue of work
48 //
49 if (_writing) {
50 myReq = new WriteRequest(aComm);
51 _writes.add(myReq);
52 } else {
53 _writing = true;
54 }
55 }
56
57 // If we are waiting on someone's queue
58 //
59 if (someoneWriting) {
60 // If we want to wait until the log is flushed
61 //
62 if (sync)
63 myReq.await();
64
65 return aComm.execute(_prevayler.system());
66 } else {
67 // We are handling the write queue, write our stuff now
68 //
69 _prevayler.logCommand(aComm, false);
70
71 ArrayList<WriteRequest> myAllWrites = new ArrayList<WriteRequest>();
72
73 // While there other writes scoop them up and write them
74 //
75 ArrayList<WriteRequest> myBuffer = new ArrayList<WriteRequest>();
76
77 while (haveWrites()) {
78 synchronized(this) {
79 myBuffer.clear();
80 myBuffer.addAll(_writes);
81 _writes.clear();
82 }
83
84 Iterator<WriteRequest> myWrites = myBuffer.iterator();
85 while (myWrites.hasNext())
86 _prevayler.logCommand(myWrites.next().getCommand(), false);
87
88 myAllWrites.addAll(myBuffer);
89 }
90
91 // Now dispatch execution of all logged commands - execute our own first
92 //
93 try {
94 return aComm.execute(_prevayler.system());
95 } finally {
96 Iterator<WriteRequest> myTargets = myAllWrites.iterator();
97 while (myTargets.hasNext()) {
98 // _outCount.incrementAndGet();
99 myTargets.next().dispatch();
100 }
101
102 // System.out.println("Logger: incount = " + _inCount + " outcount = " + _outCount);
103 }
104 }
105 }
106
107 private boolean haveWrites() throws Exception {
108 synchronized(this) {
109 if (_writes.size() > 0)
110 return true;
111 else {
112 _writing = false;
113 _prevayler.flush();
114
115 return false;
116 }
117 }
118 }
119
120 public Snapshotter takeSnapshot() throws IOException {
121 return _prevayler.takeSnapshot();
122 }
123
124 private class WriteRequest {
125 private Command _comm;
126 private Object _lock = new Object();
127 private boolean _exit = false;
128
129 WriteRequest(Command aComm) {
130 // _inCount.incrementAndGet();
131 _comm = aComm;
132 }
133
134 Command getCommand() {
135 return _comm;
136 }
137
138 void dispatch() {
139 synchronized(_lock) {
140 _exit = true;
141 _lock.notify();
142 }
143 }
144
145 void await() {
146 synchronized(_lock) {
147 while (! _exit) {
148 try {
149 _lock.wait();
150 } catch (InterruptedException anIE) {
151
152 }
153 }
154 }
155 }
156 }
157 }