comparison src/org/dancres/blitz/entry/KeyIndex.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.entry;
2
3 import java.io.IOException;
4 import java.io.Serializable;
5 import java.io.FileNotFoundException;
6
7 import java.util.logging.*;
8 import java.util.HashMap;
9
10 import com.sleepycat.je.Database;
11 import com.sleepycat.je.Cursor;
12 import com.sleepycat.je.DatabaseException;
13 import com.sleepycat.je.DatabaseEntry;
14 import com.sleepycat.je.Transaction;
15 import com.sleepycat.je.DatabaseConfig;
16 import com.sleepycat.je.LockMode;
17 import com.sleepycat.je.OperationStatus;
18 import com.sleepycat.je.DeadlockException;
19 import com.sleepycat.je.LockNotGrantedException;
20
21 import org.dancres.util.BytePacker;
22
23 import org.dancres.blitz.mangler.MangledEntry;
24 import org.dancres.blitz.mangler.MangledField;
25
26 import org.dancres.blitz.disk.DiskTxn;
27 import org.dancres.blitz.disk.Disk;
28 import org.dancres.blitz.disk.BackoffGenerator;
29
30 import org.dancres.blitz.oid.OID;
31 import org.dancres.blitz.oid.OIDFactory;
32
33 /**
34 This class serves as a key generator for a DB secondary index and knows how
35 to associate with the main db. <P>
36
37 Note that duplicate keys are allowed in these databases because various
38 key values can, of course, hash to the same value.
39 */
40 class KeyIndex implements Serializable {
41 private static Logger theLogger =
42 Logger.getLogger("org.dancres.disk.KeyIndex");
43
44 private String theType;
45 private String theIndexName;
46 private int theOffset;
47
48 private transient Database theSecondaryDb;
49
50 KeyIndex(String aType, String anIndexName, int anOffset) {
51 theType = aType;
52 theIndexName = anIndexName;
53 theOffset = anOffset;
54 }
55
56 public String getName() {
57 return theIndexName;
58 }
59
60 void init() throws IOException {
61 /*
62 Open db and create if necessary, invoke associate
63 ignore bucket 0 entities. Implies entry's in
64 main DB should store Object so they can store both
65 MangledEntry instances and arbitrary data.
66
67 Keep a note of Db instance as we'll need it to do close.
68 */
69 try {
70 DatabaseConfig myConfig = new DatabaseConfig();
71 myConfig.setAllowCreate(true);
72 myConfig.setSortedDuplicates(true);
73
74 // Allow duplicates
75 //
76 // theSecondaryDb.set_flags(Db.DB_DUP | Db.DB_REVSPLITOFF);
77
78 theSecondaryDb = Disk.newDb(null, theType + "_" + theIndexName,
79 myConfig);
80
81 } catch (DatabaseException aDbe) {
82 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
83 throw new IOException("Dbe");
84 }
85 }
86
87 void index(OID anId, MangledField[] aFields, DiskTxn aTxn)
88 throws DatabaseException {
89
90 MangledField myField = aFields[theOffset];
91
92 if (myField.isNull())
93 return;
94
95 // Safety check
96 if (myField.getName().equals(theIndexName)) {
97 byte[] myHash = new byte[4];
98 BytePacker myPacker = BytePacker.getMSBPacker(myHash);
99 myPacker.putInt(myField.hashCode(), 0);
100
101 byte[] myTarget = OIDFactory.getKey(anId);
102
103 theSecondaryDb.put(aTxn.getDbTxn(),
104 new DatabaseEntry(myHash),
105 new DatabaseEntry(myTarget));
106 } else {
107 throw new RuntimeException("Eeek, field we found isn't ours");
108 }
109 }
110
111 void unIndex(OID anId, MangledField[] aFields)
112 throws IOException {
113
114 MangledField myField = aFields[theOffset];
115
116 if (myField.isNull())
117 return;
118
119 // Pack the hash as key into secondary index
120 byte[] myHash = new byte[4];
121 BytePacker myPacker = BytePacker.getMSBPacker(myHash);
122 myPacker.putInt(myField.hashCode(), 0);
123 DatabaseEntry myHashKey = new DatabaseEntry(myHash);
124
125 // We'll need to compare the data of each index entry with the key
126 // of our target entry
127 byte[] myEntryKey = OIDFactory.getKey(anId);
128 DatabaseEntry myKeyValue = new DatabaseEntry(myEntryKey);
129 int myRetryCount = 0;
130
131 do {
132 DiskTxn myTxn = DiskTxn.newNonBlockingStandalone();
133 Cursor myCursor = newCursor(myTxn);
134
135 try {
136 // Locate all entries under the hashcode value of our field
137 OperationStatus myResult = myCursor.getSearchBoth(myHashKey,
138 myKeyValue,
139 null); // LockMode.RMW);
140
141 /*
142 If we crashed whilst doing a delete, some indexes may not have
143 an entry, that's okay we just note it in the log
144 */
145 if ((myResult.equals(OperationStatus.NOTFOUND)) ||
146 (myResult.equals(OperationStatus.KEYEMPTY)))
147 theLogger.log(Level.SEVERE,
148 "Warning, didn't find an index entry " + anId +
149 ", " + myField.hashCode() + ", " +
150 myResult.equals(OperationStatus.NOTFOUND));
151 else {
152 myCursor.delete();
153 }
154
155 myCursor.close();
156
157 myTxn.commit();
158
159 if (myRetryCount != 0) {
160 theLogger.log(Level.FINE,
161 "Total retries: " + myRetryCount);
162 }
163
164 return;
165 } catch (DatabaseException aDbe) {
166 /*
167 Argh, docs say it'll throw Deadlock but code says
168 LockNotGranted.....
169 */
170 if ((aDbe instanceof DeadlockException) ||
171 (aDbe instanceof LockNotGrantedException)) {
172
173 if (theLogger.isLoggable(Level.FINEST))
174 theLogger.log(Level.FINEST, "Got lock exception", aDbe);
175
176 try {
177 myCursor.close();
178 } catch (DatabaseException aCDbe) {
179 theLogger.log(Level.SEVERE, "Got Dbe", aCDbe);
180 throw new IOException("Dbe");
181 }
182 myTxn.abort();
183
184 // System.err.println("Aborting index delete, retry: " +
185 // myRetryCount);
186
187 ++myRetryCount;
188
189 BackoffGenerator.pause();
190
191 } else {
192 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
193 throw new IOException("Dbe");
194 }
195 }
196 } while (true);
197 }
198
199 private boolean compare(byte[] aBytes, byte[] anotherBytes) {
200 for (int i = 0; i < aBytes.length; i++) {
201 if (aBytes[i] != anotherBytes[i])
202 return false;
203 }
204
205 return true;
206 }
207
208 Cursor newCursor() throws IOException {
209 try {
210 return theSecondaryDb.openCursor(DiskTxn.getActiveDbTxn(), null);
211 } catch (DatabaseException aDbe) {
212 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
213 throw new IOException("Dbe");
214 }
215 }
216
217 Cursor newCursor(DiskTxn aTxn) throws IOException {
218 try {
219 if (aTxn != null)
220 return theSecondaryDb.openCursor(aTxn.getDbTxn(), null);
221 else
222 return theSecondaryDb.openCursor(null, null);
223 } catch (DatabaseException aDbe) {
224 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
225 throw new IOException("Dbe");
226 }
227 }
228
229 void close() throws IOException {
230 try {
231 theSecondaryDb.close();
232 } catch (DatabaseException aDbe) {
233 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
234 throw new IOException("Dbe");
235 }
236 }
237
238 void delete() throws IOException {
239 Disk.deleteDb(theType + "_" + theIndexName);
240 }
241 }