Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/disk/Disk.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.disk; | |
2 | |
3 import java.io.File; | |
4 import java.io.InputStream; | |
5 import java.io.IOException; | |
6 import java.io.RandomAccessFile; | |
7 | |
8 import java.nio.channels.FileChannel; | |
9 import java.nio.channels.FileLock; | |
10 | |
11 import java.util.List; | |
12 import java.util.ArrayList; | |
13 import java.util.Iterator; | |
14 import java.util.Properties; | |
15 | |
16 import java.util.logging.*; | |
17 | |
18 import net.jini.config.ConfigurationException; | |
19 | |
20 import com.sleepycat.je.Environment; | |
21 import com.sleepycat.je.EnvironmentConfig; | |
22 import com.sleepycat.je.DatabaseConfig; | |
23 import com.sleepycat.je.TransactionConfig; | |
24 import com.sleepycat.je.Transaction; | |
25 import com.sleepycat.je.Database; | |
26 import com.sleepycat.je.DatabaseException; | |
27 import com.sleepycat.je.LockStats; | |
28 import com.sleepycat.je.EnvironmentStats; | |
29 import com.sleepycat.je.StatsConfig; | |
30 | |
31 import org.dancres.blitz.Logging; | |
32 | |
33 import org.dancres.blitz.config.ConfigurationFactory; | |
34 import org.dancres.util.NumUtil; | |
35 | |
36 /** | |
37 This class is responsible for managing the underlying BerkeleyDB | |
38 infrastructure. Anyone using this class to create/manipulate databases | |
39 should likely be registered as a Syncable instance so that it is aware | |
40 of requests for synchronization and closure of databases. | |
41 */ | |
42 public class Disk { | |
43 private static Environment theEnv; | |
44 // private static TransactionConfig theTxnConfig; | |
45 | |
46 private static String theLocation; | |
47 private static long theDbCacheSize; | |
48 private static int theMaxTxns; | |
49 private static int maxDbLog; | |
50 private static int maxLogIteratorBuffer; | |
51 private static int maxLogBuffers; | |
52 private static int maxLogBufferBytes; | |
53 private static int maxNodeEntries; | |
54 | |
55 private static RandomAccessFile theLockFile; | |
56 private static FileChannel theLockChannel; | |
57 private static FileLock theLock; | |
58 | |
59 private static List theDbs; | |
60 | |
61 private static boolean isTransient; | |
62 | |
63 static Logger theLogger = | |
64 Logging.newLogger("org.dancres.blitz.disk.Disk"); | |
65 | |
66 private static List theSyncTasks = new ArrayList(); | |
67 | |
68 static { | |
69 try { | |
70 theLocation = | |
71 (String) ConfigurationFactory.getEntry("persistDir", | |
72 String.class); | |
73 | |
74 try { | |
75 String myNewCacheForm = | |
76 ((String) ConfigurationFactory.getEntry("dbCache", | |
77 String.class, | |
78 new Long(1024 * 1024).toString())); | |
79 | |
80 try { | |
81 theDbCacheSize = NumUtil.convertToBytes(myNewCacheForm); | |
82 } catch (IllegalArgumentException anIE) { | |
83 theLogger.log(Level.SEVERE, "Failed to parse cache size", | |
84 anIE); | |
85 throw new Error("Cannot start - failed to parse cace size"); | |
86 } | |
87 | |
88 } catch (ConfigurationException aCE) { | |
89 // Ignore it for now and try the old format | |
90 // | |
91 theDbCacheSize = | |
92 ((Long) ConfigurationFactory.getEntry("dbCache", | |
93 long.class, | |
94 new Long(1024 * 1024))).longValue(); | |
95 } | |
96 | |
97 theMaxTxns = | |
98 ((Integer) ConfigurationFactory.getEntry("maxDbTxns", | |
99 int.class, | |
100 new Integer(256))).intValue(); | |
101 | |
102 maxDbLog = | |
103 ((Integer) ConfigurationFactory.getEntry("maxDbLog", | |
104 int.class, | |
105 new Integer(10000000))).intValue(); | |
106 | |
107 maxLogIteratorBuffer = | |
108 ((Integer) ConfigurationFactory.getEntry("maxLogIteratorBuff", | |
109 int.class, | |
110 new Integer(1024))).intValue(); | |
111 | |
112 maxLogBuffers = | |
113 ((Integer) ConfigurationFactory.getEntry("maxLogBuffers", | |
114 int.class, | |
115 new Integer(5))).intValue(); | |
116 | |
117 maxLogBufferBytes = | |
118 ((Integer) ConfigurationFactory.getEntry("maxLogBufferBytes", | |
119 int.class, | |
120 new Integer(4620000))).intValue(); | |
121 | |
122 maxNodeEntries = | |
123 ((Integer) ConfigurationFactory.getEntry("maxNodeEntries", | |
124 int.class, | |
125 new Integer(128))).intValue(); | |
126 | |
127 theLogger.log(Level.INFO, "Max txns: " + theMaxTxns); | |
128 theLogger.log(Level.INFO, "DbCache: " + theDbCacheSize); | |
129 } catch (Exception anE) { | |
130 theLogger.log(Level.SEVERE, "Couldn't get Disk config", | |
131 anE); | |
132 throw new Error("Disk didn't start", anE); | |
133 } | |
134 } | |
135 | |
136 public static void setTransient(boolean transientDisk) { | |
137 isTransient = transientDisk; | |
138 } | |
139 | |
140 public static void init() { | |
141 try { | |
142 new File(theLocation).mkdirs(); | |
143 | |
144 lockLocation(); | |
145 | |
146 Properties myDbProps = new Properties(); | |
147 | |
148 InputStream myPropsStream = | |
149 Disk.class.getResourceAsStream("db.properties"); | |
150 | |
151 if (myPropsStream == null) | |
152 throw new IOException("Failed to load default db settings"); | |
153 | |
154 myDbProps.load(myPropsStream); | |
155 | |
156 if (isTransient) { | |
157 theLogger.log(Level.INFO, | |
158 "Forced checkpointer on for transient ops"); | |
159 myDbProps.setProperty("je.env.runCheckpointer", "true"); | |
160 } | |
161 | |
162 myDbProps.setProperty("je.log.fileMax", | |
163 Integer.toString(maxDbLog)); | |
164 | |
165 /* | |
166 Run a benchmark with these disabled, then run them again | |
167 enabled. | |
168 myDbProps.setProperty("je.log.iteratorReadSize", | |
169 Integer.toString(maxLogIteratorBuffer)); | |
170 | |
171 myDbProps.setProperty("je.log.numBuffers", | |
172 Integer.toString(maxLogBuffers)); | |
173 | |
174 myDbProps.setProperty("je.log.totalBufferBytes", | |
175 Integer.toString(maxLogBufferBytes)); | |
176 | |
177 myDbProps.setProperty("je.nodeMaxEntries", | |
178 Integer.toString(maxNodeEntries)); | |
179 | |
180 */ | |
181 | |
182 EnvironmentConfig myConfig = new EnvironmentConfig(myDbProps); | |
183 myConfig.setCacheSize(theDbCacheSize); | |
184 myConfig.setTransactional(true); | |
185 myConfig.setAllowCreate(true); | |
186 | |
187 // theTxnConfig = new TransactionConfig(); | |
188 // theTxnConfig.setNoSync(true); | |
189 | |
190 theLogger.log(Level.INFO, "Opening Database"); | |
191 | |
192 theEnv = new Environment(new File(theLocation), myConfig); | |
193 | |
194 theLogger.log(Level.INFO, "Database recovery complete"); | |
195 | |
196 theDbs = theEnv.getDatabaseNames(); | |
197 | |
198 } catch (UnsatisfiedLinkError aULE) { | |
199 theLogger.log(Level.SEVERE, "Warning, didn't load library for db cleanly - are you using Inca X?", aULE); | |
200 theLogger.log(Level.SEVERE, "Try restarting the container..."); | |
201 theLogger.log(Level.SEVERE, "Ignoring library load failure - if Blitz doesn't boot, check your library path"); | |
202 ClassLoader myLoader = Disk.class.getClassLoader(); | |
203 throw new Error("Disk didn't start: " + myLoader, aULE); | |
204 } catch (Exception anE) { | |
205 theLogger.log(Level.SEVERE, "Couldn't start Disk", | |
206 anE); | |
207 throw new Error("Disk didn't start", anE); | |
208 } catch (Error anErr) { | |
209 theLogger.log(Level.SEVERE, "Got error", anErr); | |
210 theLogger.log(Level.SEVERE, anErr.getMessage()); | |
211 | |
212 ClassLoader myLoader = Disk.class.getClassLoader(); | |
213 throw new Error("Disk didn't start: " + myLoader, anErr); | |
214 } | |
215 } | |
216 | |
217 /** | |
218 Eradicate state associated with Disk | |
219 */ | |
220 public static synchronized void destroy() { | |
221 deleteFiles(getDbLocation()); | |
222 } | |
223 | |
224 /** | |
225 Eradicate state held in some specific directory | |
226 */ | |
227 public static synchronized void clean(String aDir) { | |
228 deleteFiles(aDir); | |
229 } | |
230 | |
231 private static String getDbLocation() { | |
232 return theLocation; | |
233 } | |
234 | |
235 public static void add(Syncable aSyncable) { | |
236 synchronized(theSyncTasks) { | |
237 theSyncTasks.add(aSyncable); | |
238 } | |
239 } | |
240 | |
241 public static void remove(Syncable aSyncable) { | |
242 synchronized(theSyncTasks) { | |
243 theSyncTasks.remove(aSyncable); | |
244 } | |
245 } | |
246 | |
247 private static Syncable[] getSyncTasks() { | |
248 synchronized(theSyncTasks) { | |
249 Syncable[] myTasks = new Syncable[theSyncTasks.size()]; | |
250 return (Syncable[]) theSyncTasks.toArray(myTasks); | |
251 } | |
252 } | |
253 | |
254 public static void stop() throws Exception { | |
255 if (theEnv != null) { | |
256 theLogger.info("BDB closing"); | |
257 try { | |
258 WriteDaemon.get().halt(); | |
259 | |
260 close(); | |
261 | |
262 theEnv.sync(); | |
263 theEnv.close(); | |
264 theLogger.info("BDB closed"); | |
265 } catch (DatabaseException aDBE) { | |
266 theLogger.log(Level.SEVERE, "Couldn't close BDB", aDBE); | |
267 throw new Exception("Couldn't close BDB"); | |
268 } | |
269 } | |
270 } | |
271 | |
272 /** | |
273 If the backup directory doesn't exist, it will be created. | |
274 If the backup directory does exist, the caller should ensure that | |
275 it has been cleared before the backup is performed. This permits | |
276 the caller to determine what to do with old backups beforehand. | |
277 */ | |
278 public static void backup(String aDestDir) | |
279 throws IOException { | |
280 | |
281 File myDest = new File(aDestDir); | |
282 | |
283 myDest.mkdir(); | |
284 | |
285 File[] myFiles = myDest.listFiles(); | |
286 | |
287 if (myFiles.length > 0) | |
288 throw new IOException("Backup dir should be empty"); | |
289 | |
290 BackupTask myTask = new BackupTask(new File(getDbLocation()), myDest); | |
291 | |
292 try { | |
293 /* | |
294 We cannot use sync because we need WriteDaemon to perform | |
295 the backup post sync'ing the queue. This is required to | |
296 prevent WriteDaemon from performing further updates whilst | |
297 we perform the backup and prevents issues with state leakage. | |
298 */ | |
299 Syncable[] mySyncables = getSyncTasks(); | |
300 | |
301 for (int i = 0; i < mySyncables.length; i++) { | |
302 mySyncables[i].sync(); | |
303 } | |
304 | |
305 WriteDaemon.get().queue(myTask); | |
306 WriteDaemon.get().push(); | |
307 | |
308 } catch (Exception anE) { | |
309 IOException myIOE = new IOException("Couldn't start sync"); | |
310 myIOE.initCause(anE); | |
311 throw myIOE; | |
312 } | |
313 | |
314 myTask.waitForCompletion(); | |
315 } | |
316 | |
317 private static void deleteFiles(String aDir) { | |
318 theLogger.log(Level.INFO, "Deleting: " + aDir); | |
319 | |
320 File myDir = new File(aDir); | |
321 | |
322 File[] myFiles = myDir.listFiles(); | |
323 | |
324 if (myFiles == null) | |
325 return; | |
326 | |
327 for (int i = 0; i < myFiles.length; i++) { | |
328 File myFile = myFiles[i]; | |
329 | |
330 if (myFile.isFile()) { | |
331 theLogger.log(Level.INFO, "Deleting: " + myFile); | |
332 myFile.delete(); | |
333 } else { | |
334 theLogger.log(Level.INFO, "Leaving: " + myFile); | |
335 } | |
336 } | |
337 } | |
338 | |
339 private static void lockLocation() throws IOException { | |
340 theLockFile = new RandomAccessFile(new File(theLocation, | |
341 "blitz.lock"), "rw"); | |
342 | |
343 theLockChannel = theLockFile.getChannel(); | |
344 | |
345 theLock = theLockChannel.tryLock(); | |
346 | |
347 if (theLock == null) | |
348 throw new IOException("Couldn't lock, are you running another Blitz instance in this directory?"); | |
349 } | |
350 | |
351 private static void close() throws Exception { | |
352 Syncable[] mySyncables = getSyncTasks(); | |
353 | |
354 for (int i = 0; i < mySyncables.length; i++) { | |
355 mySyncables[i].close(); | |
356 } | |
357 | |
358 if (theLock != null) | |
359 theLock.release(); | |
360 | |
361 if (theLockChannel != null) | |
362 theLockChannel.close(); | |
363 | |
364 if (theLockFile != null) | |
365 theLockFile.close(); | |
366 } | |
367 | |
368 /** | |
369 <p>Blocks the caller whilst a sync-to-disk is performed. | |
370 Sync-to-disk requires:</p> | |
371 | |
372 <ol> | |
373 <li>Flush dirty state from caches into WriteDaemon queue</li> | |
374 <li>Flush queue</li> | |
375 <li>Wait for queue flush</li> | |
376 <li>Checkpoint Db</li> | |
377 </ol> | |
378 | |
379 */ | |
380 public static void sync() throws Exception { | |
381 sync(null); | |
382 } | |
383 | |
384 /** | |
385 @param aCompletionTask if null, the caller is blocked until | |
386 state has successfully been flushed to disk. Note that the point | |
387 at which the caller will be awoken is guarenteed to be after state | |
388 was sync'd but may not be immediately afterwards. If non-null, | |
389 the caller will not be blocked because the code that is dependent | |
390 on completion of the flush is assumed to be in the passed completion | |
391 task. As per the null case, this completion task will be executed | |
392 at some point after WriteDaemon flushed the queue but not necessarily | |
393 immediately. | |
394 | |
395 @see org.dancres.blitz.disk.WriteDaemon | |
396 */ | |
397 public static void sync(Runnable aCompletionTask) | |
398 throws Exception { | |
399 | |
400 Syncable[] mySyncables = getSyncTasks(); | |
401 | |
402 for (int i = 0; i < mySyncables.length; i++) { | |
403 mySyncables[i].sync(); | |
404 } | |
405 | |
406 SyncFinalizer myCompleter = new SyncFinalizer(theEnv, aCompletionTask); | |
407 WriteDaemon.get().push(myCompleter); | |
408 | |
409 /* | |
410 SyncFinalizer will figure out whether to block the caller. | |
411 If aCompletionTask is non-null, waitForCompletion will not block | |
412 otherwise it will. | |
413 */ | |
414 myCompleter.waitForCompletion(); | |
415 } | |
416 | |
417 public static Database newDb(Transaction aTxn, String aDbName, | |
418 DatabaseConfig aConfig) | |
419 throws DatabaseException { | |
420 | |
421 aConfig.setTransactional(true); | |
422 | |
423 synchronized(theDbs) { | |
424 theDbs.add(aDbName); | |
425 } | |
426 | |
427 try { | |
428 return theEnv.openDatabase(aTxn, aDbName, aConfig); | |
429 } catch (DatabaseException aDBE) { | |
430 synchronized(theDbs) { | |
431 theDbs.remove(aDbName); | |
432 throw aDBE; | |
433 } | |
434 } | |
435 } | |
436 | |
437 /** | |
438 Delete's the underlying Db database. WARNING: This can be blocked | |
439 by checkpoints or failed by replication if insufficient time has passed. | |
440 */ | |
441 public static void deleteDb(String aName) throws IOException { | |
442 try { | |
443 synchronized(theDbs) { | |
444 theDbs.remove(aName); | |
445 } | |
446 | |
447 theEnv.removeDatabase(null, aName); | |
448 } catch (DatabaseException aDbe) { | |
449 theLogger.log(Level.SEVERE, "Couldn't delete Db", aDbe); | |
450 throw new IOException("Failed to delete db"); | |
451 } | |
452 } | |
453 | |
454 public static boolean dbExists(String aName) { | |
455 synchronized(theDbs) { | |
456 return theDbs.contains(aName); | |
457 } | |
458 } | |
459 | |
460 static Transaction newTxn() throws DatabaseException { | |
461 TransactionConfig myConfig = new TransactionConfig(); | |
462 myConfig.setNoSync(true); | |
463 | |
464 return theEnv.beginTransaction(null, myConfig); | |
465 } | |
466 | |
467 static Transaction newNonBlockingTxn() throws DatabaseException { | |
468 TransactionConfig myConfig = new TransactionConfig(); | |
469 myConfig.setNoSync(true); | |
470 myConfig.setNoWait(true); | |
471 | |
472 return theEnv.beginTransaction(null, myConfig); | |
473 } | |
474 | |
475 public static void dumpLocks() { | |
476 try { | |
477 StatsConfig myConfig = new StatsConfig(); | |
478 myConfig.setFast(false); | |
479 | |
480 LockStats myStats = theEnv.getLockStats(myConfig); | |
481 | |
482 System.err.println("Locks"); | |
483 System.err.println("Owners:" + myStats.getNOwners()); | |
484 System.err.println("RdLock:" + myStats.getNReadLocks()); | |
485 System.err.println("WrLock:" + myStats.getNWriteLocks()); | |
486 System.err.println("Total locks:" + myStats.getNTotalLocks()); | |
487 System.err.println("Waiters:" + myStats.getNWaiters()); | |
488 | |
489 } catch (DatabaseException anE) { | |
490 System.err.println("Whoops couldn't dump stats"); | |
491 } | |
492 } | |
493 | |
494 public static void dumpStats() { | |
495 try { | |
496 StatsConfig myConfig = new StatsConfig(); | |
497 myConfig.setFast(false); | |
498 | |
499 EnvironmentStats myStats = theEnv.getStats(myConfig); | |
500 | |
501 System.err.println(myStats); | |
502 | |
503 /* | |
504 System.err.println("Log Buffer bytes: " + | |
505 myStats.getBufferBytes()); | |
506 | |
507 System.err.println("Cache misses: " + myStats.getNCacheMiss()); | |
508 | |
509 System.err.println("Cache data bytes" + | |
510 myStats.getCacheDataBytes()); | |
511 | |
512 System.err.println("Cache total bytes" + | |
513 myStats.getCacheTotalBytes()); | |
514 */ | |
515 } catch (Exception anE) { | |
516 WriteDaemon.theLogger.log(Level.INFO, | |
517 "Couldn't dump stats", anE); | |
518 } | |
519 } | |
520 | |
521 public static void main(String args[]) { | |
522 try { | |
523 System.out.println("Disk storing at: " + Disk.getDbLocation()); | |
524 Disk.stop(); | |
525 } catch (Exception anE) { | |
526 System.err.println("Got errors during test - see log"); | |
527 } | |
528 } | |
529 } |