diff src/org/dancres/blitz/disk/Disk.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/disk/Disk.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,529 @@
+package org.dancres.blitz.disk;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+import java.util.logging.*;
+
+import net.jini.config.ConfigurationException;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.LockStats;
+import com.sleepycat.je.EnvironmentStats;
+import com.sleepycat.je.StatsConfig;
+
+import org.dancres.blitz.Logging;
+
+import org.dancres.blitz.config.ConfigurationFactory;
+import org.dancres.util.NumUtil;
+
+/**
+   This class is responsible for managing the underlying BerkeleyDB
+   infrastructure.  Anyone using this class to create/manipulate databases
+   should likely be registered as a Syncable instance so that it is aware
+   of requests for synchronization and closure of databases.   
+ */
+public class Disk {
+    private static Environment theEnv;
+    // private static TransactionConfig theTxnConfig;
+
+    private static String theLocation;
+    private static long theDbCacheSize;
+    private static int theMaxTxns;
+    private static int maxDbLog;
+    private static int maxLogIteratorBuffer;
+    private static int maxLogBuffers;
+    private static int maxLogBufferBytes;
+    private static int maxNodeEntries;
+
+    private static RandomAccessFile theLockFile;
+    private static FileChannel theLockChannel;
+    private static FileLock theLock;
+
+    private static List theDbs;
+
+    private static boolean isTransient;
+
+    static Logger theLogger =
+        Logging.newLogger("org.dancres.blitz.disk.Disk");
+
+    private static List theSyncTasks = new ArrayList();
+
+    static {
+        try {
+            theLocation =
+                (String) ConfigurationFactory.getEntry("persistDir",
+                                                       String.class);
+
+            try {
+                String myNewCacheForm =
+                    ((String) ConfigurationFactory.getEntry("dbCache",
+                        String.class,
+                        new Long(1024 * 1024).toString()));
+
+                try {
+                    theDbCacheSize = NumUtil.convertToBytes(myNewCacheForm);
+                } catch (IllegalArgumentException anIE) {
+                    theLogger.log(Level.SEVERE, "Failed to parse cache size",
+                        anIE);
+                    throw new Error("Cannot start - failed to parse cace size");
+                }
+
+            } catch (ConfigurationException aCE) {
+                // Ignore it for now and try the old format
+                //
+                theDbCacheSize =
+                    ((Long) ConfigurationFactory.getEntry("dbCache",
+                        long.class,
+                        new Long(1024 * 1024))).longValue();
+            }
+
+            theMaxTxns =
+                ((Integer) ConfigurationFactory.getEntry("maxDbTxns",
+                                                         int.class,
+                                                         new Integer(256))).intValue();
+
+            maxDbLog =
+                ((Integer) ConfigurationFactory.getEntry("maxDbLog",
+                                                         int.class,
+                                                         new Integer(10000000))).intValue();
+
+            maxLogIteratorBuffer =
+                ((Integer) ConfigurationFactory.getEntry("maxLogIteratorBuff",
+                                                         int.class,
+                                                         new Integer(1024))).intValue();
+
+            maxLogBuffers =
+                ((Integer) ConfigurationFactory.getEntry("maxLogBuffers",
+                                                         int.class,
+                                                         new Integer(5))).intValue();
+
+            maxLogBufferBytes =
+                ((Integer) ConfigurationFactory.getEntry("maxLogBufferBytes",
+                                                         int.class,
+                                                         new Integer(4620000))).intValue();
+
+            maxNodeEntries =
+                ((Integer) ConfigurationFactory.getEntry("maxNodeEntries",
+                                                         int.class,
+                                                         new Integer(128))).intValue();
+                    
+            theLogger.log(Level.INFO, "Max txns: " + theMaxTxns);
+            theLogger.log(Level.INFO, "DbCache: " + theDbCacheSize);
+        } catch (Exception anE) {
+            theLogger.log(Level.SEVERE, "Couldn't get Disk config",
+                          anE);
+            throw new Error("Disk didn't start", anE);
+        }
+    }
+
+    public static void setTransient(boolean transientDisk) {
+        isTransient = transientDisk;
+    }
+
+    public static void init() {
+        try {
+            new File(theLocation).mkdirs();
+
+            lockLocation();
+
+            Properties myDbProps = new Properties();
+
+            InputStream myPropsStream =
+                Disk.class.getResourceAsStream("db.properties");
+
+            if (myPropsStream == null)
+                throw new IOException("Failed to load default db settings");
+
+            myDbProps.load(myPropsStream);
+
+            if (isTransient) {
+                theLogger.log(Level.INFO,
+                              "Forced checkpointer on for transient ops");
+                myDbProps.setProperty("je.env.runCheckpointer", "true");
+            }
+
+            myDbProps.setProperty("je.log.fileMax",
+                                  Integer.toString(maxDbLog));
+
+            /*
+              Run a benchmark with these disabled, then run them again
+              enabled.
+            myDbProps.setProperty("je.log.iteratorReadSize",
+                                  Integer.toString(maxLogIteratorBuffer));
+
+            myDbProps.setProperty("je.log.numBuffers",
+                                  Integer.toString(maxLogBuffers));
+
+            myDbProps.setProperty("je.log.totalBufferBytes",
+                                  Integer.toString(maxLogBufferBytes));
+
+            myDbProps.setProperty("je.nodeMaxEntries",
+                                  Integer.toString(maxNodeEntries));
+
+            */
+
+            EnvironmentConfig myConfig = new EnvironmentConfig(myDbProps);
+            myConfig.setCacheSize(theDbCacheSize);
+            myConfig.setTransactional(true);
+            myConfig.setAllowCreate(true);
+
+            // theTxnConfig = new TransactionConfig();
+            // theTxnConfig.setNoSync(true);
+
+            theLogger.log(Level.INFO, "Opening Database");
+
+            theEnv = new Environment(new File(theLocation), myConfig);
+
+            theLogger.log(Level.INFO, "Database recovery complete");
+
+            theDbs = theEnv.getDatabaseNames();
+
+        } catch (UnsatisfiedLinkError aULE) {
+            theLogger.log(Level.SEVERE, "Warning, didn't load library for db cleanly - are you using Inca X?", aULE);
+            theLogger.log(Level.SEVERE, "Try restarting the container...");
+            theLogger.log(Level.SEVERE, "Ignoring library load failure - if Blitz doesn't boot, check your library path");
+            ClassLoader myLoader = Disk.class.getClassLoader();
+            throw new Error("Disk didn't start: " + myLoader, aULE);
+        } catch (Exception anE) {
+            theLogger.log(Level.SEVERE, "Couldn't start Disk",
+                          anE);
+            throw new Error("Disk didn't start", anE);
+        } catch (Error anErr) {
+            theLogger.log(Level.SEVERE, "Got error", anErr);
+            theLogger.log(Level.SEVERE, anErr.getMessage());
+
+            ClassLoader myLoader = Disk.class.getClassLoader();
+            throw new Error("Disk didn't start: " + myLoader, anErr);
+        }
+    }
+
+    /**
+       Eradicate state associated with Disk
+     */
+    public static synchronized void destroy() {
+        deleteFiles(getDbLocation());
+    }
+
+    /**
+       Eradicate state held in some specific directory
+     */
+    public static synchronized void clean(String aDir) {
+        deleteFiles(aDir);
+    }
+
+    private static String getDbLocation() {
+        return theLocation;
+    }
+
+    public static void add(Syncable aSyncable) {
+        synchronized(theSyncTasks) {
+            theSyncTasks.add(aSyncable);
+        }
+    }
+
+    public static void remove(Syncable aSyncable) {
+        synchronized(theSyncTasks) {
+            theSyncTasks.remove(aSyncable);
+        }
+    }
+
+    private static Syncable[] getSyncTasks() {
+        synchronized(theSyncTasks) {
+            Syncable[] myTasks = new Syncable[theSyncTasks.size()];
+            return (Syncable[]) theSyncTasks.toArray(myTasks);
+        }
+    }
+
+    public static void stop() throws Exception {
+        if (theEnv != null) {
+            theLogger.info("BDB closing");
+            try {
+                WriteDaemon.get().halt();
+
+                close();
+
+                theEnv.sync();
+                theEnv.close();
+                theLogger.info("BDB closed");
+            } catch (DatabaseException aDBE) {
+                theLogger.log(Level.SEVERE, "Couldn't close BDB", aDBE);
+                throw new Exception("Couldn't close BDB");
+            }
+        }
+    }
+
+    /**
+       If the backup directory doesn't exist, it will be created.
+       If the backup directory does exist, the caller should ensure that
+       it has been cleared before the backup is performed.  This permits
+       the caller to determine what to do with old backups beforehand.
+     */
+    public static void backup(String aDestDir)
+        throws IOException {
+        
+        File myDest = new File(aDestDir);
+
+        myDest.mkdir();
+
+        File[] myFiles = myDest.listFiles();
+
+        if (myFiles.length > 0)
+            throw new IOException("Backup dir should be empty");
+
+        BackupTask myTask = new BackupTask(new File(getDbLocation()), myDest);
+
+        try {
+            /*
+              We cannot use sync because we need WriteDaemon to perform
+              the backup post sync'ing the queue.  This is required to
+              prevent WriteDaemon from performing further updates whilst
+              we perform the backup and prevents issues with state leakage.
+             */
+            Syncable[] mySyncables = getSyncTasks();
+
+            for (int i = 0; i < mySyncables.length; i++) {
+                mySyncables[i].sync();
+            }
+
+            WriteDaemon.get().queue(myTask);
+            WriteDaemon.get().push();
+
+        } catch (Exception anE) {
+            IOException myIOE = new IOException("Couldn't start sync");
+            myIOE.initCause(anE);
+            throw myIOE;
+        }
+
+        myTask.waitForCompletion();
+    }
+
+    private static void deleteFiles(String aDir) {
+        theLogger.log(Level.INFO, "Deleting: " + aDir);
+
+        File myDir = new File(aDir);
+
+        File[] myFiles = myDir.listFiles();
+
+        if (myFiles == null)
+            return;
+
+        for (int i = 0; i < myFiles.length; i++) {
+            File myFile = myFiles[i];
+
+            if (myFile.isFile()) {
+                theLogger.log(Level.INFO, "Deleting: " + myFile);
+                myFile.delete();
+            } else {
+                theLogger.log(Level.INFO, "Leaving: " + myFile);
+            }
+        }
+    }
+
+    private static void lockLocation() throws IOException {
+        theLockFile = new RandomAccessFile(new File(theLocation,
+                                                    "blitz.lock"), "rw");
+
+        theLockChannel = theLockFile.getChannel();
+
+        theLock = theLockChannel.tryLock();
+
+        if (theLock == null)
+            throw new IOException("Couldn't lock, are you running another Blitz instance in this directory?");
+    }
+
+    private static void close() throws Exception {
+        Syncable[] mySyncables = getSyncTasks();
+
+        for (int i = 0; i < mySyncables.length; i++) {
+            mySyncables[i].close();
+        }
+
+        if (theLock != null)
+            theLock.release();
+
+        if (theLockChannel != null)
+            theLockChannel.close();
+
+        if (theLockFile != null)
+            theLockFile.close();
+    }
+
+    /**
+       <p>Blocks the caller whilst a sync-to-disk is performed.
+       Sync-to-disk requires:</p>
+
+       <ol>
+       <li>Flush dirty state from caches into WriteDaemon queue</li>
+       <li>Flush queue</li>
+       <li>Wait for queue flush</li>
+       <li>Checkpoint Db</li>
+       </ol>
+
+     */
+    public static void sync() throws Exception {
+        sync(null);
+    }
+
+    /**
+       @param aCompletionTask if null, the caller is blocked until
+       state has successfully been flushed to disk.  Note that the point
+       at which the caller will be awoken is guarenteed to be after state
+       was sync'd but may not be immediately afterwards.  If non-null,
+       the caller will not be blocked because the code that is dependent
+       on completion of the flush is assumed to be in the passed completion
+       task.  As per the null case, this completion task will be executed
+       at some point after WriteDaemon flushed the queue but not necessarily
+       immediately.
+
+       @see org.dancres.blitz.disk.WriteDaemon
+     */
+    public static void sync(Runnable aCompletionTask)
+        throws Exception {
+
+        Syncable[] mySyncables = getSyncTasks();
+
+        for (int i = 0; i < mySyncables.length; i++) {
+            mySyncables[i].sync();
+        }
+
+        SyncFinalizer myCompleter = new SyncFinalizer(theEnv, aCompletionTask);
+        WriteDaemon.get().push(myCompleter);
+
+        /*
+          SyncFinalizer will figure out whether to block the caller.
+          If aCompletionTask is non-null, waitForCompletion will not block
+          otherwise it will.
+         */
+        myCompleter.waitForCompletion();
+    }
+    
+    public static Database newDb(Transaction aTxn, String aDbName,
+                                 DatabaseConfig aConfig)
+        throws DatabaseException {
+
+        aConfig.setTransactional(true);
+        
+        synchronized(theDbs) {
+            theDbs.add(aDbName);
+        }
+
+        try {
+            return theEnv.openDatabase(aTxn, aDbName, aConfig);
+        } catch (DatabaseException aDBE) {
+            synchronized(theDbs) {
+                theDbs.remove(aDbName);
+                throw aDBE;
+            }
+        }
+    }
+
+    /**
+       Delete's the underlying Db database.  WARNING:  This can be blocked
+       by checkpoints or failed by replication if insufficient time has passed.
+     */
+    public static void deleteDb(String aName) throws IOException {
+        try {
+            synchronized(theDbs) {
+                theDbs.remove(aName);
+            }
+
+            theEnv.removeDatabase(null, aName);
+        } catch (DatabaseException aDbe) {
+            theLogger.log(Level.SEVERE, "Couldn't delete Db", aDbe);
+            throw new IOException("Failed to delete db");
+        }
+    }
+
+    public static boolean dbExists(String aName) {
+        synchronized(theDbs) {
+            return theDbs.contains(aName);
+        }
+    }
+
+    static Transaction newTxn() throws DatabaseException {
+        TransactionConfig myConfig = new TransactionConfig();
+        myConfig.setNoSync(true);
+
+        return theEnv.beginTransaction(null, myConfig);
+    }
+
+    static Transaction newNonBlockingTxn() throws DatabaseException {
+        TransactionConfig myConfig = new TransactionConfig();
+        myConfig.setNoSync(true);
+        myConfig.setNoWait(true);
+
+        return theEnv.beginTransaction(null, myConfig);
+    }
+
+    public static void dumpLocks() {
+        try {
+            StatsConfig myConfig = new StatsConfig();
+            myConfig.setFast(false);
+
+            LockStats myStats = theEnv.getLockStats(myConfig);
+
+            System.err.println("Locks");
+            System.err.println("Owners:" + myStats.getNOwners());
+            System.err.println("RdLock:" + myStats.getNReadLocks());
+            System.err.println("WrLock:" + myStats.getNWriteLocks());
+            System.err.println("Total locks:" + myStats.getNTotalLocks());
+            System.err.println("Waiters:" + myStats.getNWaiters());
+            
+        } catch (DatabaseException anE) {
+            System.err.println("Whoops couldn't dump stats");
+        }
+    }
+
+    public static void dumpStats() {
+        try {
+            StatsConfig myConfig = new StatsConfig();
+            myConfig.setFast(false);
+
+            EnvironmentStats myStats = theEnv.getStats(myConfig);
+
+            System.err.println(myStats);
+
+            /*
+            System.err.println("Log Buffer bytes: " +
+                               myStats.getBufferBytes());
+
+            System.err.println("Cache misses: " + myStats.getNCacheMiss());
+
+            System.err.println("Cache data bytes" +
+                               myStats.getCacheDataBytes());
+
+            System.err.println("Cache total bytes" +
+                               myStats.getCacheTotalBytes());
+             */
+        } catch (Exception anE) {
+            WriteDaemon.theLogger.log(Level.INFO,
+                                      "Couldn't dump stats", anE);
+        }
+    }
+
+    public static void main(String args[]) {
+        try {
+            System.out.println("Disk storing at: " + Disk.getDbLocation());
+            Disk.stop();
+        } catch (Exception anE) {
+            System.err.println("Got errors during test - see log");
+        }
+    }    
+}