diff src/org/dancres/blitz/txnlock/TxnLock.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/txnlock/TxnLock.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,437 @@
+package org.dancres.blitz.txnlock;
+
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.*;
+
+import org.dancres.blitz.entry.OpInfo;
+
+import org.dancres.blitz.task.Task;
+import org.dancres.blitz.task.Tasks;
+
+import org.dancres.blitz.txn.TxnId;
+import org.dancres.blitz.txn.TxnState;
+
+import org.dancres.blitz.Logging;
+
+/**
+   <p> Every transaction (null or otherwise) must secure a lock for an
+   EntrySleeve before it can deemed to have succeeded.  When the transaction is
+   commited these locks are then released. </p>
+
+   <p> Things are slightly more complicated than this because we must handle
+   lock conflict and integrate that with the dispatching of blocked takes or
+   reads such that we can wake up those blocked matches when the blocking
+   transaction is commited. </p>
+ */
+public class TxnLock {
+    private static final Logger theLogger =
+        Logging.newLogger("org.dancres.blitz.txnlock.TxnLock");
+
+    public static final int WRITE = -1;
+    public static final int READ = -2;
+    public static final int DELETE = -3;
+
+    public static final int SUCCESS = 1;
+    public static final int CONFLICT = 2;
+    public static final int FAIL = 3;
+
+    private List theConflicts;
+
+    private ArrayList theLockStates = new ArrayList();
+
+    /*
+      DEBUG BITS
+     */
+    private static final boolean DEBUG = false;
+    private static Object theIdLock = new Object();
+    private static long theNextId = 0;
+    private long theId = -1;
+
+    private static class LockState {
+        private int theType;
+        private TxnId theOwner;
+
+        LockState(int aType, TxnId anOwner) {
+            theType = aType;
+            theOwner = anOwner;
+        }
+
+        int getType() {
+            return theType;
+        }
+
+        TxnId getOwner() {
+            return theOwner;
+        }
+
+        public String toString() {
+            return theOwner + ":" + theType;
+        }
+    }
+
+    TxnLock() {
+        if (DEBUG) {
+            synchronized(theIdLock) {
+                theId = ++theNextId;
+            }
+        }
+    }
+
+    public boolean isActive() {
+        return (theLockStates.size() != 0);
+    }
+
+    public int test(TxnState anAcquirer, int aDesiredOp) {
+        TxnId myTxnId = anAcquirer.getId();
+
+        if (aDesiredOp == WRITE) {
+            // Always succeeds
+            return SUCCESS;
+        }
+
+        switch(aDesiredOp) {
+            case READ : {
+                boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
+                TxnId aDeleter = hasDeleter();
+                TxnId aWriter = hasWriterOtherThan(myTxnId);
+
+                if (deleterIsMe) {
+                    return FAIL;
+                } else if ((aDeleter != null) || (aWriter != null)) {
+                    return CONFLICT;
+                } else {
+                    return SUCCESS;
+                }
+            }
+            case DELETE : {
+                boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
+                TxnId aDeleter = hasDeleter();
+                TxnId aWriter = hasWriterOtherThan(myTxnId);
+                TxnId aReader = hasReaderOtherThan(myTxnId);
+
+                if (deleterIsMe)
+                    return FAIL;
+                else if ((aDeleter != null) ||
+                         (aReader != null) ||
+                         (aWriter != null)) {
+                    return CONFLICT;
+                } else {
+                    return SUCCESS;
+                }
+            }
+            default : {
+                throw new RuntimeException("Unrecognised acquire op");
+            }
+        }
+    }
+
+    /**
+       One calls this method to assert an operation lock on a particular
+       EntryUID.  On success <code>true</code> is returned and the acquire
+       is complete.  In the case of failure, the caller is not blocked.
+       Rather, we register a callback against the transaction which will
+       be invoked after it relinquishes it's lock.  Note that there might
+       well be more than one blocking transaction but we only bother
+       registering against one of them.  This might result in a few false
+       wakeups but they are unlikely to be performance inhibiting.  This
+       method should be called in a synchronized block. <P>
+
+       Note that writes always succeed so there's no need to pass a callback
+       for such acquire requests.
+
+       @param anAcquirer TxnId of the transaction which wishes to assert the
+       lock.
+       @param aDesiredOp should be one of DELETE, READ,
+       WRITE
+       @param aParty called should a blocking transaction be encountered and
+       a second time when the blocking transaction has relinquished the lock
+       @param aHandback passed to aParty to allow multiplexing of conflicts
+       @param isRecovery when <code>true</code> causes a lock to be asserted
+       without performing checks.
+
+       @return One of, SUCCESS, CONFLICT or FAIL.
+     */
+    public int acquire(TxnState anAcquirer, int aDesiredOp, BaulkedParty aParty,
+                       Object aHandback, boolean isRecovery) {
+
+        TxnId myTxnId = anAcquirer.getId();
+
+        if (aDesiredOp == WRITE) {
+            // Always succeeds
+            theLockStates.add(new LockState(WRITE, myTxnId));
+            return SUCCESS;
+        }
+
+        if (isRecovery) {
+            switch (aDesiredOp) {
+                case READ : {
+                    theLockStates.add(new LockState(READ, myTxnId));
+                    break;
+                }
+                case DELETE: {
+                    theLockStates.add(new LockState(DELETE, myTxnId));
+                    break;
+                }
+            }
+
+            return SUCCESS;
+        }
+
+        switch(aDesiredOp) {
+            case READ : {
+                boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
+                TxnId aDeleter = hasDeleter();
+                TxnId aWriter = hasWriterOtherThan(myTxnId);
+
+                if (deleterIsMe) {
+                    return FAIL;
+                } else if ((aDeleter != null) || (aWriter != null)) {
+                    addConflict(aParty, aHandback,
+                                (aDeleter == null) ? aWriter : aDeleter);
+                    return CONFLICT;
+                } else {
+                    theLockStates.add(new LockState(READ, myTxnId));
+                    return SUCCESS;
+                }
+            }
+            case DELETE : {
+                boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
+                TxnId aDeleter = hasDeleter();
+                TxnId aWriter = hasWriterOtherThan(myTxnId);
+                TxnId aReader = hasReaderOtherThan(myTxnId);
+
+                if (deleterIsMe)
+                    return FAIL;
+                else if ((aDeleter != null) ||
+                         (aReader != null) ||
+                         (aWriter != null)) {
+                    if (aDeleter != null)
+                        addConflict(aParty, aHandback, aDeleter);
+                    else
+                        addConflict(aParty, aHandback,
+                                    (aReader == null) ? aWriter : aReader);
+                    return CONFLICT;
+                } else {
+                    theLockStates.add(new LockState(DELETE,
+                                                    myTxnId));
+                    return SUCCESS;
+                }
+            }
+            default : {
+                throw new RuntimeException("Unrecognised acquire op");
+            }
+        }
+    }
+
+    private void addConflict(BaulkedParty aParty, Object aHandback,
+                             TxnId aConflicter) {
+        if (aParty == null)
+            return;
+
+        if (theConflicts == null)
+            theConflicts = new ArrayList();
+        // theConflicts = new LinkedList();
+
+        theConflicts.add(new Callback(aConflicter, aParty, aHandback));
+    }
+
+    private static class Callback implements Task {
+        private TxnId theBlocker;
+        private BaulkedParty theParty;
+        private Object theHandback;
+
+        Callback(TxnId aBlocker, BaulkedParty aParty, Object aHandback) {
+            theBlocker = aBlocker;
+            theParty = aParty;
+            theHandback = aHandback;
+            theParty.blocked(aHandback);
+        }
+
+        TxnId getBlocker() {
+            return theBlocker;
+        }
+
+        BaulkedParty getBaulked() {
+            return theParty;
+        }
+
+        public void run() {
+            theParty.unblocked(theHandback);
+        }
+    }
+
+    public void release(TxnState anAcquirer, int anOp) {
+        TxnId myId = anAcquirer.getId();
+
+        int myIndex = 0;
+        boolean haveReleasedLock = false;
+        int myOutstanding = 0;
+
+        /*
+          Examine all lock states and:
+
+          (1) Release the relevant lock state
+          (2) Take note of any other lock states associated with this
+          transaction.
+        */
+        ArrayList myDispatches = new ArrayList();
+
+        synchronized(this) {
+            while (myIndex < theLockStates.size()) {
+                LockState myState = (LockState) theLockStates.get(myIndex);
+
+                // If the lock state is associated with this transaction...
+                if (myState.getOwner().equals(myId)) {
+                    /*
+                      If we've already released a lock of this type for this
+                      transaction just note that we've found another lock held
+                      by the transaction.  Otherwise, release the lock and note
+                      we did that.
+                    */
+                    if ((myState.getType() == anOp) && (! haveReleasedLock)) {
+                        theLockStates.remove(myIndex);
+                        haveReleasedLock = true;
+                        continue;
+                    } else {
+                        ++myOutstanding;
+                    }
+                }
+
+                ++myIndex;
+            }
+
+            if (myOutstanding != 0) {
+                // System.err.println("Still have outstanding: " + myOutstanding);
+                // We haven't released all locks yet - no point waking up blockers
+                return;
+            } else {
+                // System.err.println("Waking up conflicters");
+            }
+
+            // Now process any outstanding blockers
+            myIndex = 0;
+            while ((theConflicts != null) && (myIndex < theConflicts.size())) {
+                Callback myCallback = (Callback) theConflicts.get(myIndex);
+
+                if (myCallback.getBlocker().equals(myId)) {
+                    theConflicts.remove(myCallback);
+
+                    // System.err.println("Dispatching callback");
+                    myDispatches.add(myCallback);
+                } else {
+                    ++myIndex;
+                }
+            }
+        }
+
+        if (myDispatches.size() > 0) {
+            try {
+                Tasks.queue(new DispatchTask(myDispatches));
+            } catch (InterruptedException anIE) {
+                theLogger.log(Level.SEVERE,
+                              "Failed to queue Txn callback", anIE);
+            }
+        }
+    }
+
+    static class DispatchTask implements Task {
+        private ArrayList theDispatches;
+
+        DispatchTask(ArrayList aDispatches) {
+            theDispatches = aDispatches;
+        }
+
+        public void run() {
+            for (int i = 0; i < theDispatches.size(); i++) {
+                ((Task) theDispatches.get(i)).run();
+            }
+        }
+    }
+
+    public TxnId getWriter() {
+        for (int i = 0; i < theLockStates.size(); i++) {
+            LockState myState = (LockState) theLockStates.get(i);
+
+            if (myState.getType() == WRITE) {
+                return myState.getOwner();
+            }
+        }
+
+        return null;
+    }
+
+    public synchronized boolean hasWriter(TxnId anId) {
+        for (int i = 0; i < theLockStates.size(); i++) {
+            LockState myState = (LockState) theLockStates.get(i);
+
+            if ((myState.getType() == WRITE) &&
+                (myState.getOwner().equals(anId)))
+                return true;
+        }
+
+        return false;
+    }
+
+    public synchronized boolean hasOnly(TxnId anId, int anOp) {
+        if (theLockStates.size() != 1)
+            return false;
+
+        LockState myState = (LockState) theLockStates.get(0);
+
+        return ((myState.getOwner().equals(anId)) &&
+                (myState.getType() == anOp));
+    }
+
+    private TxnId hasDeleter() {
+        for (int i = 0; i < theLockStates.size(); i++) {
+            LockState myState = (LockState) theLockStates.get(i);
+
+            if (myState.getType() == DELETE)
+                return myState.getOwner();
+        }
+
+        return null;
+    }
+
+    private TxnId hasReaderOtherThan(TxnId anId) {
+        for (int i = 0; i < theLockStates.size(); i++) {
+            LockState myState = (LockState) theLockStates.get(i);
+
+            if ((myState.getType() == READ) &&
+                (! myState.getOwner().equals(anId)))
+                return myState.getOwner();
+        }
+
+        return null;
+    }
+
+    private TxnId hasWriterOtherThan(TxnId anId) {
+        for (int i = 0; i < theLockStates.size(); i++) {
+            LockState myState = (LockState) theLockStates.get(i);
+
+            if ((myState.getType() == WRITE) &&
+                (! myState.getOwner().equals(anId)))
+                return myState.getOwner();
+        }
+
+        return null;
+    }
+
+    private boolean hasDeleterThatsMe(TxnId anId) {
+        for (int i = 0; i < theLockStates.size(); i++) {
+            LockState myState = (LockState) theLockStates.get(i);
+
+            if ((myState.getType() == DELETE) &&
+                (myState.getOwner().equals(anId)))
+                return true;
+        }
+
+        return false;
+    }
+
+    public String toString() {
+        return "TLock: " + theId;
+    }
+}