diff src/org/dancres/blitz/txn/TxnManager.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children 4580bb12db30
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/txn/TxnManager.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,742 @@
+package org.dancres.blitz.txn;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.config.ConfigurationException;
+import net.jini.core.transaction.Transaction;
+import net.jini.core.transaction.TransactionException;
+import net.jini.core.transaction.UnknownTransactionException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+import net.jini.core.transaction.server.TransactionManager;
+
+import org.dancres.blitz.Logging;
+import org.dancres.blitz.config.ConfigurationFactory;
+import org.dancres.blitz.disk.Disk;
+import org.dancres.blitz.stats.StatsBoard;
+import org.dancres.blitz.task.Tasks;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+import org.prevayler.Command;
+import org.prevayler.SnapshotContributor;
+import org.prevayler.implementation.SnapshotPrevayler;
+import org.prevayler.implementation.Snapshotter;
+
+/**
+   Responsible for tracking/managing transactions.  This responsiblity is split
+   across two classes.  TxnManager handles control aspects whilst
+   TxnManagerState tracks the transactional information. <P>
+
+   We make our lives a little easier by making null transactions look like a
+   normal transaction which keeps things consistent and clean bar an implied
+   prepare/commit cycle in the operation code of SpaceImpl.  It also assists
+   us in that the transaction code can be largely tested without an external
+   transaction manager because a lot of it is exercised during the use of null
+   operations. <P>
+
+   The transaction subsystems logging and snapshot/checkpointing requirements
+   are handled by holding all state in TxnManagerState and making it a
+   <code>PrevalentSystem</code>.  Snapshots are treated as the equivalent of
+   checkpoints and are triggered in a separate thread.  <P>
+
+   Note that snapshots cannot be performed alongside the processing of commands
+   thus, we take a write on a readerswriter lock during snapshot and release it
+   after.  All other operations are performed under a readlock.  Note that the
+   write lock must be asserted BEFORE invoking snapshot to avoid deadlock.
+   The snapshot is only written to disk after Disk.sync completes.<P>
+
+   The actual workings are a little different from the norm with TxnManager
+   generating commands which act upon TxnManagerState.  Certain methods don't
+   generate commands at all because they don't represent a true state change.
+   Typically these methods are related to introducing new initial state into
+   TxnManagerState such as a transaction which doesn't need to be made
+   durable (saved to log etc.) until it reaches the prepared state. <P>
+
+   Prepare records a durable record of the transaction's operations whilst
+   commit and abort result in those operations being applied. <P>
+
+   @see org.prevayler.PrevalentSystem
+   @see org.prevayler.Command
+   @see org.dancres.blitz.txn.TxnManagerState
+
+   @todo Micro-optimization - we don't need to write an abort record if
+   the txn isn't prepared - just clear it out!
+ */
+public class TxnManager {
+
+    static Logger theLogger =
+        Logging.newLogger("org.dancres.blitz.TxnManager", Level.INFO);
+
+    private static boolean LOG_CKPTS;
+
+    static {
+        try {
+            LOG_CKPTS =
+                ((Boolean)
+                 ConfigurationFactory.getEntry("logCkpts",
+                                               Boolean.class,
+                                               new Boolean(false))).booleanValue();
+        } catch (ConfigurationException aCE) {
+            theLogger.log(Level.SEVERE, "Couldn't load config", aCE);
+        }
+    }
+
+    private static TxnManager theManager;
+
+    private TxnManagerState theManagerState;
+
+    private ReadWriteLock theLock = new WriterPreferenceReadWriteLock();
+
+    private SnapshotPrevayler thePrevayler;
+
+    private CheckpointTrigger theCheckpointTrigger;
+
+    private long theCheckpointCount = 0;
+
+    private TxnGateway theGateway;
+
+    private boolean isRecovery = true;
+
+    public static synchronized void init(TxnGateway aGateway)
+        throws Exception {
+
+        if (theManager != null) {
+            return;
+        } else {
+            theManager = new TxnManager(aGateway);
+            theManager.recover();
+        }
+    }
+
+    public static synchronized TxnManager get() {
+        return theManager;
+    }
+
+    private TxnManager(TxnGateway aGateway) {
+        theGateway = aGateway;
+    }
+
+    private void recover() throws Exception {
+        StoragePersonality myPersonality = 
+            StoragePersonalityFactory.getPersonality();
+
+        theLogger.log(Level.INFO, "Doing log recovery...");
+
+        thePrevayler = myPersonality.getPrevayler(new TxnManagerState());
+
+        theLogger.log(Level.INFO, "Log Recovery complete...");
+
+        theManagerState = (TxnManagerState) thePrevayler.system();
+
+        theCheckpointTrigger =
+            myPersonality.getCheckpointTrigger(new CheckpointerImpl(this));
+
+        StatsBoard.get().add(new TxnStatGenerator());
+
+        issueCheckpoint(true);
+
+        isRecovery = false;
+
+        // Startup the transaction pinger
+        new TxnPinger(theManagerState);
+    }
+
+    /**
+       @return <code>true</code> if the TransactionManager is performing
+       recovery.  When we are performing recovery the log files cannot be
+       written to by user code (that is code outside of the transaction
+       package).
+     */
+    public boolean isRecovery() {
+        return isRecovery;
+    }
+
+    public void add(SnapshotContributor aContributor) {
+        theManagerState.add(aContributor);
+    }
+    
+    public void remove(SnapshotContributor aContributor) {
+        theManagerState.remove(aContributor);
+    }
+
+    public TxnGateway getGateway() {
+        return theGateway;
+    }
+
+    /**
+       @return the number of transactions currently being processed.  Includes
+       ACTIVE, PREPARED, COMMITTED and ABORTED (where the last two states may be
+       present depending on state with respect to log records etc).
+     */
+    int getActiveTxnCount() {
+        try {
+            int myActiveTxnCount;
+
+            theLock.readLock().acquire();
+
+            myActiveTxnCount = theManagerState.getNumActiveTxns();
+
+            theLock.readLock().release();
+
+            return myActiveTxnCount;
+
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Couldn't get active txn count",
+                          anIE);
+
+            return -1;
+        }
+    }
+
+    public TxnState getTxnFor(TxnId anId)
+        throws UnknownTransactionException, RemoteException {
+        return theManagerState.getTxnFor(anId, theGateway, true);
+    }
+
+    /**
+       Resolve a JINI transaction using this method before calling any of
+       <code>prepare</code>, <code>commit</code>, <code>abort</code> or
+       <code>prepareAndCommit</code>.
+     */
+    public TxnState getTxnFor(Transaction aTransaction, boolean mustExist) 
+        throws UnknownTransactionException, RemoteException {
+
+        TxnId myId = convertToId(aTransaction);
+        return theManagerState.getTxnFor(myId, theGateway, mustExist);
+    }
+
+    /**
+       Invoked as part of the path for incoming transaction control via
+       the remote TransactionManager
+     */
+    public TxnState getTxnFor(TransactionManager aMgr, long anId)
+        throws UnknownTransactionException, RemoteException {
+
+        TxnId myId = convertToId(aMgr, anId);
+        return theManagerState.getTxnFor(myId, theGateway, true);
+    }
+
+    /**
+       This method is slightly more flexible then getTxnFor in that it will
+       accept a <code>null</code> and return a new null transaction as well
+       as the usual mapping of JINI txn to internal txn state.
+     */
+    public TxnState resolve(Transaction aTransaction)
+        throws UnknownTransactionException, RemoteException {
+
+        if (aTransaction == null)
+            return newNullTxn();
+        else
+            return getTxnFor(aTransaction, false);
+    }
+
+    /**
+       In cases where no explicit transaction has been passed in by a caller,
+       create a null transaction which is an internal, fully transactional
+       replacement which can be used for the duration of the operation
+       in question.
+     */
+    public TxnState newNullTxn() throws RemoteException {
+        return theManagerState.newNullTxn();
+    }
+
+    /**
+       In cases where no state will be changed (no Entry's taken or written),
+       create an instance of this transaction which, when commited or aborted
+       will be undone but not logged.
+     */
+    public TxnState newIdentityTxn() throws RemoteException {
+        return theManagerState.newIdentityTxn();
+    }
+
+    /**
+       Doesn't throw a DbException because it just writes to disk.  Restore
+       is coped with inside TxnManagerState when we're doing recovery and
+       an Exception could be thrown then.
+     */
+    public int prepare(TxnState aState) throws UnknownTransactionException {
+
+        try {
+            theLock.readLock().acquire();
+
+            aState.vote();
+
+            boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps()));
+
+            Integer myResult = (Integer)
+                execute(new PrepCommand(aState), dontLog);
+            
+            theLock.readLock().release();
+
+            theCheckpointTrigger.loggedCommand();
+
+            return myResult.intValue();
+
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to log prepare", anIE);
+            throw new UnknownTransactionException();
+        } catch (Exception anE) {
+            theLock.readLock().release();
+            theLogger.log(Level.SEVERE, "Failed to log prepare", anE);
+            throw new UnknownTransactionException();
+        }
+    }
+
+    public void commit(TxnState aState) throws UnknownTransactionException {
+
+        try {
+            theLock.readLock().acquire();
+
+            boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps()));
+
+            execute(new CommitCommand(aState.getId()), dontLog);
+            
+            theLock.readLock().release();
+
+            aState.doFinalize();
+
+            theCheckpointTrigger.loggedCommand();
+
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to log commit", anIE);
+            throw new UnknownTransactionException();
+        } catch (Exception anE) {
+            theLock.readLock().release();
+            theLogger.log(Level.SEVERE, "Failed to log commit", anE);
+            throw new UnknownTransactionException();
+        }
+    }
+
+    public void abort(TxnState aState) throws UnknownTransactionException {
+
+        try {
+            theLock.readLock().acquire();
+
+            int myResultingState = aState.vote();
+
+            /*
+                We're in at least voting state and possibly prepared state.
+                If at this point the txn is an identity transaction or
+                it has no operations we needn't log it.  We also needn't log
+                it in the case where we've not yet written a prepare record
+                to disk which would be indicated by vote() returning
+                VOTING as opposed to PREPARED.
+             */
+            boolean dontLog =
+                ((aState.isIdentity()) || (aState.hasNoOps()) ||
+                    (myResultingState == TransactionConstants.VOTING));
+
+            execute(new AbortCommand(aState.getId()), dontLog);
+            
+            theLock.readLock().release();
+
+            aState.doFinalize();
+
+            theCheckpointTrigger.loggedCommand();
+
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to log abort", anIE);
+            throw new UnknownTransactionException();
+        } catch (Exception anE) {
+            theLock.readLock().release();
+            theLogger.log(Level.SEVERE, "Failed to log abort", anE);
+            throw new UnknownTransactionException();
+        }
+    }
+
+    public int prepareAndCommit(TxnState aState)
+        throws UnknownTransactionException {
+
+        try {
+            theLock.readLock().acquire();
+
+            aState.vote();
+
+            boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps()));
+
+            Integer myResult = (Integer)
+                execute(new PrepCommitCommand(aState), dontLog);
+
+            theLock.readLock().release();
+
+            aState.doFinalize();
+
+            theCheckpointTrigger.loggedCommand();
+
+            return myResult.intValue();
+            
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to log prepCommit", anIE);
+            throw new UnknownTransactionException();
+        } catch (Exception anE) {
+            theLock.readLock().release();
+            theLogger.log(Level.SEVERE, "Failed to log prepCommit", anE);
+            throw new UnknownTransactionException();
+        }
+    }
+
+    /**
+       Log a specific single action in a transaction of its own.  This is
+       typically used by elements of Blitz that need to record some state
+       transition that would need to be re-applied at recovery.
+     */
+    public void log(TxnOp anOp) throws TransactionException {
+        tryLog(anOp, Long.MAX_VALUE);
+    }
+
+    /**
+       Attempt to log an action.  The attempt is bounded by the specified
+       timeout such that if the action cannot be logged within the time, it
+       will be abandoned.
+
+       @return <code>true</code> if the action was written, <code>false</code>
+       otherwise.
+    */
+    public boolean tryLog(TxnOp anOp, long aTimeout)
+        throws TransactionException {
+
+        try {
+            TxnState myEnclosing = newNullTxn();
+
+            myEnclosing.add(anOp);
+
+            if (theLock.readLock().attempt(aTimeout)) {
+                myEnclosing.vote();
+                
+                // Given the contract of timeout, we can make this loss'y'
+                thePrevayler.executeCommand(new PrepCommitCommand(myEnclosing),
+                        false);
+                theLock.readLock().release();
+                theCheckpointTrigger.loggedCommand();
+                return true;
+            } else
+                return false;
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to log Action", anIE);
+            throw new TransactionException();
+        } catch (Exception anE) {
+            theLock.readLock().release();
+            theLogger.log(Level.SEVERE, "Failed to log Action", anE);
+            throw new TransactionException();
+        }
+    }
+
+    public void abortAll() throws IOException {
+
+        try {
+            theLock.readLock().acquire();
+
+            // Has to be logged
+            //
+            thePrevayler.executeCommand(new AbortAllCommand());
+            
+            theLock.readLock().release();
+
+            theCheckpointTrigger.loggedCommand();
+
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to log abortAll", anIE);
+            throw new IOException();
+        } catch (Exception anE) {
+            theLock.readLock().release();
+            theLogger.log(Level.SEVERE, "Failed to log abortAll", anE);
+            throw new IOException();
+        }
+    }
+
+    private TxnId convertToId(TransactionManager aMgr, long anId)
+        throws RemoteException {
+
+        return new TxnId(aMgr, anId);
+    }
+
+    private TxnId convertToId(Transaction aTxn) throws RemoteException {
+        ServerTransaction myTxn = (ServerTransaction) aTxn;
+        
+        return new TxnId(myTxn.mgr, myTxn.id);
+    }
+
+    /**
+     * @todo There's a race condition here where state might change after we
+     * check for in-memory execute or not leading to incorrect behaviour in
+     * face of a crash.  This needs fixing whilst at the same time we should
+     * address the issue of an abort on an ACTIVE transaction which therefore
+     * needn't be logged.  What we need to do is modify each command to report
+     * if it needs to be logged.  We then modify the prevayler to execute
+     * the command and then determine if the outcome needs logging and log
+     * if necessary.  Only the SnapshotPrevaylerImpl really cares about this
+     * so only it needs tweaking.  This state should probably be deduced
+     * by the TxnState itself or TxnManagerState - it certainly needs to be
+     * done under the lock of the TxnState.  The decision points for whether
+     * a transaction is idempotent are at initial prepare (where we discover
+     * if there are no listeners or the txn is the identity txn) and at an
+     * abort when the transaction it still active (and thus we haven't written
+     * a log record).
+     */
+    private Serializable execute(Command aCommand, boolean dontLog)
+        throws Exception {
+        if (dontLog) {
+            // System.err.println("Bypass");
+            return aCommand.execute(theManagerState);
+        } else {
+            // System.err.println("Full log");
+            return thePrevayler.executeCommand(aCommand);
+        }
+    }
+
+    /**
+       Run a hot backup.  Basic contract is that we will catch all but the
+       currently active transactions (whose effects will be confined to the
+       cache anyway.
+
+       @param aDestDir the mount point/directory to copy the files to.
+     */
+    public void hotBackup(String aDestDir) throws IOException {
+
+        /*
+          First do an unlocked sync to get most updates from the cache
+          to disk.
+
+          Then take the txn lock and perform the second sync part of
+          which then does the file copy for backup purposes
+         */
+        try {
+            Disk.sync();
+        } catch (Exception anE) {
+            IOException anIOE = new IOException("Failed to start backup (initial sync)");
+            anIOE.initCause(anE);
+
+            throw anIOE;
+        }
+
+        try {
+            theLock.writeLock().acquire();
+
+            try {
+                Disk.backup(aDestDir);
+            } finally {
+                theLock.writeLock().release();
+            }
+
+        } catch (InterruptedException anIE) {
+            IOException anIOE = new IOException("Failed to start backup");
+            anIOE.initCause(anIE);
+
+            throw anIOE;
+        }
+    }
+
+    /**
+       Used by external entities to request a snapshot which could be used
+       by another Blitz instance - i.e. think of this as a copy type operation.
+
+       @todo The checking of active transactions is not bullet proof because
+       we could get a new transaction before we start the checkpoint.  Consider
+       a barrier or some other fix (requires a decision on how to handle
+       waiting for settling of transactions etc).  We're keeping it simple for
+       now.
+     */
+    public void requestSnapshot() throws TransactionException, IOException {
+        try {
+            int myActiveTxnCount;
+
+            theLock.readLock().acquire();
+
+            myActiveTxnCount = theManagerState.getNumActiveTxns();
+
+            theLock.readLock().release();
+
+            if (myActiveTxnCount != 0)
+                throw new TransactionException(
+                        "Cannot snapshot with active transactions it's bad for your data");
+        } catch (InterruptedException anIE) {
+            throw new TransactionException("Couldn't check for outstanding transactions");
+        }
+
+        CheckpointTask myTask = new CheckpointTask();
+
+        try {
+            Tasks.queue("checkpoints", myTask);
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to queue checkpoint task",
+                          anIE);
+            throw new IOException("Failed to queue checkpoint task");
+        }
+
+        myTask.waitForCompletion();
+    }
+
+    /**
+       Requests a checkpoint and blocks 'til completion.  This method is
+       used internally by the Blitz core and unlike requestSnapshot makes
+       no efforts to render the filesystem into a state which could be copied
+       to another Blitz instance.
+     */
+    public void requestSyncCheckpoint() throws IOException {
+        issueCheckpoint(true);
+    }
+
+    /**
+       This method is used internally by the Blitz core and unlike
+       requestSnapshot makes no efforts to render the filesystem into a state
+       which could be copied to another Blitz instance.
+
+       <ol>
+       <li> Assert a write lock to prevent further commands from being
+       logged. </li>
+       <li> Snapshot the state of the PrevalentSystem (TxnManagerState) </li>
+       <li> Snapshot results in changeover to a new log file.
+       <li> Release the write lock to allow further commands to enter the new
+       log file </li>
+       <li> Invoke Disk.sync to flush dirty data to disk which will callback
+       on completion </li>
+       <li> Callback triggers writing of the snapshot which invalidates all
+       log files previous to the one started above </li>
+       </ol>
+
+       <p>In the event of failure before sync'ing is complete - i.e. the
+       snapshot has not been saved, log files from the previous snapshot
+       onwards will be used to re-construct state.  If there's no previous
+       snapshot, all log files will be replayed to reconstruct state. </p>
+
+       @see org.dancres.blitz.disk.Disk
+     */
+    void requestAsyncCheckpoint() throws IOException {
+        issueCheckpoint(false);
+    }
+
+    /**
+       Internal implementation of the checkpoint operation used by snapshot
+       and checkpoint methods above.
+
+       @param isBlocking specifies whether to block the caller until the
+       checkpoint completes
+     */
+    private void issueCheckpoint(boolean isBlocking) throws IOException {
+
+        // Only checkpoint if the trigger says it's okay
+        if (theCheckpointTrigger.checkpointsDisabled())
+            return;
+
+        try {
+            long myCkptId;
+
+            synchronized(this) {
+                myCkptId = theCheckpointCount++;
+            }
+
+            if (LOG_CKPTS)
+                theLogger.log(Level.INFO, "Checkpoint::start: " + myCkptId);
+
+            theLock.writeLock().acquire();
+
+            // Issue tentative checkpoint - change over logs
+            // and carry over prepared state from old log in snapshotter
+            // which will be commited/aborted in the new log
+            Snapshotter mySnapper = thePrevayler.takeSnapshot();
+
+            theLock.writeLock().release();
+
+            // Now sync disks and save snapshot at completion
+            if (isBlocking) {
+                BlockingSnapshotTask myTask =
+                    new BlockingSnapshotTask(myCkptId, mySnapper);
+
+                Disk.sync(myTask);
+                myTask.waitForCompletion();
+            } else {
+                Disk.sync(new SnapshotTask(myCkptId, mySnapper));
+            }
+
+            if (LOG_CKPTS)
+                theLogger.log(Level.INFO, "Checkpoint::end: " + myCkptId);
+
+        } catch (InterruptedException anIE) {
+            theLogger.log(Level.SEVERE, "Failed to get lock for ckpt", anIE);
+            throw new IOException("Failed to lock for ckpt");
+        } catch (Exception anE) {
+            theLogger.log(Level.SEVERE, "Failed to sync", anE);
+            throw new IOException("Failed to sync");
+        }
+    }
+
+    /**
+       An instance of this object is passed to Disk.sync.  It's run method
+       will be called once Disk has completed the sync task.  When called,
+       it saves the snapshot to disk which obsoletes logs previous to the
+       snapshot (where previous is defined as a log with a sequence number
+       less than that of the snapshot).
+     */
+    class SnapshotTask implements Runnable {
+        private Snapshotter theSnapper;
+        private long theCkptId;
+
+        SnapshotTask(long aCkptId, Snapshotter aSnapper) {
+            theCkptId = aCkptId;
+            theSnapper = aSnapper;
+        }
+
+        public void run() {
+            try {
+                if (LOG_CKPTS)
+                    theLogger.log(Level.INFO, 
+                                  "Disks sync'd - save snapshot: " +
+                                  theCkptId);
+                theSnapper.save();
+
+                if (LOG_CKPTS)
+                    theLogger.log(Level.INFO, "Snapshot saved: " + theCkptId);
+            } catch (IOException anIOE) {
+                theLogger.log(Level.SEVERE, "Failed to save snapshot on sync",
+                              anIOE);
+            }
+        }
+    }
+
+    class BlockingSnapshotTask implements Runnable {
+        private Snapshotter theSnapper;
+        private long theCkptId;
+        private boolean isComplete;
+
+        BlockingSnapshotTask(long aCkptId, Snapshotter aSnapper) {
+            theCkptId = aCkptId;
+            theSnapper = aSnapper;
+        }
+
+        public void run() {
+            try {
+                if (LOG_CKPTS)
+                    theLogger.log(Level.INFO, 
+                                  "Disks sync'd - save snapshot: " +
+                                  theCkptId);
+                theSnapper.save();
+
+                synchronized(this) {
+                    isComplete = true;
+                    notify();
+                }
+
+                if (LOG_CKPTS)
+                    theLogger.log(Level.INFO, "Snapshot saved: " + theCkptId);
+            } catch (IOException anIOE) {
+                theLogger.log(Level.SEVERE, "Failed to save snapshot on sync",
+                              anIOE);
+            }
+        }
+
+        void waitForCompletion() {
+            synchronized(this) {
+                while(! isComplete) {
+                    try {
+                        wait();
+                    } catch (InterruptedException anIE) {
+                    }
+                }
+            }
+        }
+    }
+}