src/org/dancres/blitz/entry/WriteBuffer.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
package org.dancres.blitz.entry;

import java.io.IOException;

import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;

import java.util.logging.Level;

import EDU.oswego.cs.dl.util.concurrent.Mutex;

import org.dancres.blitz.oid.OID;

import org.dancres.blitz.disk.WriteDaemon;

import org.dancres.blitz.stats.StatsBoard;
import org.dancres.blitz.stats.StatGenerator;
import org.dancres.blitz.stats.Stat;
import org.dancres.blitz.stats.DirtyBufferStat;

import org.dancres.blitz.entry.ci.CacheIndexer;
/**
   <p> Writes to disk from Storage are done asynchronously. That being the
   case, it's possible for SleeveCache to flush an EntrySleeveImpl in
   favour of another and then reload it later, losing accurate state
   because the write has yet to be done. </p>

   <p> WriteBuffer tracks all pending writes and ensures that "dirty" state
   that has yet to reach disk is always available to SleeveCache, so that
   on-disk state appears to be correct. This works because we also
   persistently log such changes and ensure they've been applied at
   recovery or an intermediate checkpoint. </p>

   <p> It's possible that several updates for one EntrySleeveImpl will
   be submitted to WriteBuffer. Rather than handle each update separately,
   WriteBuffer consolidates them all into a single disk op, which improves
   performance and reduces latency. </p>
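
   <p> A typical interaction, sketched with hypothetical caller code
   (anEditor, aSleeve and anOID are placeholders, not part of this
   class): </p>

   <pre>
   WriteBuffer myBuffer = new WriteBuffer(anEditor);

   // Buffer the dirty state and schedule an asynchronous disk write
   myBuffer.add(aSleeve);

   // Pending (not yet flushed) state remains visible to readers
   EntrySleeveImpl myCopy = myBuffer.dirtyRead(anOID);
   </pre>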
*/
class WriteBuffer {
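    // Pending work, keyed by OID; each value is an ArrayList of
    // WriteRequests still to be flushed for that Entry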
    private Map theJobInfo = new HashMap();
    private EntryEditor theEditor;

    WriteBuffer(EntryEditor anEditor) {
        theEditor = anEditor;
        StatsBoard.get().add(new DirtyBufferGenerator(theEditor.getType(),
                                                      theJobInfo));
    }
    void add(EntrySleeveImpl aSleeve) throws IOException {
        try {
            OID myUid = aSleeve.getOID();

            if (aSleeve.getState().test(SleeveState.PINNED))
                throw new RuntimeException("Panic, shouldn't be seeing PINNED");

            // We must duplicate to ensure that further changes by the
            // upper layer don't pollute us.
            //
            WriteRequest myRequest =
                new WriteRequest(aSleeve.getState().get(),
                                 aSleeve.getPersistentRep().duplicate());

            boolean schedule = false;

            synchronized(theJobInfo) {
                ArrayList myUpdates = (ArrayList) theJobInfo.get(myUid);

                // Queue exists?
                if (myUpdates == null) {

                    /*
                       MULTI-THREAD WRITING:

                       At this point, go to WriteDaemon and ask for a queue
                       to dispatch to.  Stamp this onto myUpdates (which can
                       therefore no longer be a simple ArrayList).
                    */
                    myUpdates = new ArrayList();
                    theJobInfo.put(myUid, myUpdates);
                }

                synchronized(myUpdates) {
                    // If nothing is pending, buffer it and indicate we need
                    // a job scheduled.
                    if (myUpdates.size() == 0) {
                        myUpdates.add(myRequest);
                        schedule = true;
                    } else {
                        // Recover the last update (there should be only one?)
                        WriteRequest myLast = (WriteRequest)
                            myUpdates.get(myUpdates.size() - 1);

                        try {
                            boolean wasActive;

                            myLast.lock();

                            wasActive = myLast.isActive();

                            // If the last request is not active we can merge
                            // without queueing another job.
                            if (!wasActive) {
                                myLast.merge(myRequest);
                            }

                            myLast.unlock();

                            // Last request was active, so queue a new one and
                            // ask for a job to be scheduled.
                            if (wasActive) {
                                myUpdates.add(myRequest);
                                schedule = true;
                            }

                        } catch (InterruptedException anIE) {
                            EntryStorage.theLogger.log(Level.SEVERE,
                                "Failed to lock previous request", anIE);
                        }
                    }
                }

                /*
                   Make sure we flip NOT_ON_DISK because we're really
                   writing.  This might seem odd - for example, what would
                   happen if we had a Sleeve with NOT_ON_DISK | DELETED?
                   Well, because the Sleeve is DELETED, no other thread
                   should dirty the cache entry again.  Were one to do so,
                   it would cause a write problem, but it would also be a
                   bug.
                */
                aSleeve.getState().clear(SleeveState.NOT_ON_DISK);

                if (schedule) {
                    Job myJob = new Job(myUid, this);

                    /*
                       MULTI-THREAD WRITING:

                       Use the queue id gathered from the list of updates
                       above as an argument to this method.

                       In this way, we can ensure all updates for one
                       UID are dispatched in the same queue which ensures
                       FIFO ordering.
                    */
                    WriteDaemon.get().queue(myJob);
                }
            }

        } catch (Exception anE) {
            EntryStorage.theLogger.log(Level.SEVERE,
                                       "Failed to add write to cache: " +
                                       theEditor, anE);
        }
    }

    /**
       @return a copy of the entry sleeve, or <code>null</code> if
       there is no entry buffered for the given OID.
    */
    EntrySleeveImpl dirtyRead(OID aUID) {
        EntrySleeveImpl myResult = null;

        synchronized(theJobInfo) {
            ArrayList myStates = (ArrayList) theJobInfo.get(aUID);

            if (myStates != null) {
                synchronized(myStates) {
                    WriteRequest myRequest = (WriteRequest)
                        myStates.get(myStates.size() - 1);

                    try {
                        myRequest.lock();

                        myResult = myRequest.newSleeve();

                        myRequest.unlock();
                    } catch (InterruptedException anIE) {
                        EntryStorage.theLogger.log(Level.SEVERE,
                                                   "Failed to copy sleeve",
                                                   anIE);
                    }
                }
            }
        }

        return myResult;
    }

    /**
       Callback invoked when a Job is executed by the WriteDaemon - flushes
       the oldest pending request for the given OID to disk.
    */
    void update(OID aUID) {
        try {
            ArrayList myQueue;

            synchronized(theJobInfo) {
                myQueue = (ArrayList) theJobInfo.get(aUID);
            }

            WriteRequest myRequest;

            synchronized(myQueue) {
                myRequest = (WriteRequest) myQueue.get(0);

                try {
                    myRequest.lock();

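                    // Flag the request active so a concurrent add() appends
                    // a fresh request rather than merging into the one we
                    // are about to flush.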
                    myRequest.markActive();

                    myRequest.unlock();
                } catch (InterruptedException anIE) {
                    EntryStorage.theLogger.log(Level.SEVERE,
                        "Failed to lock request for processing", anIE);
                }
            }

            myRequest.flush(theEditor);

            /*
               Regardless of which write this is, once we've done one, the
               indexes on disk contain any search material we require and
               thus we can clear the CacheIndexer.
            */
            if ((myRequest.getStateFlags() & SleeveState.NOT_ON_DISK) != 0) {
                /*
                   If it's not also marked as deleted (in which case the
                   indexer will already be updated)
                */
                if ((myRequest.getStateFlags() & SleeveState.DELETED) == 0) {
                    CacheIndexer.getIndexer(
                        theEditor.getType()).flushed(myRequest.getSleeve());
                }
            }

            synchronized(theJobInfo) {
                synchronized(myQueue) {

                    // Remove the consolidated image stored previously
                    myQueue.remove(0);

                    if (myQueue.size() == 0) {
                        theJobInfo.remove(aUID);
                    }
                }
            }

        } catch (Exception anE) {
            EntryStorage.theLogger.log(Level.SEVERE,
                                       "Failed to sync cache: " + theEditor,
                                       anE);
        }
    }

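    /**
       Runnable dispatched to WriteDaemon; when run it asks the owning
       WriteBuffer to process the pending updates for a single OID.
    */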
    private static final class Job implements Runnable {
        private OID theUID;
        private WriteBuffer theBuffer;

        Job(OID aUID, WriteBuffer aBuffer) {
            theUID = aUID;
            theBuffer = aBuffer;
        }

        public void run() {
            theBuffer.update(theUID);
        }
    }

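    /**
       A consolidated disk operation for one OID: the sleeve's state flags
       plus a private copy of its persistent representation.  Successive
       requests are merged (flags OR'd together, image replaced) until one
       is marked active, after which it must be flushed as-is.
    */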
    private static final class WriteRequest {
        /*
           For performance measurement only
        */
        private static final Object theLockObject = new Object();
        private static int numMerges;

        private int theStateFlags;

        private boolean isActive;

        private PersistentEntry theEntry;

        private Mutex theLock = new Mutex();

        WriteRequest(int aState, PersistentEntry anEntry) {
            theEntry = anEntry;
            theStateFlags = aState;
        }

        void markActive() {
            isActive = true;
        }

        boolean isActive() {
            return isActive;
        }

        void lock() throws InterruptedException {
            theLock.acquire();
        }

        void unlock() {
            theLock.release();
        }

        int getStateFlags() {
            return theStateFlags;
        }

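        // Wraps the buffered entry image without copying - callers must
        // not mutate the returned sleeve (contrast with newSleeve below).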
        EntrySleeveImpl getSleeve() {
            EntrySleeveImpl mySleeve = new EntrySleeveImpl(theEntry);

            mySleeve.getState().setExplicit(theStateFlags);
            mySleeve.getState().clear(SleeveState.NOT_ON_DISK);

            return mySleeve;
        }

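        // Coalesce a later request into this one: state flags are OR'd
        // together and the buffered image is replaced wholesale with the
        // newer entry state.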
        void merge(WriteRequest anOther) {
            /*
            synchronized(theLockObject) {
                ++numMerges;
                System.err.println("Coalesce: " + numMerges);
            }
            */

            theStateFlags |= anOther.theStateFlags;

            theEntry = anOther.getSleeve().getPersistentRep();
        }

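        // As getSleeve() but duplicates the underlying entry, so the
        // caller receives an independent copy (used by dirtyRead).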
        EntrySleeveImpl newSleeve() {
            EntrySleeveImpl mySleeve =
                new EntrySleeveImpl(theEntry.duplicate());

            mySleeve.getState().setExplicit(theStateFlags);
            mySleeve.getState().clear(SleeveState.NOT_ON_DISK);

            return mySleeve;
        }

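        /*
           Choose the disk op from the accumulated flags:

             DELETED, already on disk -> delete
             DELETED, never written   -> nothing to do
             NOT_ON_DISK              -> first write
             otherwise                -> in-place update
        */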
        void flush(EntryEditor anEditor) throws IOException {
            if ((theStateFlags & SleeveState.DELETED) != 0) {
                if ((theStateFlags & SleeveState.NOT_ON_DISK) == 0) {
                    anEditor.delete(theEntry);
                }
            } else if ((theStateFlags & SleeveState.NOT_ON_DISK) != 0) {
                anEditor.write(theEntry);
            } else {
                anEditor.update(theEntry);
            }
        }
    }

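    /**
       Reports the number of OIDs with writes still pending (the "dirty
       buffer" size) to StatsBoard as a DirtyBufferStat.
    */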
    public class DirtyBufferGenerator implements StatGenerator {
        private long _id = StatGenerator.UNSET_ID;

        private Map _jobInfo;
        private String _type;

        public DirtyBufferGenerator(String aType, Map aJobInfo) {
            _type = aType;
            _jobInfo = aJobInfo;
        }

        public long getId() {
            return _id;
        }

        public void setId(long anId) {
            _id = anId;
        }

        public Stat generate() {
            int myBufferSize;

            synchronized(_jobInfo) {
                myBufferSize = _jobInfo.size();
            }

            return new DirtyBufferStat(_id, _type, myBufferSize);
        }
    }
}