Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/txn/TxnManagerState.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/txn/TxnManagerState.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,422 @@ +package org.dancres.blitz.txn; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Level; + +import net.jini.core.transaction.TransactionException; +import net.jini.core.transaction.UnknownTransactionException; +import net.jini.core.transaction.server.TransactionConstants; + +import org.prevayler.AlarmClock; +import org.prevayler.PrevalentSystem; +import org.prevayler.SnapshotContributor; + +/** + Responsible for tracking/managing transactions. This responsiblity is split + across two classes. TxnManager handles control aspects whilst + TxnManagerState tracks the transactional information. <P> + + @see org.dancres.blitz.txn.TxnManager + */ +class TxnManagerState implements PrevalentSystem { + static final long serialVersionUID = -5650181362477845180L; + + private static boolean UPGRADE = false; + + /* + These member variables are serialized as part of writeObject and + de-serialized as part of readObject (which also initializes appropriate + state). i.e. We don't just serialize this object directly. + */ + private AlarmClock theClock; + + private HashMap theTxns = new HashMap(); + + private Serializable[] theSnapshotContributions = new Serializable[0]; + + private ArrayList theSnapshotContributors = new ArrayList(); + + static void enableUpgrade() { + UPGRADE = true; + } + + public void clock(AlarmClock aClock) { + theClock = aClock; + } + + public AlarmClock clock() { + return theClock; + } + + public List getActiveTxnIds() { + ArrayList myTxnIds = new ArrayList(); + + synchronized(this) { + Iterator myTxns = theTxns.keySet().iterator(); + + while (myTxns.hasNext()) { + TxnId myId = (TxnId) myTxns.next(); + myTxnIds.add(myId); + } + } + + return myTxnIds; + } + + public void add(SnapshotContributor aContributor) { + synchronized(theSnapshotContributors) { + if (!theSnapshotContributors.contains(aContributor)) + theSnapshotContributors.add(aContributor); + } + } + + public void remove(SnapshotContributor aContributor) { + synchronized(theSnapshotContributors) { + theSnapshotContributors.remove(aContributor); + } + } + + public Serializable[] getSnapshotContributions() { + return theSnapshotContributions; + } + + private void writeObject(ObjectOutputStream anOut) throws IOException { + anOut.writeObject(LogVersion.VERSION); + + /* + We only save PREPARED transactions, ignoring ACTIVES because + they are transient and their state changes won't be applied + until we've issued prepare and then commit or abort. The ACTIVES + will either die due to failure or, post the sync, add operations + to the log. Note that, whilst a transaction is active, it generates + no log records at all hence the reason we don't need to save them. + Commited or aborted updates in cache which need flushing to disk + should have already been sync'd before we get this far. + */ + ArrayList myPrepared = new ArrayList(); + + synchronized(this) { + // Write out clock + // + anOut.writeObject(theClock); + + Iterator myTxns = theTxns.keySet().iterator(); + + while (myTxns.hasNext()) { + TxnId myId = (TxnId) myTxns.next(); + + TxnState myState = getState(myId); + + try { + int myStatus = myState.getStatus(); + + if (myStatus == TransactionConstants.PREPARED) { + myPrepared.add(myState); + } + + } catch (TransactionException aTE) { + // Whoops, got nailed checking status, logged in the call + // nothing to do. + } + } + + anOut.writeInt(myPrepared.size()); + + for (int i = 0; i < myPrepared.size(); i++) { + anOut.writeObject(myPrepared.get(i)); + } + } + + /* + Write out any user-code snapshot contributions + */ + ArrayList myContributions = new ArrayList(); + + synchronized(theSnapshotContributors) { + for (int i = 0; i < theSnapshotContributors.size(); i++) { + myContributions.add(((SnapshotContributor) theSnapshotContributors.get(i)).getContribution()); + } + } + + Serializable[] myUserData = new Serializable[myContributions.size()]; + myUserData = (Serializable[]) myContributions.toArray(myUserData); + + anOut.writeObject(myUserData); + } + + private void readObject(ObjectInputStream anIn) + throws IOException, ClassNotFoundException { + + boolean isUpgrade = false; + + theTxns = new HashMap(); + theSnapshotContributors = new ArrayList(); + + Object myFirst = anIn.readObject(); + + /* + If there's no LogVersion, chances are we're looking at a pre 1.13 + log format - upgrade is simple as there's no LogVersion and there + will be no user checkpoint data so we just ignore those fields. + */ + if (! (myFirst instanceof LogVersion)) { + TxnManager.theLogger.log(Level.SEVERE, "Upgrading old transaction log"); + isUpgrade = true; + theClock = (AlarmClock) myFirst; + } else { + LogVersion myVersion = (LogVersion) myFirst; + + if (!myVersion.equals(LogVersion.VERSION)) + throw new IOException("Yikes - log versions don't match - upgrade?" + myVersion); + + theClock = (AlarmClock) anIn.readObject(); + } + + int myNumRecords = anIn.readInt(); + + synchronized(this) { + + for (int i = 0; i < myNumRecords; i++) { + TxnState myState = (TxnState) anIn.readObject(); + + try { + myState.prepare(true); + } catch (UnknownTransactionException aUTE) { + IOException anIOE = new IOException("Failed to recover prepare"); + anIOE.initCause(aUTE); + throw anIOE; + } + + theTxns.put(myState.getId(), myState); + } + } + + if (isUpgrade) + theSnapshotContributions = new Serializable[0]; + else + theSnapshotContributions = (Serializable[]) anIn.readObject(); + } + + private TxnState getState(TxnId anId) { + synchronized(this) { + return (TxnState) theTxns.get(anId); + } + } + + /** + Resolve a transaction using this method before calling any of + <code>prepare</code>, <code>commit</code>, <code>abort</code> or + <code>prepareAndCommit</code>. + + @todo Add Janitor/Checker thread to clear out dead transaction + state - see comments in method + */ + TxnState getTxnFor(TxnId anId, TxnGateway aGateway, boolean mustExist) + throws UnknownTransactionException { + + TxnState myState = null; + + if (mustExist) { + myState = getState(anId); + } else { + synchronized(this) { + myState = (TxnState) theTxns.get(anId); + } + + /* + If state doesn't exist, we need to join and update the state + */ + if (myState == null) { + try { + aGateway.join(anId); + } catch (Exception anException) { + TxnManager.theLogger.log(Level.SEVERE, + "Failed to join txn" + + anId, anException); + + throw new UnknownTransactionException(); + } + + /* + There's a race condition here where the transaction + could be prepared before we get our state updated. + If that happens, the prepare method will bounce + the prepare call blowing the transaction out. + + This will leave us with a bit of dead txn state which + we ought to cleanup by invoking getState etc. This + could be done by a janitor thread in the future. + + In reality: + + (a) This is unlikely + (b) Should this happen, we're going to be the least of + the problems because someone somewhere thinks + the transaction is active (this thread is acting on + their behalf) and someone else is closing it out. + When the client associated with this thread invokes + commit there'll be a big nasty mess. + */ + synchronized(this) { + + /* + Up till now, we race to create/join the transaction + (see above). Now we must put it right.... + */ + myState = (TxnState) theTxns.get(anId); + if (myState == null) { + myState = new TxnState(anId); + theTxns.put(anId, myState); + } + } + } + } + + if (myState == null) + throw new UnknownTransactionException(); + else + return myState; + } + + /** + In cases where no explicit transaction has been passed in by a caller, + create a null transaction which is an internal, fully transactional + replacement which can be used for the duration of the operation + in question. + */ + TxnState newNullTxn() throws RemoteException { + TxnId myId = TxnId.newNullTxn(); + TxnState myState = new TxnState(myId); + + synchronized(this) { + theTxns.put(myId, myState); + } + + return myState; + } + + /** + In cases where no state will be changed (no Entry's taken or written), + create an instance of this transaction which, when commited or aborted + will be undone but not logged. + */ + TxnState newIdentityTxn() throws RemoteException { + TxnId myId = TxnId.newNullTxn(); + TxnState myState = new TxnState(myId, true); + + synchronized(this) { + theTxns.put(myId, myState); + } + + return myState; + } + + /** + Do not call this method directly - it should only be invoked from + a Prevayler command. + */ + int prepare(TxnState aState) + throws UnknownTransactionException, IOException { + + /* + Do we know about this transaction? + + If we don't we've failed and are now doing recovery so we must + re-insert the state. + */ + boolean needsRestore = (getState(aState.getId()) == null); + + if (needsRestore) { + synchronized(this) { + theTxns.put(aState.getId(), aState); + } + } + + return aState.prepare(needsRestore); + } + + /** + Do not call this method directly - it should only be invoked from + a Prevayler command. + */ + void commit(TxnId anId) + throws UnknownTransactionException, IOException { + + TxnState myState = getTxnFor(anId, null, true); + + myState.commit(); + + removeTxn(anId); + } + + /** + Do not call this method directly - it should only be invoked from + a Prevayler command. + */ + void abort(TxnId anId) + throws UnknownTransactionException, IOException { + + TxnState myState = getTxnFor(anId, null, true); + + myState.abort(); + removeTxn(anId); + } + + void abortAll() throws IOException { + synchronized(this) { + Iterator myTxns = theTxns.keySet().iterator(); + + while (myTxns.hasNext()) { + TxnId myId = (TxnId) myTxns.next(); + + TxnState myState = getState(myId); + + try { + int myStatus = myState.getStatus(); + + if ((myStatus == TransactionConstants.PREPARED) || + (myStatus == TransactionConstants.ACTIVE)) { + + /* + * AbortAll is a naive operation in that it has no + * awareness of a specific transaction thus it cannot + * explicitly vote one of them off so we must do it + * ourselves + */ + myState.vote(); + myState.abort(); + myTxns.remove(); + } + + } catch (TransactionException aTE) { + // Whoops, got nailed checking status, logged in the call + // nothing to do. + } + } + } + } + + private void removeTxn(TxnId anId) { + TxnState myState; + + synchronized(this) { + myState = (TxnState) theTxns.remove(anId); + } + } + + int getNumActiveTxns() { + synchronized(this) { + return theTxns.size(); + } + } +} + +