Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/entry/WriteBuffer.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/entry/WriteBuffer.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,383 @@ +package org.dancres.blitz.entry; + +import java.io.IOException; + +import java.util.TreeMap; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; + +import java.util.logging.Level; + +import EDU.oswego.cs.dl.util.concurrent.Mutex; + +import org.dancres.blitz.oid.OID; + +import org.dancres.blitz.disk.WriteDaemon; + +import org.dancres.blitz.stats.StatsBoard; +import org.dancres.blitz.stats.StatGenerator; +import org.dancres.blitz.stats.Stat; +import org.dancres.blitz.stats.DirtyBufferStat; +import org.dancres.blitz.entry.ci.CacheIndexer; + +/** + <p> Writes to disk from Storage are done asynchronously. With this being + the case, it's possible for SleeveCache to flush an EntrySleeveImpl in + favour of another and then reload it later with a resultant loss of accurate + state due to the fact that the write has yet to be done. </p> + + <p> WriteBuffer tracks all pending writes and ensures that "dirty" state + that has yet to reach disk is always available to SleeveCache so that + on-disk state appears to be correct. This works because we also persistently + log such changes and ensure they've been applied at recovery or an + intermediate checkpoint. </p> + + <p> It's possible that several updates for one EntrySleeveImpl will + be submitted to WriteBuffer. Rather than handle each update separately, + WriteBuffer consolidates them all into a single disk op which improves + performance and reduces latency.</p> + */ +class WriteBuffer { + private Map theJobInfo = new HashMap(); + private EntryEditor theEditor; + + WriteBuffer(EntryEditor anEditor) { + theEditor = anEditor; + StatsBoard.get().add(new DirtyBufferGenerator(theEditor.getType(), theJobInfo)); + } + + void add(EntrySleeveImpl aSleeve) throws IOException { + try { + OID myUid = aSleeve.getOID(); + + if (aSleeve.getState().test(SleeveState.PINNED)) + throw new RuntimeException("Panic, shouldn't be seeing PINNED"); + + // We must duplicate to ensure that further changes by the + // upper layer, don't pollute us. + // + WriteRequest myRequest = new WriteRequest(aSleeve.getState().get(), + aSleeve.getPersistentRep().duplicate()); + + boolean schedule = false; + + synchronized(theJobInfo) { + ArrayList myUpdates = + (ArrayList) theJobInfo.get(myUid); + + // Queue exists? + if (myUpdates == null) { + + /* + MULTI-THREAD WRITING: + + At this point, go to WriteDaemon and ask for a queue + to dispatch to. Stamp this onto the myUpdates + (which can therefore no longer be a simple ArrayList) + */ + myUpdates = new ArrayList(); + theJobInfo.put(myUid, myUpdates); + } + + synchronized(myUpdates) { + // If nothing is pending, buffer it and indicate we need + // a job scheduled. + if (myUpdates.size() == 0) { + myUpdates.add(myRequest); + schedule = true; + } else { + // Recover last update (there should be only one?) + WriteRequest myLast = (WriteRequest) + myUpdates.get(myUpdates.size() - 1); + + try { + boolean wasActive; + + myLast.lock(); + + wasActive = myLast.isActive(); + + // If the last request is not active we can merge + // without queue'ing another job. + if (!wasActive) { + myLast.merge(myRequest); + } + + myLast.unlock(); + + // Last request was active, queue a new one and + // ask for a job to be scheduled. + if (wasActive) { + myUpdates.add(myRequest); + schedule = true; + } + + } catch (InterruptedException anIE) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to lock previous request", anIE); + } + } + } + + /* + Make sure we flip NOT_ON_DISK because we're really + writing. This might seem odd - for example, what would + happen if we had a Sleeve with NOT_ON_DISK | DELETE? + Well, because the Sleeve is DELETE, no other thread + should dirty the cache entry again. Were it to do so + if will cause a write problem but it would also be a bug. + */ + aSleeve.getState().clear(SleeveState.NOT_ON_DISK); + + if (schedule) { + Job myJob = new Job(myUid, this); + + /* + MULTI-THREAD WRITING: + + Use the queue id gathered from the list of updates + above as an argument to this method. + + In this way, we can ensure all updates for one + UID are dispatched in the same queue which ensures + FIFO ordering. + */ + WriteDaemon.get().queue(myJob); + } + } + + } catch (Exception anE) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to add write to cache: " + + theEditor, anE); + } + } + + /** + @return copy of the entry sleeve or <code>null</code> if + there is no entry in the cache. + */ + EntrySleeveImpl dirtyRead(OID aUID) { + EntrySleeveImpl myResult = null; + + synchronized(theJobInfo) { + ArrayList myStates = (ArrayList) theJobInfo.get(aUID); + + if (myStates != null) { + synchronized(myStates) { + WriteRequest myRequest = (WriteRequest) + myStates.get(myStates.size() - 1); + + try { + myRequest.lock(); + + myResult = myRequest.newSleeve(); + + myRequest.unlock(); + } catch (InterruptedException anIE) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to copy sleeve", + anIE); + } + } + } + } + + return myResult; + } + + /** + Callback for the Job when it is executed by the WriteDaemon + */ + void update(OID aUID) { + try { + ArrayList myQueue; + + synchronized(theJobInfo) { + myQueue = (ArrayList) theJobInfo.get(aUID); + } + + WriteRequest myRequest; + + synchronized(myQueue) { + myRequest = (WriteRequest) myQueue.get(0); + + try { + myRequest.lock(); + + myRequest.markActive(); + + myRequest.unlock(); + } catch (InterruptedException anIE) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to lock request for processing", anIE); + } + } + + myRequest.flush(theEditor); + + /* + Regardless of which write this is, once we've done one, the + indexes on disk contain any search material we require and + thus we can clear the CacheIndexer. + */ + if ((myRequest.getStateFlags() & SleeveState.NOT_ON_DISK) != 0) { + /* + If it's not also marked as deleted (in which case the + indexer will already be updated) + */ + if ((myRequest.getStateFlags() & SleeveState.DELETED) == 0) { + CacheIndexer.getIndexer( + theEditor.getType()).flushed(myRequest.getSleeve()); + } + } + + synchronized(theJobInfo) { + synchronized(myQueue) { + + // Remove the consolidated image stored previously + myQueue.remove(0); + + if (myQueue.size() == 0) { + theJobInfo.remove(aUID); + } + } + } + + } catch (Exception anE) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to sync cache: " + theEditor, + anE); + } + } + + private static final class Job implements Runnable { + private OID theUID; + private WriteBuffer theBuffer; + + Job(OID aUID, WriteBuffer aBuffer) { + theUID = aUID; + theBuffer = aBuffer; + } + + public void run() { + theBuffer.update(theUID); + } + } + + private static final class WriteRequest { + /* + For performance measurement only + */ + private static final Object theLockObject = new Object(); + private static int numMerges; + + private int theStateFlags; + + private boolean isActive; + + private PersistentEntry theEntry; + + private Mutex theLock = new Mutex(); + + WriteRequest(int aState, PersistentEntry anEntry) { + theEntry = anEntry; + theStateFlags = aState; + } + + void markActive() { + isActive = true; + } + + boolean isActive() { + return isActive; + } + + void lock() throws InterruptedException { + theLock.acquire(); + } + + void unlock() { + theLock.release(); + } + + int getStateFlags() { + return theStateFlags; + } + + EntrySleeveImpl getSleeve() { + EntrySleeveImpl mySleeve = new EntrySleeveImpl(theEntry); + + mySleeve.getState().setExplicit(theStateFlags); + mySleeve.getState().clear(SleeveState.NOT_ON_DISK); + + return mySleeve; + } + + void merge(WriteRequest anOther) { + /* + synchronized(theLockObject) { + ++numMerges; + System.err.println("Coalesce: " + numMerges); + } + */ + + theStateFlags |= anOther.theStateFlags; + + theEntry = anOther.getSleeve().getPersistentRep(); + } + + EntrySleeveImpl newSleeve() { + EntrySleeveImpl mySleeve = + new EntrySleeveImpl(theEntry.duplicate()); + + mySleeve.getState().setExplicit(theStateFlags); + mySleeve.getState().clear(SleeveState.NOT_ON_DISK); + + return mySleeve; + } + + void flush(EntryEditor anEditor) throws IOException { + if ((theStateFlags & SleeveState.DELETED) != 0) { + if ((theStateFlags & SleeveState.NOT_ON_DISK) == 0) { + anEditor.delete(theEntry); + } + } else if ((theStateFlags & SleeveState.NOT_ON_DISK) != 0){ + anEditor.write(theEntry); + } else { + anEditor.update(theEntry); + } + } + } + + public class DirtyBufferGenerator implements StatGenerator { + private long _id = StatGenerator.UNSET_ID; + + private Map _jobInfo; + private String _type; + + public DirtyBufferGenerator(String aType, Map aJobInfo) { + _type = aType; + _jobInfo = aJobInfo; + } + + public long getId() { + return _id; + } + + public void setId(long anId) { + _id = anId; + } + + public Stat generate() { + int myBufferSize; + + synchronized(_jobInfo) { + myBufferSize = _jobInfo.size(); + } + + return new DirtyBufferStat(_id, _type, myBufferSize); + } + } +}