diff src/org/dancres/blitz/BulkTakeVisitor.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/BulkTakeVisitor.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,423 @@
+package org.dancres.blitz;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.util.logging.*;
+
+import net.jini.core.transaction.TransactionException;
+import net.jini.space.JavaSpace;
+
+import org.dancres.blitz.mangler.MangledEntry;
+
+import org.dancres.blitz.entry.*;
+
+import org.dancres.blitz.txn.TxnState;
+
+import org.dancres.blitz.txnlock.*;
+
+import org.dancres.blitz.oid.OID;
+
+import org.dancres.blitz.notify.EventGenerator;
+import org.dancres.blitz.notify.EventQueue;
+import org.dancres.blitz.notify.QueueEvent;
+import org.dancres.blitz.notify.EventGeneratorState;
+
+class BulkTakeVisitor implements BulkMatchTask, SearchVisitor {
+    private static final Logger theLogger =
+        Logging.newLogger("org.dancres.blitz.BulkTakeVisitor");
+
+    private MangledEntry[] theTemplates;
+    private TxnState theTxnState;
+    private long theLimit;
+    private EventGeneratorImpl theSearchTask;
+    private SearchVisitorAdapter theAdapter = new SearchVisitorAdapter();
+    private BaulkedParty theParty;
+
+    private ArrayList theEntries = new ArrayList();
+
+    private int theStatus = SearchVisitor.TRY_AGAIN;
+
+    private TransactionException theException;
+
+    private boolean needsWakeup = false;
+
+    BulkTakeVisitor(MangledEntry[] aTemplates, TxnState aTxnState,
+                    long aLimit, VisitorBaulkedPartyFactory aFactory)
+        throws IOException {
+
+        theTemplates = aTemplates;
+        theTxnState = aTxnState;
+        theLimit = aLimit;
+        theSearchTask = new EventGeneratorImpl(theTemplates);
+        theParty = aFactory.newParty(this);
+        SearchTasks.get().add(this);
+        EventQueue.get().insert(theSearchTask);
+    }
+
+    private void resolved() {
+        setStatus(STOP, new TransactionException());
+    }
+
+    public SearchVisitor getVisitor() {
+        return theAdapter;
+    }
+
+    public int offer(SearchOffer anOffer) {
+        theLogger.log(Level.FINE, "offer");
+
+        synchronized(this) {
+            if (haveCompleted()) {
+                theLogger.log(Level.FINE, "Have completed");
+                return STOP;
+            }
+        }
+
+        OpInfo myInfo = anOffer.getInfo();
+        MangledEntry myEntry = anOffer.getEntry();
+
+        LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType());
+        TxnLock myLock = myMgr.getLock(myInfo.getOID());
+
+        synchronized (this) {
+            int myResult;
+
+            // Picked up enough matches in the meantime? Quit...
+            if (haveCompleted())
+                return STOP;
+
+            VisitorBaulkedPartyFactory.Handback myHandback =
+                new VisitorBaulkedPartyFactory.Handback(myInfo.getType(),
+                    myInfo.getOID(), myEntry);
+
+            synchronized (myLock) {
+                myResult = myLock.acquire(theTxnState,
+                    TxnLock.DELETE,
+                    theParty, myHandback, false);
+            }
+
+            if (myResult == TxnLock.SUCCESS) {
+                // Got the lock
+                try {
+                    theTxnState.add(new EntryTxnOp(TxnLock.DELETE,
+                        myInfo,
+                        myLock));
+                } catch (TransactionException aTE) {
+                    synchronized (myLock) {
+                        myLock.release(theTxnState, TxnLock.DELETE);
+                    }
+                    return setStatus(STOP, aTE);
+                }
+
+                // Add the Entry to our list of matches
+                theEntries.add(myEntry);
+
+                /*
+                  Picked up enough matches? Quit else carry on
+                */
+                if (haveCompleted())
+                    return setStatus(STOP, null);
+                else
+                    return TRY_AGAIN;
+            } else {
+                /*
+                 One of our templates matched but we didn't
+                 get a lock.  No point in trying other templates
+                 because even if they match we might get another
+                 conflict.  We'll leave it to settle instead and
+                 look for other matches.
+                */
+                return TRY_AGAIN;
+            }
+        }
+    }
+
+    public boolean isDeleter() {
+        return true;
+    }
+
+    private boolean haveCompleted() {
+        /*
+          If we're blocking, then we'll unblock as soon as we get one
+          match, otherwise, we want to be greedy and grab as many entries
+          as our limit allows
+        */
+        if (needsWakeup) {
+            return (theEntries.size() != 0) || (theStatus == STOP);
+        } else {
+            return (theEntries.size() == theLimit) || (theStatus == STOP);
+        }
+    }
+
+    public synchronized List getEntries(long aTimeout)
+        throws TransactionException, InterruptedException {
+
+        // We only wait the once because we'll only ever wake from this
+        // if there's a result or we timeout
+        if ((theEntries.size() == 0) && (aTimeout != 0)) {
+            needsWakeup = true;
+            wait(aTimeout);
+            needsWakeup = false;
+        }
+
+        // We're returning - ensure we don't allow any more operations to
+        // avoid doing a take we'll never return....
+        setStatus(STOP, null);
+
+        if (theException != null)
+            throw theException;
+
+        return theEntries;
+    }
+
+    /**
+       We're greedy and always want more if some might be available.
+       i.e. If we haven't got to the point where we've scanned the entire
+       space (in which case user should be calling wouldBlock()) we want more!
+     */
+    public synchronized boolean wantsMore() {
+        return (theEntries.size() < theLimit);
+    }
+
+    /**
+       We only request block if we have no entries.
+     */
+    public synchronized boolean wouldBlock() {
+        return (theEntries.size() == 0);
+    }
+
+    private boolean wasNotSatisfied() {
+        return (theEntries.size() == 0);
+    }
+    
+    private synchronized int setStatus(int aState, TransactionException aTE) {
+        /*
+         * Test only for STOP, do not use haveCompleted in this case
+         * which can declare us completed and cause us to axit before
+         * we actually set STOP.
+         */
+        if (theStatus == STOP)
+            return STOP;
+        
+        theStatus = aState;
+        theException = aTE;
+
+        theSearchTask.taint(false);
+        SearchTasks.get().remove(this, wasNotSatisfied());
+
+        if (needsWakeup)
+            notify();
+
+        return theStatus;
+    }
+
+    private class EventGeneratorImpl implements EventGenerator {
+        private boolean isTainted = false;
+        private MangledEntry[] theTemplates;
+        private OID theOID;
+
+        EventGeneratorImpl(MangledEntry[] aTemplates) {
+            theTemplates = aTemplates;
+        }
+
+        public void assign(OID anOID) {
+            theOID = anOID;
+        }
+
+        public long getStartSeqNum() {
+            return 0;
+        }
+
+        public OID getId() {
+            return theOID;
+        }
+
+        public boolean isPersistent() {
+            return false;
+        }
+
+        public long getSourceId() {
+            return 0;
+        }
+
+        void taint(boolean signal) {
+            synchronized (this) {
+                // Tainting can only be done once
+                //
+                if (isTainted)
+                    return;
+
+                isTainted = true;
+            }
+
+            if (signal)
+                setStatus(STOP, new TransactionException("Destroyed"));
+
+            try {
+                EventQueue.get().kill(getId());
+            } catch (IOException anIOE) {
+                theLogger.log(Level.SEVERE,
+                    "Encountered IOException during kill", anIOE);
+            }
+
+            /*
+            try {
+                Tasks.queue(new CleanTask(getId()));
+            } catch (InterruptedException anIE) {
+                theLogger.log(Level.WARNING,
+                    "Failed to lodge cleanup for: " + getId(), anIE);
+            }
+            */
+        }
+
+        public void taint() {
+            taint(true);
+        }
+
+        public boolean canSee(QueueEvent anEvent, long aTime) {
+            synchronized (this) {
+                if (isTainted) {
+                    return false;
+                }
+            }
+
+            // Check if it's txn_ended and my txn and call resolved if it is
+            if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) &&
+                    (theTxnState.getId().equals(anEvent.getTxn().getId()))) {
+                resolved();
+                return false;
+            }
+
+            // We want to see new writes from a transaction
+            //
+            return (anEvent.getType() == QueueEvent.ENTRY_WRITE);
+        }
+
+        public boolean matches(MangledEntry anEntry) {
+            synchronized (this) {
+                if (isTainted) {
+                    return false;
+                }
+            }
+
+            for (int i = 0; i < theTemplates.length; i++) {
+                MangledEntry myTemplate = theTemplates[i];
+
+                if (Types.isSubtype(myTemplate.getType(), anEntry.getType())) {
+                    if (myTemplate.match(anEntry))
+                        return true;
+                }
+            }
+
+            return false;
+        }
+
+        public boolean renew(long aTime) {
+            // Nothing to do as we expire by being tainted by the enclosing
+            // class only
+            //
+            return true;
+        }
+
+        public void recover(long aSeqNum) {
+            // Nothing to do
+        }
+
+        public long jumpSequenceNumber() {
+            return 0;
+        }
+
+        public long jumpSequenceNumber(long aMin) {
+            return 0;
+        }
+
+        public void ping(QueueEvent anEvent, JavaSpace aSource) {
+            synchronized (this) {
+                if (isTainted) {
+                    return;
+                }
+            }
+
+            LongtermOffer myOffer = null;
+
+            try {
+                QueueEvent.Context myContext = anEvent.getContext();
+                MangledEntry myEntry = myContext.getEntry();
+                OID myOID = myContext.getOID();
+
+                EntryRepository myRepos =
+                    EntryRepositoryFactory.get().find(myEntry.getType());
+
+                myOffer = myRepos.getOffer(myOID);
+
+                if (myOffer == null)
+                    return;
+
+                myOffer.offer(BulkTakeVisitor.this);
+
+            } catch (IOException anIOE) {
+                // Nothing can be done
+                theLogger.log(Level.SEVERE,
+                    "Encountered IOException during write offer", anIOE);
+            } finally {
+                if (myOffer != null) {
+                    try {
+                        myOffer.release();
+                    } catch (IOException anIOE) {
+                        theLogger.log(Level.SEVERE,
+                            "Encountered IOException during write offer(release)",
+                            anIOE);
+                    }
+                }
+            }
+        }
+
+        public EventGeneratorState getMemento() {
+            throw new RuntimeException(
+                "Shouldn't be happening - we're transient");
+        }
+    }
+
+    private class SearchVisitorAdapter implements SearchVisitor {
+
+        public boolean isDeleter() {
+            return BulkTakeVisitor.this.isDeleter();
+        }
+
+        public int offer(SearchOffer anOffer) {
+            if (theLogger.isLoggable(Level.FINE))
+                theLogger.log(Level.FINE, "Offer");
+
+            synchronized (this) {
+                if (haveCompleted()) {
+                    theLogger.log(Level.FINE, "Have completed");
+                    return STOP;
+                }
+            }
+
+            OpInfo myInfo = anOffer.getInfo();
+            MangledEntry myEntry = anOffer.getEntry();
+
+            for (int i = 0; i < theTemplates.length; i++) {
+                MangledEntry myTemplate = theTemplates[i];
+
+                if (Types.isSubtype(myTemplate.getType(), myInfo.getType())) {
+
+                    if ((myTemplate.isWildcard()) ||
+                        (myTemplate.match(myEntry))) {
+
+                        // If we get a match, we only need to try offer
+                        // once to see if we can lock the entry so we can
+                        // give up after the first match & offer.
+                        //
+                        return BulkTakeVisitor.this.offer(anOffer);
+                    }
+                }
+            }
+
+            return TRY_AGAIN;
+        }
+    }
+}
\ No newline at end of file