comparison src/org/dancres/blitz/disk/Disk.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.disk;
2
3 import java.io.File;
4 import java.io.InputStream;
5 import java.io.IOException;
6 import java.io.RandomAccessFile;
7
8 import java.nio.channels.FileChannel;
9 import java.nio.channels.FileLock;
10
11 import java.util.List;
12 import java.util.ArrayList;
13 import java.util.Iterator;
14 import java.util.Properties;
15
16 import java.util.logging.*;
17
18 import net.jini.config.ConfigurationException;
19
20 import com.sleepycat.je.Environment;
21 import com.sleepycat.je.EnvironmentConfig;
22 import com.sleepycat.je.DatabaseConfig;
23 import com.sleepycat.je.TransactionConfig;
24 import com.sleepycat.je.Transaction;
25 import com.sleepycat.je.Database;
26 import com.sleepycat.je.DatabaseException;
27 import com.sleepycat.je.LockStats;
28 import com.sleepycat.je.EnvironmentStats;
29 import com.sleepycat.je.StatsConfig;
30
31 import org.dancres.blitz.Logging;
32
33 import org.dancres.blitz.config.ConfigurationFactory;
34 import org.dancres.util.NumUtil;
35
36 /**
37 This class is responsible for managing the underlying BerkeleyDB
38 infrastructure. Anyone using this class to create/manipulate databases
39 should likely be registered as a Syncable instance so that it is aware
40 of requests for synchronization and closure of databases.
41 */
42 public class Disk {
43 private static Environment theEnv;
44 // private static TransactionConfig theTxnConfig;
45
46 private static String theLocation;
47 private static long theDbCacheSize;
48 private static int theMaxTxns;
49 private static int maxDbLog;
50 private static int maxLogIteratorBuffer;
51 private static int maxLogBuffers;
52 private static int maxLogBufferBytes;
53 private static int maxNodeEntries;
54
55 private static RandomAccessFile theLockFile;
56 private static FileChannel theLockChannel;
57 private static FileLock theLock;
58
59 private static List theDbs;
60
61 private static boolean isTransient;
62
63 static Logger theLogger =
64 Logging.newLogger("org.dancres.blitz.disk.Disk");
65
66 private static List theSyncTasks = new ArrayList();
67
68 static {
69 try {
70 theLocation =
71 (String) ConfigurationFactory.getEntry("persistDir",
72 String.class);
73
74 try {
75 String myNewCacheForm =
76 ((String) ConfigurationFactory.getEntry("dbCache",
77 String.class,
78 new Long(1024 * 1024).toString()));
79
80 try {
81 theDbCacheSize = NumUtil.convertToBytes(myNewCacheForm);
82 } catch (IllegalArgumentException anIE) {
83 theLogger.log(Level.SEVERE, "Failed to parse cache size",
84 anIE);
85 throw new Error("Cannot start - failed to parse cace size");
86 }
87
88 } catch (ConfigurationException aCE) {
89 // Ignore it for now and try the old format
90 //
91 theDbCacheSize =
92 ((Long) ConfigurationFactory.getEntry("dbCache",
93 long.class,
94 new Long(1024 * 1024))).longValue();
95 }
96
97 theMaxTxns =
98 ((Integer) ConfigurationFactory.getEntry("maxDbTxns",
99 int.class,
100 new Integer(256))).intValue();
101
102 maxDbLog =
103 ((Integer) ConfigurationFactory.getEntry("maxDbLog",
104 int.class,
105 new Integer(10000000))).intValue();
106
107 maxLogIteratorBuffer =
108 ((Integer) ConfigurationFactory.getEntry("maxLogIteratorBuff",
109 int.class,
110 new Integer(1024))).intValue();
111
112 maxLogBuffers =
113 ((Integer) ConfigurationFactory.getEntry("maxLogBuffers",
114 int.class,
115 new Integer(5))).intValue();
116
117 maxLogBufferBytes =
118 ((Integer) ConfigurationFactory.getEntry("maxLogBufferBytes",
119 int.class,
120 new Integer(4620000))).intValue();
121
122 maxNodeEntries =
123 ((Integer) ConfigurationFactory.getEntry("maxNodeEntries",
124 int.class,
125 new Integer(128))).intValue();
126
127 theLogger.log(Level.INFO, "Max txns: " + theMaxTxns);
128 theLogger.log(Level.INFO, "DbCache: " + theDbCacheSize);
129 } catch (Exception anE) {
130 theLogger.log(Level.SEVERE, "Couldn't get Disk config",
131 anE);
132 throw new Error("Disk didn't start", anE);
133 }
134 }
135
136 public static void setTransient(boolean transientDisk) {
137 isTransient = transientDisk;
138 }
139
140 public static void init() {
141 try {
142 new File(theLocation).mkdirs();
143
144 lockLocation();
145
146 Properties myDbProps = new Properties();
147
148 InputStream myPropsStream =
149 Disk.class.getResourceAsStream("db.properties");
150
151 if (myPropsStream == null)
152 throw new IOException("Failed to load default db settings");
153
154 myDbProps.load(myPropsStream);
155
156 if (isTransient) {
157 theLogger.log(Level.INFO,
158 "Forced checkpointer on for transient ops");
159 myDbProps.setProperty("je.env.runCheckpointer", "true");
160 }
161
162 myDbProps.setProperty("je.log.fileMax",
163 Integer.toString(maxDbLog));
164
165 /*
166 Run a benchmark with these disabled, then run them again
167 enabled.
168 myDbProps.setProperty("je.log.iteratorReadSize",
169 Integer.toString(maxLogIteratorBuffer));
170
171 myDbProps.setProperty("je.log.numBuffers",
172 Integer.toString(maxLogBuffers));
173
174 myDbProps.setProperty("je.log.totalBufferBytes",
175 Integer.toString(maxLogBufferBytes));
176
177 myDbProps.setProperty("je.nodeMaxEntries",
178 Integer.toString(maxNodeEntries));
179
180 */
181
182 EnvironmentConfig myConfig = new EnvironmentConfig(myDbProps);
183 myConfig.setCacheSize(theDbCacheSize);
184 myConfig.setTransactional(true);
185 myConfig.setAllowCreate(true);
186
187 // theTxnConfig = new TransactionConfig();
188 // theTxnConfig.setNoSync(true);
189
190 theLogger.log(Level.INFO, "Opening Database");
191
192 theEnv = new Environment(new File(theLocation), myConfig);
193
194 theLogger.log(Level.INFO, "Database recovery complete");
195
196 theDbs = theEnv.getDatabaseNames();
197
198 } catch (UnsatisfiedLinkError aULE) {
199 theLogger.log(Level.SEVERE, "Warning, didn't load library for db cleanly - are you using Inca X?", aULE);
200 theLogger.log(Level.SEVERE, "Try restarting the container...");
201 theLogger.log(Level.SEVERE, "Ignoring library load failure - if Blitz doesn't boot, check your library path");
202 ClassLoader myLoader = Disk.class.getClassLoader();
203 throw new Error("Disk didn't start: " + myLoader, aULE);
204 } catch (Exception anE) {
205 theLogger.log(Level.SEVERE, "Couldn't start Disk",
206 anE);
207 throw new Error("Disk didn't start", anE);
208 } catch (Error anErr) {
209 theLogger.log(Level.SEVERE, "Got error", anErr);
210 theLogger.log(Level.SEVERE, anErr.getMessage());
211
212 ClassLoader myLoader = Disk.class.getClassLoader();
213 throw new Error("Disk didn't start: " + myLoader, anErr);
214 }
215 }
216
217 /**
218 Eradicate state associated with Disk
219 */
220 public static synchronized void destroy() {
221 deleteFiles(getDbLocation());
222 }
223
224 /**
225 Eradicate state held in some specific directory
226 */
227 public static synchronized void clean(String aDir) {
228 deleteFiles(aDir);
229 }
230
231 private static String getDbLocation() {
232 return theLocation;
233 }
234
235 public static void add(Syncable aSyncable) {
236 synchronized(theSyncTasks) {
237 theSyncTasks.add(aSyncable);
238 }
239 }
240
241 public static void remove(Syncable aSyncable) {
242 synchronized(theSyncTasks) {
243 theSyncTasks.remove(aSyncable);
244 }
245 }
246
247 private static Syncable[] getSyncTasks() {
248 synchronized(theSyncTasks) {
249 Syncable[] myTasks = new Syncable[theSyncTasks.size()];
250 return (Syncable[]) theSyncTasks.toArray(myTasks);
251 }
252 }
253
254 public static void stop() throws Exception {
255 if (theEnv != null) {
256 theLogger.info("BDB closing");
257 try {
258 WriteDaemon.get().halt();
259
260 close();
261
262 theEnv.sync();
263 theEnv.close();
264 theLogger.info("BDB closed");
265 } catch (DatabaseException aDBE) {
266 theLogger.log(Level.SEVERE, "Couldn't close BDB", aDBE);
267 throw new Exception("Couldn't close BDB");
268 }
269 }
270 }
271
272 /**
273 If the backup directory doesn't exist, it will be created.
274 If the backup directory does exist, the caller should ensure that
275 it has been cleared before the backup is performed. This permits
276 the caller to determine what to do with old backups beforehand.
277 */
278 public static void backup(String aDestDir)
279 throws IOException {
280
281 File myDest = new File(aDestDir);
282
283 myDest.mkdir();
284
285 File[] myFiles = myDest.listFiles();
286
287 if (myFiles.length > 0)
288 throw new IOException("Backup dir should be empty");
289
290 BackupTask myTask = new BackupTask(new File(getDbLocation()), myDest);
291
292 try {
293 /*
294 We cannot use sync because we need WriteDaemon to perform
295 the backup post sync'ing the queue. This is required to
296 prevent WriteDaemon from performing further updates whilst
297 we perform the backup and prevents issues with state leakage.
298 */
299 Syncable[] mySyncables = getSyncTasks();
300
301 for (int i = 0; i < mySyncables.length; i++) {
302 mySyncables[i].sync();
303 }
304
305 WriteDaemon.get().queue(myTask);
306 WriteDaemon.get().push();
307
308 } catch (Exception anE) {
309 IOException myIOE = new IOException("Couldn't start sync");
310 myIOE.initCause(anE);
311 throw myIOE;
312 }
313
314 myTask.waitForCompletion();
315 }
316
317 private static void deleteFiles(String aDir) {
318 theLogger.log(Level.INFO, "Deleting: " + aDir);
319
320 File myDir = new File(aDir);
321
322 File[] myFiles = myDir.listFiles();
323
324 if (myFiles == null)
325 return;
326
327 for (int i = 0; i < myFiles.length; i++) {
328 File myFile = myFiles[i];
329
330 if (myFile.isFile()) {
331 theLogger.log(Level.INFO, "Deleting: " + myFile);
332 myFile.delete();
333 } else {
334 theLogger.log(Level.INFO, "Leaving: " + myFile);
335 }
336 }
337 }
338
339 private static void lockLocation() throws IOException {
340 theLockFile = new RandomAccessFile(new File(theLocation,
341 "blitz.lock"), "rw");
342
343 theLockChannel = theLockFile.getChannel();
344
345 theLock = theLockChannel.tryLock();
346
347 if (theLock == null)
348 throw new IOException("Couldn't lock, are you running another Blitz instance in this directory?");
349 }
350
351 private static void close() throws Exception {
352 Syncable[] mySyncables = getSyncTasks();
353
354 for (int i = 0; i < mySyncables.length; i++) {
355 mySyncables[i].close();
356 }
357
358 if (theLock != null)
359 theLock.release();
360
361 if (theLockChannel != null)
362 theLockChannel.close();
363
364 if (theLockFile != null)
365 theLockFile.close();
366 }
367
368 /**
369 <p>Blocks the caller whilst a sync-to-disk is performed.
370 Sync-to-disk requires:</p>
371
372 <ol>
373 <li>Flush dirty state from caches into WriteDaemon queue</li>
374 <li>Flush queue</li>
375 <li>Wait for queue flush</li>
376 <li>Checkpoint Db</li>
377 </ol>
378
379 */
380 public static void sync() throws Exception {
381 sync(null);
382 }
383
384 /**
385 @param aCompletionTask if null, the caller is blocked until
386 state has successfully been flushed to disk. Note that the point
387 at which the caller will be awoken is guarenteed to be after state
388 was sync'd but may not be immediately afterwards. If non-null,
389 the caller will not be blocked because the code that is dependent
390 on completion of the flush is assumed to be in the passed completion
391 task. As per the null case, this completion task will be executed
392 at some point after WriteDaemon flushed the queue but not necessarily
393 immediately.
394
395 @see org.dancres.blitz.disk.WriteDaemon
396 */
397 public static void sync(Runnable aCompletionTask)
398 throws Exception {
399
400 Syncable[] mySyncables = getSyncTasks();
401
402 for (int i = 0; i < mySyncables.length; i++) {
403 mySyncables[i].sync();
404 }
405
406 SyncFinalizer myCompleter = new SyncFinalizer(theEnv, aCompletionTask);
407 WriteDaemon.get().push(myCompleter);
408
409 /*
410 SyncFinalizer will figure out whether to block the caller.
411 If aCompletionTask is non-null, waitForCompletion will not block
412 otherwise it will.
413 */
414 myCompleter.waitForCompletion();
415 }
416
417 public static Database newDb(Transaction aTxn, String aDbName,
418 DatabaseConfig aConfig)
419 throws DatabaseException {
420
421 aConfig.setTransactional(true);
422
423 synchronized(theDbs) {
424 theDbs.add(aDbName);
425 }
426
427 try {
428 return theEnv.openDatabase(aTxn, aDbName, aConfig);
429 } catch (DatabaseException aDBE) {
430 synchronized(theDbs) {
431 theDbs.remove(aDbName);
432 throw aDBE;
433 }
434 }
435 }
436
437 /**
438 Delete's the underlying Db database. WARNING: This can be blocked
439 by checkpoints or failed by replication if insufficient time has passed.
440 */
441 public static void deleteDb(String aName) throws IOException {
442 try {
443 synchronized(theDbs) {
444 theDbs.remove(aName);
445 }
446
447 theEnv.removeDatabase(null, aName);
448 } catch (DatabaseException aDbe) {
449 theLogger.log(Level.SEVERE, "Couldn't delete Db", aDbe);
450 throw new IOException("Failed to delete db");
451 }
452 }
453
454 public static boolean dbExists(String aName) {
455 synchronized(theDbs) {
456 return theDbs.contains(aName);
457 }
458 }
459
460 static Transaction newTxn() throws DatabaseException {
461 TransactionConfig myConfig = new TransactionConfig();
462 myConfig.setNoSync(true);
463
464 return theEnv.beginTransaction(null, myConfig);
465 }
466
467 static Transaction newNonBlockingTxn() throws DatabaseException {
468 TransactionConfig myConfig = new TransactionConfig();
469 myConfig.setNoSync(true);
470 myConfig.setNoWait(true);
471
472 return theEnv.beginTransaction(null, myConfig);
473 }
474
475 public static void dumpLocks() {
476 try {
477 StatsConfig myConfig = new StatsConfig();
478 myConfig.setFast(false);
479
480 LockStats myStats = theEnv.getLockStats(myConfig);
481
482 System.err.println("Locks");
483 System.err.println("Owners:" + myStats.getNOwners());
484 System.err.println("RdLock:" + myStats.getNReadLocks());
485 System.err.println("WrLock:" + myStats.getNWriteLocks());
486 System.err.println("Total locks:" + myStats.getNTotalLocks());
487 System.err.println("Waiters:" + myStats.getNWaiters());
488
489 } catch (DatabaseException anE) {
490 System.err.println("Whoops couldn't dump stats");
491 }
492 }
493
494 public static void dumpStats() {
495 try {
496 StatsConfig myConfig = new StatsConfig();
497 myConfig.setFast(false);
498
499 EnvironmentStats myStats = theEnv.getStats(myConfig);
500
501 System.err.println(myStats);
502
503 /*
504 System.err.println("Log Buffer bytes: " +
505 myStats.getBufferBytes());
506
507 System.err.println("Cache misses: " + myStats.getNCacheMiss());
508
509 System.err.println("Cache data bytes" +
510 myStats.getCacheDataBytes());
511
512 System.err.println("Cache total bytes" +
513 myStats.getCacheTotalBytes());
514 */
515 } catch (Exception anE) {
516 WriteDaemon.theLogger.log(Level.INFO,
517 "Couldn't dump stats", anE);
518 }
519 }
520
521 public static void main(String args[]) {
522 try {
523 System.out.println("Disk storing at: " + Disk.getDbLocation());
524 Disk.stop();
525 } catch (Exception anE) {
526 System.err.println("Got errors during test - see log");
527 }
528 }
529 }