comparison src/org/dancres/blitz/entry/EntryStorage.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.entry;
2
3 import java.io.IOException;
4 import java.io.FileNotFoundException;
5
6 import java.util.logging.*;
7
8 import java.util.ArrayList;
9
10 import com.sleepycat.je.*;
11
12 import net.jini.config.ConfigurationException;
13
14 import org.dancres.blitz.mangler.MangledField;
15 import org.dancres.blitz.mangler.MangledEntry;
16
17 import org.dancres.util.BytePacker;
18
19 import org.dancres.blitz.Logging;
20
21 import org.dancres.util.ObjectTransformer;
22
23 import org.dancres.blitz.meta.RegistryFactory;
24 import org.dancres.blitz.meta.Registry;
25
26 import org.dancres.blitz.entry.ci.CacheIndexer;
27
28 import org.dancres.blitz.disk.DiskTxn;
29 import org.dancres.blitz.disk.Disk;
30 import org.dancres.blitz.disk.RetryingUpdate;
31 import org.dancres.blitz.disk.RetryableOperation;
32
33 import org.dancres.blitz.oid.Allocator;
34 import org.dancres.blitz.oid.AllocatorFactory;
35 import org.dancres.blitz.oid.OID;
36 import org.dancres.blitz.oid.OIDFactory;
37
38 import org.dancres.blitz.cache.Identifiable;
39 import org.dancres.blitz.cache.Identifier;
40
41 import org.dancres.blitz.config.Fifo;
42 import org.dancres.blitz.config.ReadAhead;
43 import org.dancres.blitz.config.EntryConstraints;
44
45 import org.dancres.blitz.stats.StatsBoard;
46 import org.dancres.blitz.stats.FieldsStat;
47
48 /**
49 <p>Used to maintain all Entry instances of a particular type but does not
50 include "java.lang.Object". "java.lang.Object" is considered the root
51 node for internal implementation purposes. Storage requirements associated
52 with this Entry type are handled by RootStorage.</p>
53
54 <p>MetaDB contains a list of known sub-types, a set of KeyIndex instances.
55 </p>
56
57 @see org.dancres.blitz.entry.RootStorage
58 */
59 class EntryStorage implements Storage, EntryEditor {
60 /*
61 Used in searching to indicate various results
62 */
63
64 /**
65 Indicates a particular search field was null and therefore wildcard
66 and thus doesn't filter matches
67 */
68 private static final int WAS_NULL = -1;
69
70 /**
71 Indicates a field was not wildcard and thus filters matches but
72 when queries produced no hits
73 */
74 private static final int NO_HITS = -2;
75
76 static Logger theLogger =
77 Logging.newLogger("org.dancres.blitz.disk.Storage");
78
79 private String theType;
80
81 /**
82 Duplicate keys not allowed in the main database
83 */
84 private Database theMainDb;
85
86 private Registry theMetaData;
87
88 private ArrayList theSubtypes = new ArrayList();
89 private String[] theCurrentSubtypes = new String[0];
90
91 private KeyIndex[] theIndexes;
92
93 private boolean noSchemaDefined = false;
94
95 private Allocator theAllocator;
96
97 private WriteScheduler theWriteScheduler;
98
99 private CacheIndexer theIndexer;
100
101 private LeaseTracker theTracker;
102
103 private EntryConstraints theConstraints;
104
105 EntryStorage(String aType) {
106 theType = aType;
107 }
108
109 public String getType() {
110 return theType;
111 }
112
113 public String getName() {
114 return theType;
115 }
116
117 /**
118 Used for debug purposes
119 */
120 public String toString() {
121 return theType;
122 }
123
124 public TupleLocator findCached(MangledEntry anEntry) {
125 return null;
126
127 // return theIndexer.find(anEntry);
128 }
129
130 public boolean init(boolean mustExist) throws IOException {
131 if (mustExist) {
132 if (! Disk.dbExists(theType)) {
133 return false;
134 }
135 }
136
137 try {
138 theConstraints = EntryConstraints.getConstraints(theType);
139 } catch (ConfigurationException aCE) {
140 IOException myIOE = new IOException("Failed to load constraints");
141 myIOE.initCause(aCE);
142 throw myIOE;
143 }
144
145
146 theWriteScheduler = new WriteScheduler((EntryEditor) this);
147
148 theIndexer =
149 CacheIndexer.newIndexer(theType, theConstraints);
150
151 try {
152 theMetaData = RegistryFactory.get(theType, null);
153
154 DatabaseConfig myConfig = new DatabaseConfig();
155 myConfig.setAllowCreate(true);
156
157 theMainDb = Disk.newDb(null, theType, myConfig);
158
159 /*
160 If there are no index elements, we've not been created
161 explicitly previously. We may have been implicitly created
162 whilst modelling inheritance hierarchy but that doesn't count
163 */
164 DiskTxn myTxn = DiskTxn.newStandalone();
165
166 try {
167 byte[] myIndexInfo =
168 theMetaData.getAccessor(myTxn).loadRaw(FixedOIDs.INDEXES_KEY);
169
170 if (myIndexInfo == null) {
171 noSchemaDefined = true;
172 } else {
173 loadIndexes(myIndexInfo);
174 }
175 } finally {
176 myTxn.commit();
177 }
178
179 myTxn = DiskTxn.newStandalone();
180
181 try {
182 byte[] mySubtypeInfo =
183 theMetaData.getAccessor(myTxn).loadRaw(FixedOIDs.SUBTYPES_KEY);
184
185 if (mySubtypeInfo != null) {
186 theSubtypes =(ArrayList)
187 ObjectTransformer.toObject(mySubtypeInfo);
188 String[] myTypes = new String[theSubtypes.size()];
189
190 theCurrentSubtypes =
191 (String[]) theSubtypes.toArray(myTypes);
192 }
193 } finally {
194 myTxn.commit();
195 }
196
197 } catch (FileNotFoundException aFNFE) {
198 theLogger.log(Level.SEVERE, "Couldn't open type db", aFNFE);
199 throw new IOException("Couldn't open type db");
200 } catch (DatabaseException aDbe) {
201 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
202 throw new IOException("Dbe");
203 }
204
205 // Initialize Allocator
206 //
207 if (theConstraints.get(Fifo.class) != null)
208 theAllocator = AllocatorFactory.get(theType, true);
209 else
210 theAllocator = AllocatorFactory.get(theType, false);
211
212 // Initialize LeaseTracker
213 theTracker =
214 LeaseTrackerFactory.getTracker(theType, theAllocator);
215
216 return true;
217 }
218
219 public int getNumEntries() throws IOException {
220 if (theMainDb == null)
221 return 0;
222
223 try {
224 StatsConfig myConfig = new StatsConfig();
225 myConfig.setFast(false);
226
227 BtreeStats myStats = (BtreeStats) theMainDb.getStats(myConfig);
228 return (int) myStats.getLeafNodeCount();
229 } catch (DatabaseException aDbe) {
230 theLogger.log(Level.SEVERE, "Couldn't read num entries from Db",
231 aDbe);
232 throw new IOException("Dbe");
233 }
234 }
235
236 public synchronized void addSubtype(String aType) throws IOException {
237 if (! theSubtypes.contains(aType)) {
238 theSubtypes.add(aType);
239 String[] myTypes = new String[theSubtypes.size()];
240 theCurrentSubtypes = (String[]) theSubtypes.toArray(myTypes);
241
242 theMetaData.getAccessor().save(FixedOIDs.SUBTYPES_KEY,
243 theSubtypes);
244 }
245 }
246
247 public synchronized String[] getSubtypes() {
248 return theCurrentSubtypes;
249 }
250
251 public synchronized void setFields(MangledField[] aSetOfFields)
252 throws IOException {
253
254 /*
255 It's possible that a couple of threads may have seen the undefined
256 schema state - in this case, they'll all rush to sort out the
257 schema - in these cases, we ignore those duplicate requests.
258 */
259 if (noSchemaDefined == false)
260 return;
261
262 ArrayList myFields = new ArrayList();
263
264 theIndexes = new KeyIndex[aSetOfFields.length];
265
266 // Fields are always in the same order
267 for (int i = 0; i < aSetOfFields.length; i++) {
268 myFields.add(aSetOfFields[i].getName());
269 KeyIndex myIndex = newIndex(aSetOfFields[i].getName(), i);
270 theIndexes[i] = myIndex;
271 }
272
273 theMetaData.getAccessor().save(FixedOIDs.INDEXES_KEY, theIndexes);
274
275 noSchemaDefined = false;
276
277 StatsBoard.get().add(new FieldsStat(theType, myFields));
278 }
279
280 public boolean noSchemaDefined() {
281 return noSchemaDefined;
282 }
283
284 public void close() throws IOException {
285 try {
286
287 theTracker.close();
288
289 if (theIndexes != null) {
290 for (int i = 0; i < theIndexes.length; i++) {
291 theIndexes[i].close();
292 }
293 }
294
295 theMainDb.close();
296
297 } catch (DatabaseException aDbe) {
298 theLogger.log(Level.SEVERE, "Couldn't close maindb", aDbe);
299 throw new IOException("Dbe");
300 }
301
302 theMetaData.close();
303 }
304
305 public void delete() throws IOException {
306 theTracker.delete();
307
308 RegistryFactory.delete(theType);
309
310 AllocatorFactory.delete(theType);
311
312 if (theIndexes != null) {
313 for (int i = 0; i < theIndexes.length; i++) {
314 theIndexes[i].delete();
315 }
316 }
317
318 Disk.deleteDb(theType);
319 }
320
321 public OID getNextId() throws IOException {
322 return theAllocator.getNextId();
323 }
324
325 public void bringOutTheDead(EntryReaper aReaper) throws IOException {
326 theTracker.bringOutTheDead(aReaper);
327 }
328
329 private KeyIndex newIndex(String aName, int anOffset) throws IOException {
330 KeyIndex myIndex = new KeyIndex(theType, aName, anOffset);
331 myIndex.init();
332
333 return myIndex;
334 }
335
336 private void loadIndexes(byte[] aSetOfIndexes)
337 throws IOException {
338
339 ArrayList myFields = new ArrayList();
340
341 theIndexes = (KeyIndex[]) ObjectTransformer.toObject(aSetOfIndexes);
342
343 for (int i = 0; i < theIndexes.length; i++) {
344 myFields.add(theIndexes[i].getName());
345 theIndexes[i].init();
346 }
347
348 StatsBoard.get().add(new FieldsStat(theType, myFields));
349 }
350
351 /* *******************************************************************
352 * BackingStore impl
353 * *******************************************************************/
354
355 public void save(Identifiable anIdentifiable) throws IOException {
356 // Not done here, delegated to the WriteScheduler for asynch write
357 //
358 theWriteScheduler.add((EntrySleeveImpl) anIdentifiable);
359 }
360
361 public Identifiable load(Identifier anId) throws IOException {
362 // Check there isn't a dirty copy in write cache before hitting disk
363 Identifiable myIdentifiable =
364 theWriteScheduler.dirtyRead((OID) anId);
365
366 if (myIdentifiable == null) {
367 byte[] myPackage = load((OID) anId);
368
369 if (myPackage != null) {
370 // System.out.println("Got from disk: " + anId);
371 return new EntrySleeveImpl(myPackage);
372 } else return null;
373 } else {
374 // System.out.println("Got from dirty: " + anId);
375 return myIdentifiable;
376 }
377 }
378
379 private byte[] load(final OID anId) throws IOException {
380 if (theLogger.isLoggable(Level.FINEST))
381 theLogger.log(Level.FINEST, "Sload: " + anId);
382
383 // System.err.println("Sload: " + anId);
384
385 final DatabaseEntry myKey =
386 new DatabaseEntry(OIDFactory.getKey(anId));
387
388 RetryableOperation myOp =
389 new RetryableOperation() {
390 public Object perform(DiskTxn aTxn) throws DatabaseException {
391 DatabaseEntry myData = new DatabaseEntry();
392
393 OperationStatus myStatus;
394
395 myStatus = theMainDb.get(aTxn.getDbTxn(), myKey, myData,
396 null);
397
398 if (! myStatus.equals(OperationStatus.NOTFOUND)) {
399 return myData.getData();
400 }
401
402 return null;
403 }
404
405 public String toString() {
406 return "Load: " + anId;
407 }
408 };
409
410 Object myResult = new RetryingUpdate(myOp).commit();
411
412 return (myResult == null) ? null : (byte[]) myResult;
413 }
414
415 public void delete(PersistentEntry anEntry) throws IOException {
416 final OID myId = anEntry.getOID();
417 MangledField[] myKeys = anEntry.getEntry().getFields();
418
419 if (theLogger.isLoggable(Level.FINEST))
420 theLogger.log(Level.FINEST, "Sdelete: " + myId);
421
422 // System.err.println("Sdelete: " + myId);
423
424 theTracker.delete(anEntry);
425
426 // Delete from indexes in reverse
427 for (int i = (theIndexes.length - 1); i >= 0; i--) {
428 theIndexes[i].unIndex(myId, myKeys);
429 }
430
431 RetryableOperation myOp =
432 new RetryableOperation() {
433 public Object perform(DiskTxn aTxn) throws DatabaseException {
434 OperationStatus myStatus =
435 theMainDb.delete(aTxn.getDbTxn(),
436 new DatabaseEntry(OIDFactory.getKey(myId)));
437
438 if (myStatus.equals(OperationStatus.NOTFOUND))
439 theLogger.log(Level.SEVERE,
440 "Warning failed to delete key from maindb");
441
442 return null;
443 }
444
445 public String toString() {
446 return "Delete: " + myId;
447 }
448 };
449
450 new RetryingUpdate(myOp).commit();
451 }
452
453 public void update(PersistentEntry anEntry) throws IOException {
454 final OID myId = anEntry.getOID();
455 final byte[] myPackage = anEntry.flatten();
456
457 if (theLogger.isLoggable(Level.FINEST))
458 theLogger.log(Level.FINEST, "Supdate: " + myId);
459
460 // System.err.println("Supdate: " + myId);
461
462 theTracker.update(anEntry);
463
464 RetryableOperation myOp =
465 new RetryableOperation() {
466 public Object perform(DiskTxn aTxn) throws DatabaseException {
467 theMainDb.put(aTxn.getDbTxn(),
468 new DatabaseEntry(OIDFactory.getKey(myId)),
469 new DatabaseEntry(myPackage));
470
471 return null;
472 }
473
474 public String toString() {
475 return "Update: " + myId;
476 }
477 };
478
479 new RetryingUpdate(myOp).commit();
480 }
481
482 public void write(PersistentEntry anEntry) throws IOException {
483 final OID myId = anEntry.getOID();
484 final MangledField[] myKeys = anEntry.getEntry().getFields();
485 final byte[] myPackage = anEntry.flatten();
486
487 if (theLogger.isLoggable(Level.FINEST))
488 theLogger.log(Level.FINEST, "Swrite: " + myId);
489
490 // System.err.println("Swrite: " + myId);
491
492 theTracker.write(anEntry);
493
494 RetryableOperation myOp =
495 new RetryableOperation() {
496 public Object perform(DiskTxn aTxn) throws DatabaseException {
497
498 /*
499 This must be an all or nothing operation - we cannot have
500 some indexes with an inserted entry and others without or
501 a missing main db entry.
502 */
503
504 // Index in reverse
505 for (int i = (theIndexes.length - 1); i >= 0; i--) {
506 theIndexes[i].index(myId, myKeys, aTxn);
507 }
508
509 theMainDb.put(aTxn.getDbTxn(),
510 new DatabaseEntry(OIDFactory.getKey(myId)),
511 new DatabaseEntry(myPackage));
512
513 return null;
514 }
515
516 public String toString() {
517 return "Write: " + myId;
518 }
519 };
520
521 new RetryingUpdate(myOp).commit();
522 }
523
524 /* *********************************************************************
525 Search code starts here
526 *********************************************************************/
527
528 public TupleLocator find(MangledEntry anEntry) throws IOException {
529 if (noSchemaDefined())
530 return null;
531
532 theLogger.log(Level.FINEST, "find");
533
534 /*
535 For a full maindb search we pass empty key and data with the
536 DBNEXT flag.
537
538 For a secondary index search we pass a specified key and empty data
539 with DBSET.
540 */
541 try {
542 if ((anEntry == null) || (anEntry.isWildcard())) {
543 theLogger.log(Level.FINEST, "wildcard");
544
545 Cursor myCursor =
546 theMainDb.openCursor(null, null);
547
548 OperationStatus myStatus =
549 myCursor.getNext(new DatabaseEntry(),
550 new DatabaseEntry(), null);
551
552 if (myStatus.equals(OperationStatus.NOTFOUND)) {
553 myCursor.close();
554 return null;
555 } else {
556 ReadAhead myRead =
557 (ReadAhead) theConstraints.get(ReadAhead.class);
558
559 return new PrimaryLocatorImpl(myCursor, anEntry,
560 myRead.getSize());
561 }
562 } else {
563 theLogger.log(Level.FINEST, "index");
564
565 // Locate the smallest search set
566 Cursor myCursor = null;
567 MangledField[] myFields = anEntry.getFields();
568 byte[] myPackedKey = new byte[4];
569 BytePacker myPacker = BytePacker.getMSBPacker(myPackedKey);
570
571 int[] mySizes = new int[myFields.length];
572
573 if ((anEntry.getType().equals(theType)) &&
574 (myFields.length != theIndexes.length))
575 theLogger.log(Level.WARNING, "Possible schema change detected - matching may fail" + theType);
576
577 // Do the analysis in reverse
578 for (int i = (myFields.length - 1); i >= 0; i--) {
579 MangledField myField = myFields[i];
580
581 if (myField.isNull())
582 mySizes[i] = WAS_NULL;
583 else {
584 theLogger.log(Level.FINEST, "scanning: " +
585 myField.getName());
586
587 int myHashCode = myField.hashCode();
588 myPacker.putInt(myHashCode, 0);
589
590 DiskTxn myStandalone = DiskTxn.newStandalone();
591
592 myCursor = theIndexes[i].newCursor(null);
593
594 OperationStatus myStatus =
595 myCursor.getSearchKey(new DatabaseEntry(myPackedKey), new DatabaseEntry(), null);
596
597 // If we get results back we can do a compare
598 if (! myStatus.equals(OperationStatus.NOTFOUND)) {
599
600 theLogger.log(Level.FINEST, "Got: " +
601 myCursor.count() + " entries");
602
603 mySizes[i] = myCursor.count();
604 } else {
605 theLogger.log(Level.FINEST, "Got 0 entries");
606 mySizes[i] = NO_HITS;
607 }
608
609 myCursor.close();
610 myStandalone.commit();
611 }
612 }
613
614 int mySmallestSize = Integer.MAX_VALUE;
615 int myChoice = -1;
616
617 for (int i = 0; i < mySizes.length; i++) {
618 /*
619 If the field contributed no filtering because it was
620 wildcard, ignore it. Note that _at least one_ of the
621 fields must be non-null because we aren't wildcard
622 matching (caught and handled above)
623 */
624 if (mySizes[i] != WAS_NULL) {
625 /*
626 If any contributing index returned no hits, we know
627 there is going to be no match so we can stop right
628 now
629 */
630 if (mySizes[i] == NO_HITS) {
631 theLogger.log(Level.FINEST, "Aborting search, one field didn't match");
632 return null;
633 } else if (mySizes[i] < mySmallestSize) {
634 /*
635 The index under consideration produced fewer
636 matches than our previous choice
637 */
638 myChoice = i;
639 mySmallestSize = mySizes[i];
640 }
641 }
642 }
643
644 /*
645 If an Entry has all null fields, it's a wildcard which is
646 handled above. If the Entry is not wildcard and any one
647 of the indexes yielded no hits, we will have already
648 exited above.
649
650 Thus we are left with whichever index yielded the lowest
651 number of hits. So we now load up that index and return
652 it.
653 */
654 theLogger.log(Level.FINEST, "Searching: " +
655 mySmallestSize + " from " +
656 theIndexes[myChoice].getName());
657
658 myPacker.putInt(myFields[myChoice].hashCode(), 0);
659
660 /*
661 Now we've decided on the cursor we wish to use, we can
662 open it in the main txn.
663 */
664 myCursor = theIndexes[myChoice].newCursor(null);
665
666 DatabaseEntry myHashKey = new DatabaseEntry(myPackedKey);
667
668 OperationStatus myStatus =
669 myCursor.getSearchKey(myHashKey, new DatabaseEntry(),
670 null);
671
672 if (! myStatus.equals(OperationStatus.NOTFOUND)) {
673
674 theLogger.log(Level.FINEST, "Got: " +
675 myCursor.count() + " entries");
676
677 ReadAhead myRead =
678 (ReadAhead) theConstraints.get(ReadAhead.class);
679
680 return new IndexLocatorImpl(myCursor, myHashKey,
681 anEntry, myRead.getSize());
682 } else {
683 myCursor.close();
684 return null;
685 }
686 }
687 } catch (DatabaseException aDbe) {
688 theLogger.log(Level.SEVERE, "Got Dbe", aDbe);
689 throw new IOException("Dbe");
690 }
691 }
692 }