comparison src/org/dancres/blitz/entry/SleeveCache.java @ 16:46ac1a45718a

Add support for cache partitioning to improve concurrency.
author Dan Creswell <dan.creswell@gmail.com>
date Sun, 05 Jul 2009 16:25:18 +0100
parents 3dc0c5604566
children 62b11f04d859
comparison
equal deleted inserted replaced
15:cd96fcac1487 16:46ac1a45718a
52 */ 52 */
53 class SleeveCache implements StatGenerator { 53 class SleeveCache implements StatGenerator {
54 static Logger theLogger = 54 static Logger theLogger =
55 Logging.newLogger("org.dancres.blitz.disk.SleeveCache"); 55 Logging.newLogger("org.dancres.blitz.disk.SleeveCache");
56 56
57 private ArcCache theStoreCache; 57 private static final int DESIRED_ENTRIES_PER_PARTITION = 128;
58
59 private final ArcCache[] theStoreCaches;
60 private final int theNumPartitions;
61 private final int thePartitionsMask;
58 62
59 private Storage theStore; 63 private Storage theStore;
60 64
61 private CountersImpl theCounters; 65 private CountersImpl theCounters;
62 66
110 114
111 try { 115 try {
112 theConstraints = 116 theConstraints =
113 EntryConstraints.getConstraints(theStore.getType()); 117 EntryConstraints.getConstraints(theStore.getType());
114 } catch (ConfigurationException aCE) { 118 } catch (ConfigurationException aCE) {
119 thePartitionsMask = 0;
120 theNumPartitions = 0;
121 theStoreCaches = new ArcCache[0];
115 theLogger.log(Level.SEVERE, 122 theLogger.log(Level.SEVERE,
116 "Couldn't load constraints for type " + 123 "Couldn't load constraints for type " +
117 theStore.getType(), aCE); 124 theStore.getType(), aCE);
118 IOException myIOE = 125 IOException myIOE =
119 new IOException("Couldn't load constraints for type " + 126 new IOException("Couldn't load constraints for type " +
120 theStore.getType()); 127 theStore.getType());
121 myIOE.initCause(aCE); 128 myIOE.initCause(aCE);
122 throw myIOE; 129 throw myIOE;
123 } 130 }
124 131
125 CacheSize myCacheSize = (CacheSize) theConstraints.get(CacheSize.class); 132 CacheSize myCacheSize = (CacheSize) theConstraints.get(CacheSize.class);
126 133
134 int myNumPartitions;
135
136 if (DESIRED_ENTRIES_PER_PARTITION == -1)
137 myNumPartitions = 1;
138 else
139 myNumPartitions = (myCacheSize.getSize() / DESIRED_ENTRIES_PER_PARTITION);
140
141 // Find nearest power of 2 > or =
142 //
143 int myPower;
144 for (myPower = 1; myPower < myNumPartitions; myPower = myPower << 1);
145 theNumPartitions = myPower;
146
147 thePartitionsMask = theNumPartitions - 1;
148
127 theLogger.log(Level.INFO, aStore.getType() + " cache size = " 149 theLogger.log(Level.INFO, aStore.getType() + " cache size = "
128 + myCacheSize.getSize()); 150 + myCacheSize.getSize() + " partitions = " + theNumPartitions + " mask = " + Integer.toHexString(thePartitionsMask));
129 151
130 theStoreCache = new ArcCache(aStore, myCacheSize.getSize()); 152 theStoreCaches = new ArcCache[theNumPartitions];
131 153
132 theIndexer = CacheIndexer.getIndexer(theStore.getType()); 154 for (int i = 0; i < theNumPartitions; i++) {
133 155 theStoreCaches[i] = new ArcCache(aStore, theNumPartitions);
134 theStoreCache.add(new CacheListenerImpl(theIndexer)); 156
157 theIndexer = CacheIndexer.getIndexer(theStore.getType());
158
159 theStoreCaches[i].add(new CacheListenerImpl(theIndexer));
160 }
135 161
136 try { 162 try {
137 theCounters = new CountersImpl(theStore.getType(), 163 theCounters = new CountersImpl(theStore.getType(),
138 theStore.getNumEntries()); 164 theStore.getNumEntries());
139 } catch (IOException anIOE) { 165 } catch (IOException anIOE) {
169 195
170 return new SearchStat(theId, theStore.getType(), myTitles, 196 return new SearchStat(theId, theStore.getType(), myTitles,
171 myMisses, myDeld); 197 myMisses, myDeld);
172 } 198 }
173 199
200 private int getPartition(CacheBlockDescriptor aCBD) {
201 OID myOID = (OID) aCBD.getId();
202
203 return getPartition(myOID);
204 }
205
206 private int getPartition(OID anOID) {
207 return anOID.hashCode() & thePartitionsMask;
208 }
209
210 private int getPartition(EntrySleeveImpl aSleeve) {
211 return getPartition(aSleeve.getOID());
212 }
213
174 void forceSync(CacheBlockDescriptor aCBD) throws IOException { 214 void forceSync(CacheBlockDescriptor aCBD) throws IOException {
175 theStoreCache.forceSync(aCBD); 215 theStoreCaches[getPartition(aCBD)].forceSync(aCBD);
176 } 216 }
177 217
178 CacheBlockDescriptor load(OID aOID) throws IOException { 218 CacheBlockDescriptor load(OID aOID) throws IOException {
179 return theStoreCache.find(aOID); 219 return theStoreCaches[getPartition(aOID)].find(aOID);
180 } 220 }
181 221
182 CacheBlockDescriptor add(EntrySleeveImpl aSleeve) throws IOException { 222 CacheBlockDescriptor add(EntrySleeveImpl aSleeve) throws IOException {
183 return theStoreCache.insert(aSleeve); 223 return theStoreCaches[getPartition(aSleeve)].insert(aSleeve);
184 } 224 }
185 225
186 RecoverySummary recover(EntrySleeveImpl aSleeve) 226 RecoverySummary recover(EntrySleeveImpl aSleeve)
187 throws IOException { 227 throws IOException {
188 return theStoreCache.recover(aSleeve); 228 return theStoreCaches[getPartition(aSleeve)].recover(aSleeve);
189 } 229 }
190 230
191 boolean renew(OID aOID, long anExpiry) throws IOException { 231 boolean renew(OID aOID, long anExpiry) throws IOException {
192 CacheBlockDescriptor myCBD = theStoreCache.find(aOID); 232 CacheBlockDescriptor myCBD = theStoreCaches[getPartition(aOID)].find(aOID);
193 233
194 if (myCBD == null) { 234 if (myCBD == null) {
195 return false; 235 return false;
196 } 236 }
197 237
225 boolean cancel(OID aOID) throws IOException { 265 boolean cancel(OID aOID) throws IOException {
226 return renew(aOID, 0); 266 return renew(aOID, 0);
227 } 267 }
228 268
229 void sync() throws IOException { 269 void sync() throws IOException {
230 theStoreCache.sync(); 270 for (int i = 0; i < theNumPartitions; i++) {
271 theStoreCaches[i].sync();
272 }
231 } 273 }
232 274
233 Counters getCounters() { 275 Counters getCounters() {
234 return theCounters; 276 return theCounters;
235 } 277 }
259 301
260 if (!anEscort.writing(myInfo)) 302 if (!anEscort.writing(myInfo))
261 return; 303 return;
262 304
263 // Now make it visible 305 // Now make it visible
264 CacheBlockDescriptor myCBD = theStoreCache.insert(mySleeve); 306 CacheBlockDescriptor myCBD = theStoreCaches[getPartition(mySleeve)].insert(mySleeve);
265 myCBD.release(); 307 myCBD.release();
266 308
267 if (theLogger.isLoggable(Level.FINE)) 309 if (theLogger.isLoggable(Level.FINE))
268 theLogger.log(Level.FINE, "Unwritten: " + mySleeve.getOID()); 310 theLogger.log(Level.FINE, "Unwritten: " + mySleeve.getOID());
269 } 311 }
383 theTrackers[STORAGE_TRACKER]); 425 theTrackers[STORAGE_TRACKER]);
384 } 426 }
385 427
386 LongtermOffer getOffer(OID anOID) throws IOException { 428 LongtermOffer getOffer(OID anOID) throws IOException {
387 429
388 CacheBlockDescriptor myCBD = theStoreCache.find(anOID); 430 CacheBlockDescriptor myCBD = theStoreCaches[getPartition(anOID)].find(anOID);
389 431
390 if (myCBD != null) { 432 if (myCBD != null) {
391 return new LongtermOfferImpl(myCBD); 433 return new LongtermOfferImpl(myCBD);
392 } else 434 } else
393 return null; 435 return null;
394 } 436 }
395 437
396 boolean find(SearchVisitor aVisitor, OID aOID, MangledEntry aPreload) 438 boolean find(SearchVisitor aVisitor, OID aOID, MangledEntry aPreload)
397 throws IOException { 439 throws IOException {
398 440
399 CacheBlockDescriptor myCBD = theStoreCache.find(aOID); 441 CacheBlockDescriptor myCBD = theStoreCaches[getPartition(aOID)].find(aOID);
400 442
401 long myStartTime = System.currentTimeMillis(); 443 long myStartTime = System.currentTimeMillis();
402 444
403 boolean offered = false; 445 boolean offered = false;
404 446
514 556
515 while (aLocator.fetchNext()) { 557 while (aLocator.fetchNext()) {
516 558
517 myId = aLocator.getOID(); 559 myId = aLocator.getOID();
518 560
519 CacheBlockDescriptor myCBD = theStoreCache.find(myId); 561 CacheBlockDescriptor myCBD = theStoreCaches[getPartition(myId)].find(myId);
520 562
521 if (myCBD != null) { 563 if (myCBD != null) {
522 mySleeve = (EntrySleeveImpl) myCBD.getContent(); 564 mySleeve = (EntrySleeveImpl) myCBD.getContent();
523 565
524 if (! mySleeve.getState().test(SleeveState.DELETED)) { 566 if (! mySleeve.getState().test(SleeveState.DELETED)) {
622 try { 664 try {
623 while (aLocator.fetchNext()) { 665 while (aLocator.fetchNext()) {
624 666
625 OID myId = aLocator.getOID(); 667 OID myId = aLocator.getOID();
626 668
627 CacheBlockDescriptor myCBD = theStoreCache.find(myId); 669 CacheBlockDescriptor myCBD = theStoreCaches[getPartition(myId)].find(myId);
628 670
629 OpInfo myInfo = null; 671 OpInfo myInfo = null;
630 672
631 if (myCBD != null) { 673 if (myCBD != null) {
632 EntrySleeveImpl mySleeve = 674 EntrySleeveImpl mySleeve =