comparison src/org/dancres/blitz/entry/LeaseTrackerImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.entry;
2
3 import java.io.IOException;
4
5 import java.util.logging.Level;
6
7 import com.sleepycat.je.Database;
8 import com.sleepycat.je.Cursor;
9 import com.sleepycat.je.DatabaseEntry;
10 import com.sleepycat.je.DatabaseException;
11 import com.sleepycat.je.DatabaseConfig;
12 import com.sleepycat.je.OperationStatus;
13 import com.sleepycat.je.LockMode;
14
15 import org.dancres.blitz.disk.Disk;
16 import org.dancres.blitz.disk.DiskTxn;
17
18 import org.dancres.blitz.lease.ReapFilter;
19
20 /**
21 Database layout:
22
23 <ul>
24 <li>Sleeve expiries are divided into buckets according to the allocator
25 id of their EntryUID.</li>
26 <li>Each bucket contains entries consisting of the expiry(long) followed by
27 oid(long) making a total of 16 bytes.</li>
28 <li>Each bucket's entries are sorted in ascending order by expiry.</li>
29 <ul>
30
31 <p> Typical page size is 4096 bytes / 16 = 256 entries per page.
32 Thus, if we configure sufficient allocators (good for concurrency anyway)
33 we can ensure we get efficient grouping of entries against pages which
34 allows.for good cache behaviour.</p>
35 */
36 class LeaseTrackerImpl implements LeaseTracker {
37 private Database theLeasesDb;
38 private String theType;
39
40 private int theMaxAllocId;
41
42 LeaseTrackerImpl(String aType, int aMaxAllocId) throws IOException {
43 EntryStorage.theLogger.log(Level.FINE, "Tracker created: " + aType);
44
45 theType = aType;
46 theMaxAllocId = aMaxAllocId;
47
48 try {
49 DatabaseConfig myConfig = new DatabaseConfig();
50 myConfig.setAllowCreate(true);
51 myConfig.setSortedDuplicates(true);
52
53 theLeasesDb = Disk.newDb(null, theType + "_leases", myConfig);
54
55 } catch (DatabaseException aDbe) {
56 EntryStorage.theLogger.log(Level.SEVERE,
57 "Tracker failed to init db",
58 aDbe);
59 throw new IOException();
60 }
61 }
62
63 public void bringOutTheDead(EntryReaper aReaper) throws IOException {
64 /*
65 Read blocks from disk recovering all necessary entries which
66 are expired at this point in time. Then we can release the
67 txn and schedule up some deletes in cache via the ReapFilter.
68 Repeat for each bucket - we don't even need to expire all dead items
69 in a block...
70 */
71
72 EntryStorage.theLogger.log(Level.INFO,
73 "LeaseTracker bringing out the dead for: " +
74 theType);
75
76 // System.err.println("LeaseTracker bringing out the dead for: " +
77 // theType);
78 DiskTxn myTxn = null;
79
80 myTxn = DiskTxn.newStandalone();
81
82 try {
83 Cursor myCursor = theLeasesDb.openCursor(null, null);
84
85 aReaper.clean(new ExpiredLocatorImpl(myCursor));
86
87 myCursor.close();
88 } catch (Throwable aT) {
89 EntryStorage.theLogger.log(Level.SEVERE,
90 "Exception during reap",
91 aT);
92 throw new IOException("Fatal error in reap");
93 } finally {
94 myTxn.commit();
95 }
96
97 EntryStorage.theLogger.log(Level.INFO,
98 "LeaseTracker claimed the dead for: " +
99 theType);
100 // System.err.println("LeaseTracker claimed the dead for: " +
101 // theType);
102 }
103
104 public void delete(PersistentEntry anEntry) throws IOException {
105 byte[] myOid = LeaseRecordUtils.getId(anEntry);
106 DiskTxn myTxn = DiskTxn.newStandalone();
107
108 // System.err.println("Delete a lease");
109
110 try {
111 DatabaseEntry myKey =
112 new DatabaseEntry(LeaseRecordUtils.getBucketKey(anEntry));
113 DatabaseEntry myValue = new DatabaseEntry();
114
115 Cursor myCursor = theLeasesDb.openCursor(myTxn.getDbTxn(), null);
116 OperationStatus myStatus =
117 myCursor.getSearchKey(myKey, myValue, LockMode.RMW);
118
119 while ((! myStatus.equals(OperationStatus.NOTFOUND)) &&
120 (! LeaseRecordUtils.isKey(myValue.getData(), myOid))) {
121 // System.err.println("Skipping for del");
122
123 myValue = new DatabaseEntry();
124 myStatus = myCursor.getNextDup(myKey, myValue, LockMode.RMW);
125 }
126
127 if (! myStatus.equals(OperationStatus.NOTFOUND)) {
128 // System.err.println("Lease killed");
129 myCursor.delete();
130 }
131
132 myCursor.close();
133 } catch (DatabaseException aDbe) {
134 EntryStorage.theLogger.log(Level.SEVERE, "Failed to delete entry",
135 aDbe);
136 throw new IOException();
137 } finally {
138 myTxn.commit();
139 }
140 }
141
142 public void update(PersistentEntry anEntry) throws IOException {
143 // System.err.println("Update a lease");
144 delete(anEntry);
145 write(anEntry);
146 }
147
148 public void write(PersistentEntry anEntry) throws IOException {
149 // System.err.println("Write a lease");
150 byte[] myBucketKey = LeaseRecordUtils.getBucketKey(anEntry);
151 byte[] myLeaseEntry = LeaseRecordUtils.getLeaseEntry(anEntry);
152
153 DiskTxn myTxn = DiskTxn.newStandalone();
154
155 try {
156 theLeasesDb.put(myTxn.getDbTxn(), new DatabaseEntry(myBucketKey),
157 new DatabaseEntry(myLeaseEntry));
158 } catch (DatabaseException aDbe) {
159 EntryStorage.theLogger.log(Level.SEVERE,
160 "Failed to insert new entry",
161 aDbe);
162 throw new IOException();
163 } finally {
164 myTxn.commit();
165 }
166 }
167
168 public void close() throws IOException {
169 try {
170 theLeasesDb.close();
171 } catch (DatabaseException aDbe) {
172 EntryStorage.theLogger.log(Level.SEVERE,
173 "Failed to close lease db",
174 aDbe);
175 throw new IOException();
176 }
177 }
178
179 public void delete() throws IOException {
180 Disk.deleteDb(theType + "_leases");
181 }
182 }