Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/entry/SleeveCache.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children | 46ac1a45718a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/entry/SleeveCache.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,666 @@ +package org.dancres.blitz.entry; + +import java.io.IOException; + +import java.util.ArrayList; + +import java.util.logging.*; + +import net.jini.core.transaction.TransactionException; +import net.jini.config.ConfigurationException; + +import org.dancres.blitz.mangler.MangledEntry; + +import org.dancres.blitz.Logging; +import org.dancres.blitz.stats.StatGenerator; +import org.dancres.blitz.stats.Stat; +import org.dancres.blitz.stats.SearchStat; +import org.dancres.blitz.stats.StatsBoard; + +import org.dancres.blitz.oid.OID; + +import org.dancres.blitz.arc.ArcCache; +import org.dancres.blitz.arc.CacheBlockDescriptor; +import org.dancres.blitz.arc.RecoverySummary; + +import org.dancres.blitz.txn.TxnManager; + +import org.dancres.blitz.entry.ci.CacheIndexer; + +import org.dancres.blitz.config.CacheSize; +import org.dancres.blitz.config.Fifo; +import org.dancres.blitz.config.EntryConstraints; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong; + +/** + The organization of the space implementation can be viewed as being + similar to a memory hierarchy with EntryReposImpl/SpaceImpl being where + central processing happens. SleeveCache can be viewed as level 1 cache + whilst storage is main memory. Thus all "traffic" goes through SleeveCache + which interacts appropriately with Storage when it cannot satisfy the + demand itself. <P> + + Is responsible for indexing and caching of unpacked EntrySleeveImpls. + Implemented using an ArcCache.<P> + + @see org.dancres.blitz.arc.ArcCache + + @todo One way to split caches down further in the face of concurrency + demands would be to start maintaining multiple caches scoped or hashed on + the zone id or even the uid itself. + */ +class SleeveCache implements StatGenerator { + static Logger theLogger = + Logging.newLogger("org.dancres.blitz.disk.SleeveCache"); + + private ArcCache theStoreCache; + + private Storage theStore; + + private CountersImpl theCounters; + + private EntryConstraints theConstraints; + + private CacheIndexer theIndexer; + + private long theId = StatGenerator.UNSET_ID; + + private static class OfferTracker { + private String theType; + private AtomicLong theMissed = new AtomicLong(); + private AtomicLong theDeld = new AtomicLong(); + + OfferTracker(String aType) { + theType = aType; + } + + void incMissed() { + theMissed.incrementAndGet(); + } + + void incDeld() { + theDeld.incrementAndGet(); + } + + public String getTitle() { + return theType; + } + + public long getMisses() { + return theMissed.get(); + } + + public long getDeld() { + return theDeld.get(); + } + } + + private static final int CACHED_TRACKER = 0; + private static final int DIRTY_TRACKER = 1; + private static final int STORAGE_TRACKER = 2; + + private OfferTracker[] theTrackers = new OfferTracker[] { + new OfferTracker("Cached"), new OfferTracker("Dirty"), + new OfferTracker("Storage") + }; + + SleeveCache(Storage aStore) throws IOException { + theStore = aStore; + + try { + theConstraints = + EntryConstraints.getConstraints(theStore.getType()); + } catch (ConfigurationException aCE) { + theLogger.log(Level.SEVERE, + "Couldn't load constraints for type " + + theStore.getType(), aCE); + IOException myIOE = + new IOException("Couldn't load constraints for type " + + theStore.getType()); + myIOE.initCause(aCE); + throw myIOE; + } + + CacheSize myCacheSize = (CacheSize) theConstraints.get(CacheSize.class); + + theLogger.log(Level.INFO, aStore.getType() + " cache size = " + + myCacheSize.getSize()); + + theStoreCache = new ArcCache(aStore, myCacheSize.getSize()); + + theIndexer = CacheIndexer.getIndexer(theStore.getType()); + + theStoreCache.add(new CacheListenerImpl(theIndexer)); + + try { + theCounters = new CountersImpl(theStore.getType(), + theStore.getNumEntries()); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, + "Couldn't read instance count from storage: " + + theStore.getType() + + " statistics will be inaccurate", anIOE); + + theCounters = new CountersImpl(theStore.getType(), 0); + } + + StatsBoard.get().add(this); + } + + public void setId(long anId) { + theId = anId; + } + + public long getId() { + return theId; + } + + public Stat generate() { + String[] myTitles = new String[theTrackers.length]; + long[] myMisses = new long[theTrackers.length]; + long[] myDeld = new long[theTrackers.length]; + + for (int i = 0; i < theTrackers.length; i++) { + myTitles[i] = theTrackers[i].getTitle(); + myMisses[i] = theTrackers[i].getMisses(); + myDeld[i] = theTrackers[i].getDeld(); + } + + return new SearchStat(theId, theStore.getType(), myTitles, + myMisses, myDeld); + } + + void forceSync(CacheBlockDescriptor aCBD) throws IOException { + theStoreCache.forceSync(aCBD); + } + + CacheBlockDescriptor load(OID aOID) throws IOException { + return theStoreCache.find(aOID); + } + + CacheBlockDescriptor add(EntrySleeveImpl aSleeve) throws IOException { + return theStoreCache.insert(aSleeve); + } + + RecoverySummary recover(EntrySleeveImpl aSleeve) + throws IOException { + return theStoreCache.recover(aSleeve); + } + + boolean renew(OID aOID, long anExpiry) throws IOException { + CacheBlockDescriptor myCBD = theStoreCache.find(aOID); + + if (myCBD == null) { + return false; + } + + EntrySleeveImpl mySleeve = (EntrySleeveImpl) myCBD.getContent(); + + if (!mySleeve.hasExpired(System.currentTimeMillis()) && + !mySleeve.getState().test(SleeveState.DELETED)) { + + if (anExpiry == 0) { + // Don't reset expiry - it's useful to storage to know + // what the lease was last + mySleeve.getState().set(SleeveState.DELETED); + + // Update stats + theCounters.didPurge(); + } else { + mySleeve.setExpiry(anExpiry); + } + + mySleeve.markDirty(); + myCBD.release(); + + return true; + } + + myCBD.release(); + + return false; + } + + boolean cancel(OID aOID) throws IOException { + return renew(aOID, 0); + } + + void sync() throws IOException { + theStoreCache.sync(); + } + + Counters getCounters() { + return theCounters; + } + + void close() { + // Nothing to do here - we could destroy the counters but we put that + // in deleteAll which makes more sense as close doesn't mean deletion + // theStoreCache.dump(); + } + + void write(MangledEntry anEntry, long anExpiry, WriteEscort anEscort) + throws IOException { + + + OID myID = theStore.getNextId(); + EntrySleeveImpl mySleeve = null; + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Written: " + myID + ", " + + anExpiry + ", " + + (anExpiry - System.currentTimeMillis())); + + mySleeve = new EntrySleeveImpl(myID, anEntry, anExpiry); + + // Ready to write but tell the escort first + OpInfo myInfo = new WriteEntryOpInfo(mySleeve); + + if (!anEscort.writing(myInfo)) + return; + + // Now make it visible + CacheBlockDescriptor myCBD = theStoreCache.insert(mySleeve); + myCBD.release(); + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Unwritten: " + mySleeve.getOID()); + } + + void find(MangledEntry anEntry, SearchVisitor aVisitor) + throws IOException { + + if (theConstraints.get(Fifo.class) != null) + fifoFind(anEntry, aVisitor); + else + fastFind(anEntry, aVisitor); + } + + /** + <p>If we're in FIFO mode, we know that each TupleLocator is sorted into + FIFO order. Thus we can merge across all locators on the fly + and obtain a fully ordered, non-duplicate set of tuple id's for + matching using a simply algorithm.</p> + + <p>By sorting across all locators and applying global FIFO order + we can maximize use of the cache and avoid excessive disk hits if + we have sufficient of the FIFO ordering in cache. Thus we get + graceful degradation rather than consulting disk for the definitive + ordering regardless.</p> + */ + private void fifoFind(MangledEntry anEntry, SearchVisitor aVisitor) + throws IOException { + + long mySearchStart = System.currentTimeMillis(); + + ArrayList myLocators = new ArrayList(); + + TupleLocator myLocator = theStore.find(anEntry); + + if (myLocator != null) + myLocators.add(myLocator); + + myLocator = theStore.findCached(anEntry); + + if (myLocator != null) + myLocators.add(myLocator); + + myLocator = theIndexer.find(anEntry); + + if (myLocator != null) + myLocators.add(myLocator); + + if (myLocators.size() == 0) + return; + + TupleLocator[] mySortedSources = new TupleLocator[myLocators.size()]; + + mySortedSources = (TupleLocator[]) myLocators.toArray(mySortedSources); + + myLocator = new SortingLocator(mySortedSources); + + offerAndReleaseLocator(myLocator, aVisitor, mySearchStart, + theTrackers[CACHED_TRACKER]); + } + + private void fastFind(MangledEntry anEntry, SearchVisitor aVisitor) + throws IOException { + + long mySearchStart = System.currentTimeMillis(); + + /* + Basic approach is to send the template to the CacheIndexer and + ask it to return suitable IDs which we will then pin and try. + Note we should set a flag which boycotts a load from disk so that + flushed or deleted entries are not loaded more than once. + + If we cannot satisfy the Visitor that way, we repeat the exercise + with storage which returns OID/byte[] pairs which we can then + pin. + + Storage and CacheIndexer are now free to plan searches as they + see fit based on template. + */ + + TupleLocator myLocator = theIndexer.find(anEntry); + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Searching[cache]: " + myLocator); + + if (myLocator != null) { + if (offerAndReleaseLocator(myLocator, aVisitor, mySearchStart, + theTrackers[CACHED_TRACKER])) { + /* + System.err.println("Cache search time: " + + (System.currentTimeMillis() - + mySearchStart)); + */ + return; + } + } + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Searching[StoreCache]: " + myLocator); + + myLocator = theStore.findCached(anEntry); + + if (myLocator != null) { + if (offerAndReleaseLocator(myLocator, aVisitor, mySearchStart, + theTrackers[DIRTY_TRACKER])) + return; + } + + myLocator = theStore.find(anEntry); + + if (myLocator == null) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Got no matches on disk"); + return; + } + + offerAndReleaseLocator(myLocator, aVisitor, mySearchStart, + theTrackers[STORAGE_TRACKER]); + } + + LongtermOffer getOffer(OID anOID) throws IOException { + + CacheBlockDescriptor myCBD = theStoreCache.find(anOID); + + if (myCBD != null) { + return new LongtermOfferImpl(myCBD); + } else + return null; + } + + boolean find(SearchVisitor aVisitor, OID aOID, MangledEntry aPreload) + throws IOException { + + CacheBlockDescriptor myCBD = theStoreCache.find(aOID); + + long myStartTime = System.currentTimeMillis(); + + boolean offered = false; + + if (myCBD != null) { + try { + EntrySleeveImpl mySleeve = + (EntrySleeveImpl) myCBD.getContent(); + + /* + If the JS specification is changed to cope with the issues + discussed in http://archives.java.sun.com/cgi-bin/wa?A2=ind0311&L=javaspaces-users&F=&S=&P=4599 and http://archives.java.sun.com/cgi-bin/wa?A2=ind0311&L=javaspaces-users&F=&S=&P=3590 then we need to do two things: + + (1) Allow the SearchVisitor to see Sleeve's even if they've + expired. + (2) Having "shown" it to the SearchVisitor we'd need to + query the ReapFilters and if they don't boycott, mark the + item deleted. + + These two steps have the effect of allowing a *ifExists to + conflict on lease-expired entries that have been locked + by a transaction and ensures we only delete such entries + when no transactions have posession of them anymore. Of + course, this is somewhat slower and less efficient as + there's never a circumstance under which we can be assured + that a SearchVisitor *never* sees a particular entry again. + + If we must implement the strategy of flunking a transaction + owing to a lock on a lease expired object, this would be + best dealt with by having TxnOp's check expiries at prepare + or commit time. However this is much less appealing as what + is basically a pessimistic transaction API becomes + optimistic in this case and only in this case. + + All this applies to the similar statement in offer() below. + */ + if (! mySleeve.getState().test(SleeveState.DELETED)) { + + // If it's expired, mark it deleted, subject to filters + // + if (mySleeve.hasExpired(myStartTime)) { + + if (! EntryRepositoryFactory.getReaper().filter(mySleeve)) { + mySleeve.getState().set(SleeveState.DELETED); + mySleeve.markDirty(); + + // Update stats + theCounters.didPurge(); + } + } else { + OpInfo myInfo = + new FindEntryOpInfo(theStore.getType(), + mySleeve.getOID(), + aVisitor.isDeleter()); + + SearchOffer myOffer; + + if (aPreload != null) { + myOffer = new SearchOfferImpl(aPreload, myInfo); + } else { + theLogger.log(Level.FINE, "NOT using preload"); + + myOffer = new SearchOfferImpl(mySleeve.getEntry(), + myInfo); + } + + aVisitor.offer(myOffer); + + offered = true; + } + } else { + theTrackers[CACHED_TRACKER].incDeld(); + } + } finally { + myCBD.release(); + } + } else { + theTrackers[CACHED_TRACKER].incMissed(); + } + + return offered; + } + + private boolean offerAndReleaseLocator(TupleLocator aLocator, + SearchVisitor aVisitor, + long aStartTime, + OfferTracker aTracker) + throws IOException { + + try { + return offer(aLocator, aVisitor, aStartTime, aTracker); + } finally { + aLocator.release(); + } + } + + /** + @return <code>true</code> means offering can stop for better or worse. + <code>false</code> indicates that the search should continue if there + are other sources of offers. + */ + private boolean offer(TupleLocator aLocator, SearchVisitor aVisitor, + long aStartTime, OfferTracker aTracker) + throws IOException { + + int myVisitorResponse = SearchVisitor.TRY_AGAIN; + + OID myId = null; + EntrySleeveImpl mySleeve = null; + String myType = theStore.getType(); + boolean isDeletion = aVisitor.isDeleter(); + + // long myStart = System.currentTimeMillis(); + + while (aLocator.fetchNext()) { + + myId = aLocator.getOID(); + + CacheBlockDescriptor myCBD = theStoreCache.find(myId); + + if (myCBD != null) { + mySleeve = (EntrySleeveImpl) myCBD.getContent(); + + if (! mySleeve.getState().test(SleeveState.DELETED)) { + + // If it's expired, mark it deleted, subject to filters + // + if (mySleeve.hasExpired(aStartTime)) { + if (! EntryRepositoryFactory.getReaper().filter(mySleeve)) { + mySleeve.getState().set(SleeveState.DELETED); + mySleeve.markDirty(); + + // Update stats + theCounters.didPurge(); + } + } else { + + OpInfo myInfo = + new FindEntryOpInfo(myType, mySleeve.getOID(), + isDeletion); + + SearchOfferImpl myOffer = + new SearchOfferImpl(mySleeve.getEntry(), myInfo); + + myVisitorResponse = + aVisitor.offer(myOffer); + } + } else { + aTracker.incDeld();; + } + + myCBD.release(); + + if ((myVisitorResponse == SearchVisitor.STOP) || + (myVisitorResponse == SearchVisitor.ACCEPTED)) { + break; + } + } else { + aTracker.incMissed(); + } + } + + /* + System.out.println("Time to offer: " + + (System.currentTimeMillis() - + myStart)); + */ + + if (myVisitorResponse != SearchVisitor.TRY_AGAIN) + return true; + else + return false; + } + + void deleteAll() throws IOException { + theCounters.destroy(); + + /* + Basic approach is to send the template to the CacheIndexer and + ask it to return suitable IDs which we will then pin and try. + Note we should set a flag which boycotts a load from disk so that + flushed or deleted entries are not loaded more than once. + + If we cannot satisfy the Visitor that way, we repeat the exercise + with storage which returns OID/byte[] pairs which we can then + pin. + + Storage and CacheIndexer are now free to plan searches as they + see fit based on template. + */ + + TupleLocator myLocator = theIndexer.find(MangledEntry.NULL_TEMPLATE); + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Searching[cache]: " + myLocator); + + if (myLocator != null) { + delete(myLocator); + } + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Searching[StoreCache]: " + myLocator); + + myLocator = theStore.findCached(MangledEntry.NULL_TEMPLATE); + + if (myLocator != null) { + delete(myLocator); + } + + myLocator = theStore.find(MangledEntry.NULL_TEMPLATE); + + if (myLocator == null) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Got no matches on disk"); + return; + } + + delete(myLocator); + } + + private void delete(TupleLocator aLocator) throws IOException { + try { + while (aLocator.fetchNext()) { + + OID myId = aLocator.getOID(); + + CacheBlockDescriptor myCBD = theStoreCache.find(myId); + + OpInfo myInfo = null; + + if (myCBD != null) { + EntrySleeveImpl mySleeve = + (EntrySleeveImpl) myCBD.getContent(); + + if (! mySleeve.getState().test(SleeveState.DELETED)) { + mySleeve.getState().set(SleeveState.DELETED); + mySleeve.markDirty(); + + myInfo = + new FindEntryOpInfo(theStore.getType(), + mySleeve.getOID(), + true); + } + + myCBD.release(); + + if (myInfo != null) { + try { + TxnManager.get().log(new ForcedCommit(myInfo)); + } catch (TransactionException aTE) { + IOException myIOE = new IOException("Eeek failed to delete Entry"); + myIOE.initCause(aTE); + throw myIOE; + } + } + } + } + } finally { + aLocator.release(); + } + } + + public String toString() { + return "SC: " + theStore.getType(); + } +}