Mercurial > hg > blitz_stable
diff src/org/dancres/blitz/entry/LeaseTrackerImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/entry/LeaseTrackerImpl.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,182 @@ +package org.dancres.blitz.entry; + +import java.io.IOException; + +import java.util.logging.Level; + +import com.sleepycat.je.Database; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.LockMode; + +import org.dancres.blitz.disk.Disk; +import org.dancres.blitz.disk.DiskTxn; + +import org.dancres.blitz.lease.ReapFilter; + +/** + Database layout: + + <ul> + <li>Sleeve expiries are divided into buckets according to the allocator + id of their EntryUID.</li> + <li>Each bucket contains entries consisting of the expiry(long) followed by + oid(long) making a total of 16 bytes.</li> + <li>Each bucket's entries are sorted in ascending order by expiry.</li> + <ul> + + <p> Typical page size is 4096 bytes / 16 = 256 entries per page. + Thus, if we configure sufficient allocators (good for concurrency anyway) + we can ensure we get efficient grouping of entries against pages which + allows.for good cache behaviour.</p> +*/ +class LeaseTrackerImpl implements LeaseTracker { + private Database theLeasesDb; + private String theType; + + private int theMaxAllocId; + + LeaseTrackerImpl(String aType, int aMaxAllocId) throws IOException { + EntryStorage.theLogger.log(Level.FINE, "Tracker created: " + aType); + + theType = aType; + theMaxAllocId = aMaxAllocId; + + try { + DatabaseConfig myConfig = new DatabaseConfig(); + myConfig.setAllowCreate(true); + myConfig.setSortedDuplicates(true); + + theLeasesDb = Disk.newDb(null, theType + "_leases", myConfig); + + } catch (DatabaseException aDbe) { + EntryStorage.theLogger.log(Level.SEVERE, + "Tracker failed to init db", + aDbe); + throw new IOException(); + } + } + + public void bringOutTheDead(EntryReaper aReaper) throws IOException { + /* + Read blocks from disk recovering all necessary entries which + are expired at this point in time. Then we can release the + txn and schedule up some deletes in cache via the ReapFilter. + Repeat for each bucket - we don't even need to expire all dead items + in a block... + */ + + EntryStorage.theLogger.log(Level.INFO, + "LeaseTracker bringing out the dead for: " + + theType); + + // System.err.println("LeaseTracker bringing out the dead for: " + + // theType); + DiskTxn myTxn = null; + + myTxn = DiskTxn.newStandalone(); + + try { + Cursor myCursor = theLeasesDb.openCursor(null, null); + + aReaper.clean(new ExpiredLocatorImpl(myCursor)); + + myCursor.close(); + } catch (Throwable aT) { + EntryStorage.theLogger.log(Level.SEVERE, + "Exception during reap", + aT); + throw new IOException("Fatal error in reap"); + } finally { + myTxn.commit(); + } + + EntryStorage.theLogger.log(Level.INFO, + "LeaseTracker claimed the dead for: " + + theType); + // System.err.println("LeaseTracker claimed the dead for: " + + // theType); + } + + public void delete(PersistentEntry anEntry) throws IOException { + byte[] myOid = LeaseRecordUtils.getId(anEntry); + DiskTxn myTxn = DiskTxn.newStandalone(); + + // System.err.println("Delete a lease"); + + try { + DatabaseEntry myKey = + new DatabaseEntry(LeaseRecordUtils.getBucketKey(anEntry)); + DatabaseEntry myValue = new DatabaseEntry(); + + Cursor myCursor = theLeasesDb.openCursor(myTxn.getDbTxn(), null); + OperationStatus myStatus = + myCursor.getSearchKey(myKey, myValue, LockMode.RMW); + + while ((! myStatus.equals(OperationStatus.NOTFOUND)) && + (! LeaseRecordUtils.isKey(myValue.getData(), myOid))) { + // System.err.println("Skipping for del"); + + myValue = new DatabaseEntry(); + myStatus = myCursor.getNextDup(myKey, myValue, LockMode.RMW); + } + + if (! myStatus.equals(OperationStatus.NOTFOUND)) { + // System.err.println("Lease killed"); + myCursor.delete(); + } + + myCursor.close(); + } catch (DatabaseException aDbe) { + EntryStorage.theLogger.log(Level.SEVERE, "Failed to delete entry", + aDbe); + throw new IOException(); + } finally { + myTxn.commit(); + } + } + + public void update(PersistentEntry anEntry) throws IOException { + // System.err.println("Update a lease"); + delete(anEntry); + write(anEntry); + } + + public void write(PersistentEntry anEntry) throws IOException { + // System.err.println("Write a lease"); + byte[] myBucketKey = LeaseRecordUtils.getBucketKey(anEntry); + byte[] myLeaseEntry = LeaseRecordUtils.getLeaseEntry(anEntry); + + DiskTxn myTxn = DiskTxn.newStandalone(); + + try { + theLeasesDb.put(myTxn.getDbTxn(), new DatabaseEntry(myBucketKey), + new DatabaseEntry(myLeaseEntry)); + } catch (DatabaseException aDbe) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to insert new entry", + aDbe); + throw new IOException(); + } finally { + myTxn.commit(); + } + } + + public void close() throws IOException { + try { + theLeasesDb.close(); + } catch (DatabaseException aDbe) { + EntryStorage.theLogger.log(Level.SEVERE, + "Failed to close lease db", + aDbe); + throw new IOException(); + } + } + + public void delete() throws IOException { + Disk.deleteDb(theType + "_leases"); + } +}