comparison src/org/dancres/blitz/entry/SleeveCache.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children 46ac1a45718a
1 package org.dancres.blitz.entry;
2
3 import java.io.IOException;
4
5 import java.util.ArrayList;
6
7 import java.util.logging.*;
8
9 import net.jini.core.transaction.TransactionException;
10 import net.jini.config.ConfigurationException;
11
12 import org.dancres.blitz.mangler.MangledEntry;
13
14 import org.dancres.blitz.Logging;
15 import org.dancres.blitz.stats.StatGenerator;
16 import org.dancres.blitz.stats.Stat;
17 import org.dancres.blitz.stats.SearchStat;
18 import org.dancres.blitz.stats.StatsBoard;
19
20 import org.dancres.blitz.oid.OID;
21
22 import org.dancres.blitz.arc.ArcCache;
23 import org.dancres.blitz.arc.CacheBlockDescriptor;
24 import org.dancres.blitz.arc.RecoverySummary;
25
26 import org.dancres.blitz.txn.TxnManager;
27
28 import org.dancres.blitz.entry.ci.CacheIndexer;
29
30 import org.dancres.blitz.config.CacheSize;
31 import org.dancres.blitz.config.Fifo;
32 import org.dancres.blitz.config.EntryConstraints;
33
34 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
35
36 /**
37 The organization of the space implementation can be viewed as
38 similar to a memory hierarchy, with EntryReposImpl/SpaceImpl being where
39 central processing happens. SleeveCache can be viewed as a level 1 cache
40 whilst storage is main memory. Thus all "traffic" goes through SleeveCache,
41 which interacts appropriately with Storage when it cannot satisfy the
42 demand itself. <P>
43
44 It is responsible for indexing and caching unpacked EntrySleeveImpls.
45 Implemented using an ArcCache.<P>
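A rough usage sketch of that traffic pattern (illustrative only - the
owning caller, variable names and error handling are assumptions, not
part of this class): <P>

<pre>
// Hypothetical owner, e.g. an EntryRepository implementation
SleeveCache myCache = new SleeveCache(myStorage);

// Writes land in the cache first and reach Storage via eviction/sync
myCache.write(myMangledEntry, myExpiry, myWriteEscort);

// Searches consult cache and indexer before falling back to Storage
myCache.find(myTemplate, mySearchVisitor);

// Push any dirty sleeves down to Storage
myCache.sync();
</pre>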
46
47 @see org.dancres.blitz.arc.ArcCache
48
49 @todo One way to split caches down further in the face of concurrency
50 demands would be to start maintaining multiple caches scoped or hashed on
51 the zone id or even the uid itself.
52 */
53 class SleeveCache implements StatGenerator {
54 static Logger theLogger =
55 Logging.newLogger("org.dancres.blitz.disk.SleeveCache");
56
57 private ArcCache theStoreCache;
58
59 private Storage theStore;
60
61 private CountersImpl theCounters;
62
63 private EntryConstraints theConstraints;
64
65 private CacheIndexer theIndexer;
66
67 private long theId = StatGenerator.UNSET_ID;
68
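/**
Per-source search statistics: counts locator hits that could no longer
be resolved to a sleeve (misses) and hits whose sleeve was already
marked DELETED (see offer() and find() below).
*/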
69 private static class OfferTracker {
70 private String theType;
71 private AtomicLong theMissed = new AtomicLong();
72 private AtomicLong theDeld = new AtomicLong();
73
74 OfferTracker(String aType) {
75 theType = aType;
76 }
77
78 void incMissed() {
79 theMissed.incrementAndGet();
80 }
81
82 void incDeld() {
83 theDeld.incrementAndGet();
84 }
85
86 public String getTitle() {
87 return theType;
88 }
89
90 public long getMisses() {
91 return theMissed.get();
92 }
93
94 public long getDeld() {
95 return theDeld.get();
96 }
97 }
98
99 private static final int CACHED_TRACKER = 0;
100 private static final int DIRTY_TRACKER = 1;
101 private static final int STORAGE_TRACKER = 2;
102
103 private OfferTracker[] theTrackers = new OfferTracker[] {
104 new OfferTracker("Cached"), new OfferTracker("Dirty"),
105 new OfferTracker("Storage")
106 };
107
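/**
Configures the cache from the per-type EntryConstraints (CacheSize),
wires the CacheIndexer in as a cache listener and initialises the
instance counters from Storage.
*/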
108 SleeveCache(Storage aStore) throws IOException {
109 theStore = aStore;
110
111 try {
112 theConstraints =
113 EntryConstraints.getConstraints(theStore.getType());
114 } catch (ConfigurationException aCE) {
115 theLogger.log(Level.SEVERE,
116 "Couldn't load constraints for type " +
117 theStore.getType(), aCE);
118 IOException myIOE =
119 new IOException("Couldn't load constraints for type " +
120 theStore.getType());
121 myIOE.initCause(aCE);
122 throw myIOE;
123 }
124
125 CacheSize myCacheSize = (CacheSize) theConstraints.get(CacheSize.class);
126
127 theLogger.log(Level.INFO, aStore.getType() + " cache size = "
128 + myCacheSize.getSize());
129
130 theStoreCache = new ArcCache(aStore, myCacheSize.getSize());
131
132 theIndexer = CacheIndexer.getIndexer(theStore.getType());
133
134 theStoreCache.add(new CacheListenerImpl(theIndexer));
135
136 try {
137 theCounters = new CountersImpl(theStore.getType(),
138 theStore.getNumEntries());
139 } catch (IOException anIOE) {
140 theLogger.log(Level.SEVERE,
141 "Couldn't read instance count from storage: " +
142 theStore.getType() +
143 " statistics will be inaccurate", anIOE);
144
145 theCounters = new CountersImpl(theStore.getType(), 0);
146 }
147
148 StatsBoard.get().add(this);
149 }
150
151 public void setId(long anId) {
152 theId = anId;
153 }
154
155 public long getId() {
156 return theId;
157 }
158
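/**
Snapshots the per-source miss/deleted counts into a SearchStat for
publication via the StatsBoard.
*/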
159 public Stat generate() {
160 String[] myTitles = new String[theTrackers.length];
161 long[] myMisses = new long[theTrackers.length];
162 long[] myDeld = new long[theTrackers.length];
163
164 for (int i = 0; i < theTrackers.length; i++) {
165 myTitles[i] = theTrackers[i].getTitle();
166 myMisses[i] = theTrackers[i].getMisses();
167 myDeld[i] = theTrackers[i].getDeld();
168 }
169
170 return new SearchStat(theId, theStore.getType(), myTitles,
171 myMisses, myDeld);
172 }
173
174 void forceSync(CacheBlockDescriptor aCBD) throws IOException {
175 theStoreCache.forceSync(aCBD);
176 }
177
178 CacheBlockDescriptor load(OID aOID) throws IOException {
179 return theStoreCache.find(aOID);
180 }
181
182 CacheBlockDescriptor add(EntrySleeveImpl aSleeve) throws IOException {
183 return theStoreCache.insert(aSleeve);
184 }
185
186 RecoverySummary recover(EntrySleeveImpl aSleeve)
187 throws IOException {
188 return theStoreCache.recover(aSleeve);
189 }
190
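/**
Renews the lease on the specified Entry. An expiry of zero is treated
as a cancel: the sleeve is marked DELETED rather than given a new
expiry (see cancel() below).

@return <code>false</code> if the Entry is unknown, already deleted
or already expired.
*/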
191 boolean renew(OID aOID, long anExpiry) throws IOException {
192 CacheBlockDescriptor myCBD = theStoreCache.find(aOID);
193
194 if (myCBD == null) {
195 return false;
196 }
197
198 EntrySleeveImpl mySleeve = (EntrySleeveImpl) myCBD.getContent();
199
200 if (!mySleeve.hasExpired(System.currentTimeMillis()) &&
201 !mySleeve.getState().test(SleeveState.DELETED)) {
202
203 if (anExpiry == 0) {
204 // Don't reset expiry - it's useful to storage to know
205 // what the lease was last
206 mySleeve.getState().set(SleeveState.DELETED);
207
208 // Update stats
209 theCounters.didPurge();
210 } else {
211 mySleeve.setExpiry(anExpiry);
212 }
213
214 mySleeve.markDirty();
215 myCBD.release();
216
217 return true;
218 }
219
220 myCBD.release();
221
222 return false;
223 }
224
225 boolean cancel(OID aOID) throws IOException {
226 return renew(aOID, 0);
227 }
228
229 void sync() throws IOException {
230 theStoreCache.sync();
231 }
232
233 Counters getCounters() {
234 return theCounters;
235 }
236
237 void close() {
238 // Nothing to do here - we could destroy the counters, but that's done
239 // in deleteAll, which makes more sense as close doesn't imply deletion.
240 // theStoreCache.dump();
241 }
242
243 void write(MangledEntry anEntry, long anExpiry, WriteEscort anEscort)
244 throws IOException {
245
246
247 OID myID = theStore.getNextId();
248 EntrySleeveImpl mySleeve = null;
249
250 if (theLogger.isLoggable(Level.FINE))
251 theLogger.log(Level.FINE, "Written: " + myID + ", " +
252 anExpiry + ", " +
253 (anExpiry - System.currentTimeMillis()));
254
255 mySleeve = new EntrySleeveImpl(myID, anEntry, anExpiry);
256
257 // Ready to write but tell the escort first
258 OpInfo myInfo = new WriteEntryOpInfo(mySleeve);
259
260 if (!anEscort.writing(myInfo))
261 return;
262
263 // Now make it visible
264 CacheBlockDescriptor myCBD = theStoreCache.insert(mySleeve);
265 myCBD.release();
266
267 if (theLogger.isLoggable(Level.FINE))
268 theLogger.log(Level.FINE, "Unwritten: " + mySleeve.getOID());
269 }
270
271 void find(MangledEntry anEntry, SearchVisitor aVisitor)
272 throws IOException {
273
274 if (theConstraints.get(Fifo.class) != null)
275 fifoFind(anEntry, aVisitor);
276 else
277 fastFind(anEntry, aVisitor);
278 }
279
280 /**
281 <p>If we're in FIFO mode, we know that each TupleLocator is sorted into
282 FIFO order. Thus we can merge across all locators on the fly
283 and obtain a fully ordered, duplicate-free set of tuple ids for
284 matching using a simple algorithm.</p>
285
286 <p>By sorting across all locators and applying a global FIFO order
287 we can maximize use of the cache and avoid excessive disk hits if
288 enough of the FIFO ordering is already in cache. Thus we get
289 graceful degradation rather than consulting disk for the definitive
290 ordering regardless.</p>
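<p>A self-contained sketch of the duplicate-discarding merge idea
(illustrative only - the real implementation is SortingLocator working
over TupleLocators, and the plain long ids below are stand-ins for
OIDs):</p>

<pre>{@code
// Each source is already sorted ascending, as each TupleLocator is
// in FIFO mode
long[][] mySources = new long[][] {{1, 4, 7}, {2, 4, 9}, {3, 7}};
int[] myCursor = new int[mySources.length];
long myLast = -1;

while (true) {
    // Pick the source whose current head is the smallest id
    int myBest = -1;
    for (int i = 0; i < mySources.length; i++) {
        if (myCursor[i] < mySources[i].length &&
            (myBest == -1 ||
             mySources[i][myCursor[i]] < mySources[myBest][myCursor[myBest]]))
            myBest = i;
    }

    if (myBest == -1)
        break;

    long myId = mySources[myBest][myCursor[myBest]++];

    // Discard ids seen via more than one source
    if (myId != myLast) {
        myLast = myId;
        System.out.println(myId);    // 1 2 3 4 7 9 - global FIFO order
    }
}
}</pre>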
291 */
292 private void fifoFind(MangledEntry anEntry, SearchVisitor aVisitor)
293 throws IOException {
294
295 long mySearchStart = System.currentTimeMillis();
296
297 ArrayList myLocators = new ArrayList();
298
299 TupleLocator myLocator = theStore.find(anEntry);
300
301 if (myLocator != null)
302 myLocators.add(myLocator);
303
304 myLocator = theStore.findCached(anEntry);
305
306 if (myLocator != null)
307 myLocators.add(myLocator);
308
309 myLocator = theIndexer.find(anEntry);
310
311 if (myLocator != null)
312 myLocators.add(myLocator);
313
314 if (myLocators.size() == 0)
315 return;
316
317 TupleLocator[] mySortedSources = new TupleLocator[myLocators.size()];
318
319 mySortedSources = (TupleLocator[]) myLocators.toArray(mySortedSources);
320
321 myLocator = new SortingLocator(mySortedSources);
322
323 offerAndReleaseLocator(myLocator, aVisitor, mySearchStart,
324 theTrackers[CACHED_TRACKER]);
325 }
326
327 private void fastFind(MangledEntry anEntry, SearchVisitor aVisitor)
328 throws IOException {
329
330 long mySearchStart = System.currentTimeMillis();
331
332 /*
333 Basic approach is to send the template to the CacheIndexer and
334 ask it to return suitable IDs which we will then pin and try.
335 Note we should set a flag which boycotts a load from disk so that
336 flushed or deleted entries are not loaded more than once.
337
338 If we cannot satisfy the Visitor that way, we repeat the exercise
339 with storage which returns OID/byte[] pairs which we can then
340 pin.
341
342 Storage and CacheIndexer are now free to plan searches as they
343 see fit based on the template.
344 */
345
346 TupleLocator myLocator = theIndexer.find(anEntry);
347
348 if (theLogger.isLoggable(Level.FINE))
349 theLogger.log(Level.FINE, "Searching[cache]: " + myLocator);
350
351 if (myLocator != null) {
352 if (offerAndReleaseLocator(myLocator, aVisitor, mySearchStart,
353 theTrackers[CACHED_TRACKER])) {
354 /*
355 System.err.println("Cache search time: " +
356 (System.currentTimeMillis() -
357 mySearchStart));
358 */
359 return;
360 }
361 }
362
363 myLocator = theStore.findCached(anEntry);
364
365 if (theLogger.isLoggable(Level.FINE))
366 theLogger.log(Level.FINE, "Searching[StoreCache]: " + myLocator);
367
368 if (myLocator != null) {
369 if (offerAndReleaseLocator(myLocator, aVisitor, mySearchStart,
370 theTrackers[DIRTY_TRACKER]))
371 return;
372 }
373
374 myLocator = theStore.find(anEntry);
375
376 if (myLocator == null) {
377 if (theLogger.isLoggable(Level.FINE))
378 theLogger.log(Level.FINE, "Got no matches on disk");
379 return;
380 }
381
382 offerAndReleaseLocator(myLocator, aVisitor, mySearchStart,
383 theTrackers[STORAGE_TRACKER]);
384 }
385
386 LongtermOffer getOffer(OID anOID) throws IOException {
387
388 CacheBlockDescriptor myCBD = theStoreCache.find(anOID);
389
390 if (myCBD != null)
391 return new LongtermOfferImpl(myCBD);
392 else
393 return null;
394 }
395
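/**
Offers a single, known Entry to the visitor, using the preloaded
MangledEntry if one is supplied instead of the sleeve's own copy.
Expired entries are purged (subject to ReapFilters) rather than
offered.

@return <code>true</code> if the Entry was actually offered to the
visitor.
*/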
396 boolean find(SearchVisitor aVisitor, OID aOID, MangledEntry aPreload)
397 throws IOException {
398
399 CacheBlockDescriptor myCBD = theStoreCache.find(aOID);
400
401 long myStartTime = System.currentTimeMillis();
402
403 boolean offered = false;
404
405 if (myCBD != null) {
406 try {
407 EntrySleeveImpl mySleeve =
408 (EntrySleeveImpl) myCBD.getContent();
409
410 /*
411 If the JS specification is changed to cope with the issues
412 discussed in http://archives.java.sun.com/cgi-bin/wa?A2=ind0311&L=javaspaces-users&F=&S=&P=4599 and http://archives.java.sun.com/cgi-bin/wa?A2=ind0311&L=javaspaces-users&F=&S=&P=3590 then we need to do two things:
413
414 (1) Allow the SearchVisitor to see Sleeve's even if they've
415 expired.
416 (2) Having "shown" it to the SearchVisitor we'd need to
417 query the ReapFilters and if they don't boycott, mark the
418 item deleted.
419
420 These two steps have the effect of allowing a *ifExists to
421 conflict on lease-expired entries that have been locked
422 by a transaction and ensure we only delete such entries
423 when no transactions have possession of them anymore. Of
424 course, this is somewhat slower and less efficient as
425 there's never a circumstance under which we can be assured
426 that a SearchVisitor *never* sees a particular entry again.
427
428 If we must implement the strategy of flunking a transaction
429 owing to a lock on a lease-expired object, this would be
430 best dealt with by having TxnOps check expiries at prepare
431 or commit time. However, this is much less appealing as what
432 is basically a pessimistic transaction API becomes
433 optimistic in this case and only in this case.
434
435 All this applies to the similar statement in offer() below.
436 */
437 if (! mySleeve.getState().test(SleeveState.DELETED)) {
438
439 // If it's expired, mark it deleted, subject to filters
440 //
441 if (mySleeve.hasExpired(myStartTime)) {
442
443 if (! EntryRepositoryFactory.getReaper().filter(mySleeve)) {
444 mySleeve.getState().set(SleeveState.DELETED);
445 mySleeve.markDirty();
446
447 // Update stats
448 theCounters.didPurge();
449 }
450 } else {
451 OpInfo myInfo =
452 new FindEntryOpInfo(theStore.getType(),
453 mySleeve.getOID(),
454 aVisitor.isDeleter());
455
456 SearchOffer myOffer;
457
458 if (aPreload != null) {
459 myOffer = new SearchOfferImpl(aPreload, myInfo);
460 } else {
461 theLogger.log(Level.FINE, "NOT using preload");
462
463 myOffer = new SearchOfferImpl(mySleeve.getEntry(),
464 myInfo);
465 }
466
467 aVisitor.offer(myOffer);
468
469 offered = true;
470 }
471 } else {
472 theTrackers[CACHED_TRACKER].incDeld();
473 }
474 } finally {
475 myCBD.release();
476 }
477 } else {
478 theTrackers[CACHED_TRACKER].incMissed();
479 }
480
481 return offered;
482 }
483
484 private boolean offerAndReleaseLocator(TupleLocator aLocator,
485 SearchVisitor aVisitor,
486 long aStartTime,
487 OfferTracker aTracker)
488 throws IOException {
489
490 try {
491 return offer(aLocator, aVisitor, aStartTime, aTracker);
492 } finally {
493 aLocator.release();
494 }
495 }
496
497 /**
498 @return <code>true</code> means offering can stop for better or worse.
499 <code>false</code> indicates that the search should continue if there
500 are other sources of offers.
501 */
502 private boolean offer(TupleLocator aLocator, SearchVisitor aVisitor,
503 long aStartTime, OfferTracker aTracker)
504 throws IOException {
505
506 int myVisitorResponse = SearchVisitor.TRY_AGAIN;
507
508 OID myId = null;
509 EntrySleeveImpl mySleeve = null;
510 String myType = theStore.getType();
511 boolean isDeletion = aVisitor.isDeleter();
512
513 // long myStart = System.currentTimeMillis();
514
515 while (aLocator.fetchNext()) {
516
517 myId = aLocator.getOID();
518
519 CacheBlockDescriptor myCBD = theStoreCache.find(myId);
520
521 if (myCBD != null) {
522 mySleeve = (EntrySleeveImpl) myCBD.getContent();
523
524 if (! mySleeve.getState().test(SleeveState.DELETED)) {
525
526 // If it's expired, mark it deleted, subject to filters
527 //
528 if (mySleeve.hasExpired(aStartTime)) {
529 if (! EntryRepositoryFactory.getReaper().filter(mySleeve)) {
530 mySleeve.getState().set(SleeveState.DELETED);
531 mySleeve.markDirty();
532
533 // Update stats
534 theCounters.didPurge();
535 }
536 } else {
537
538 OpInfo myInfo =
539 new FindEntryOpInfo(myType, mySleeve.getOID(),
540 isDeletion);
541
542 SearchOfferImpl myOffer =
543 new SearchOfferImpl(mySleeve.getEntry(), myInfo);
544
545 myVisitorResponse =
546 aVisitor.offer(myOffer);
547 }
548 } else {
549 aTracker.incDeld();
550 }
551
552 myCBD.release();
553
554 if ((myVisitorResponse == SearchVisitor.STOP) ||
555 (myVisitorResponse == SearchVisitor.ACCEPTED)) {
556 break;
557 }
558 } else {
559 aTracker.incMissed();
560 }
561 }
562
563 /*
564 System.out.println("Time to offer: " +
565 (System.currentTimeMillis() -
566 myStart));
567 */
568
569 if (myVisitorResponse != SearchVisitor.TRY_AGAIN)
570 return true;
571 else
572 return false;
573 }
574
575 void deleteAll() throws IOException {
576 theCounters.destroy();
577
578 /*
579 Mirror the search order used in fastFind, but with the null
580 template so every Entry of this type is located: ask the
581 CacheIndexer for ids it knows about, then consult the store's
582 cache of dirty sleeves and finally storage itself.
583
584 Each id a locator yields is pinned via the cache and its sleeve
585 marked DELETED (see delete() below), so flushed or already
586 deleted entries are not processed more than once.
587
588 Storage and CacheIndexer remain free to plan their searches as
589 they see fit based on the (null) template.
590 */
591
592 TupleLocator myLocator = theIndexer.find(MangledEntry.NULL_TEMPLATE);
593
594 if (theLogger.isLoggable(Level.FINE))
595 theLogger.log(Level.FINE, "Searching[cache]: " + myLocator);
596
597 if (myLocator != null) {
598 delete(myLocator);
599 }
600
601 myLocator = theStore.findCached(MangledEntry.NULL_TEMPLATE);
602
603 if (theLogger.isLoggable(Level.FINE))
604 theLogger.log(Level.FINE, "Searching[StoreCache]: " + myLocator);
605
606 if (myLocator != null) {
607 delete(myLocator);
608 }
609
610 myLocator = theStore.find(MangledEntry.NULL_TEMPLATE);
611
612 if (myLocator == null) {
613 if (theLogger.isLoggable(Level.FINE))
614 theLogger.log(Level.FINE, "Got no matches on disk");
615 return;
616 }
617
618 delete(myLocator);
619 }
620
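/**
Marks the sleeve behind every id the locator yields as DELETED and
logs a ForcedCommit with the TxnManager for each deletion.
*/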
621 private void delete(TupleLocator aLocator) throws IOException {
622 try {
623 while (aLocator.fetchNext()) {
624
625 OID myId = aLocator.getOID();
626
627 CacheBlockDescriptor myCBD = theStoreCache.find(myId);
628
629 OpInfo myInfo = null;
630
631 if (myCBD != null) {
632 EntrySleeveImpl mySleeve =
633 (EntrySleeveImpl) myCBD.getContent();
634
635 if (! mySleeve.getState().test(SleeveState.DELETED)) {
636 mySleeve.getState().set(SleeveState.DELETED);
637 mySleeve.markDirty();
638
639 myInfo =
640 new FindEntryOpInfo(theStore.getType(),
641 mySleeve.getOID(),
642 true);
643 }
644
645 myCBD.release();
646
647 if (myInfo != null) {
648 try {
649 TxnManager.get().log(new ForcedCommit(myInfo));
650 } catch (TransactionException aTE) {
651 IOException myIOE = new IOException("Eeek failed to delete Entry");
652 myIOE.initCause(aTE);
653 throw myIOE;
654 }
655 }
656 }
657 }
658 } finally {
659 aLocator.release();
660 }
661 }
662
663 public String toString() {
664 return "SC: " + theStore.getType();
665 }
666 }