Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/SearchVisitorImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/SearchVisitorImpl.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,433 @@ +package org.dancres.blitz; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.core.transaction.TransactionException; +import net.jini.space.JavaSpace; + +import org.dancres.blitz.entry.*; +import org.dancres.blitz.mangler.MangledEntry; +import org.dancres.blitz.notify.EventGenerator; +import org.dancres.blitz.notify.EventGeneratorState; +import org.dancres.blitz.notify.EventQueue; +import org.dancres.blitz.notify.QueueEvent; +import org.dancres.blitz.oid.OID; +import org.dancres.blitz.txn.TxnState; +import org.dancres.blitz.txnlock.LockMgr; +import org.dancres.blitz.txnlock.TxnLock; +import org.dancres.blitz.txnlock.TxnLocks; +import org.dancres.blitz.txnlock.BaulkedParty; + +/** + All search results obtained by the lower layers are offered to a + SearchVisitor instance which can then determine whether the offered Entry + is suitable. This includes "deep matching" which requires that we fully + compare the fields of template to entry. The lower-layers do not perform + this task - they return the entry's that are a probable match. In + addition, instances of search visitor check transaction locks etc. which + the lower layers know nothing about. + */ +class SearchVisitorImpl implements SingleMatchTask, + SearchVisitor { + + private static final Logger theLogger = + Logging.newLogger("org.dancres.blitz.entry.SearchVisitorImpl"); + + private MangledEntry theTemplate; + private TxnState theTxnState; + private boolean isTaking; + private int theLockOp; + + private EventGeneratorImpl theSearchTask; + private SearchVisitor theAdapter = new SearchVisitorAdapter(); + private BaulkedParty theParty; + + private CompletionEvent theCompletion; + + private boolean needsWakeup = false; + + /** + @param isTake indicates the kind of txn lock we would need to assert + */ + SearchVisitorImpl(MangledEntry aTemplate, boolean isTake, + TxnState aState, VisitorBaulkedPartyFactory aFactory) + throws IOException { + + theTemplate = aTemplate; + isTaking = isTake; + theTxnState = aState; + theLockOp = (isTaking == true) ? TxnLock.DELETE : TxnLock.READ; + theSearchTask = new EventGeneratorImpl(aTemplate); + theParty = aFactory.newParty(this); + SearchTasks.get().add(this); + EventQueue.get().insert(theSearchTask); + } + + public EventGenerator getSearchTask() { + return theSearchTask; + } + + public SearchVisitor getVisitor() { + return theAdapter; + } + + /*********************************************************************** + * SearchVisitor + ***********************************************************************/ + + /** + * This is an unfiltered offer method which only checks to see if it can + * perform an acquire. Actual matching is done in the Adapter class + * or in the satellite EventGenerator implementation. i.e. By the time + * we get to this method we need to ensure that matching has been done. + * + * @todo Consider re-arranging this method as described in the comment + * below. + */ + public int offer(SearchOffer anOffer) { + OpInfo myInfo = anOffer.getInfo(); + + MangledEntry myEntry = anOffer.getEntry(); + + LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType()); + TxnLock myLock = myMgr.getLock(myInfo.getOID()); + + synchronized (this) { + + int myResult; + + // Do we need to try and secure this? + if (haveFinished()) + return STOP; + + VisitorBaulkedPartyFactory.Handback myHandback = + new VisitorBaulkedPartyFactory.Handback(myInfo.getType(), + myInfo.getOID(), myEntry); + + synchronized (myLock) { + myResult = myLock.acquire(theTxnState, theLockOp, + theParty, myHandback, false); + } + + if (myResult == TxnLock.SUCCESS) { + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, theTxnState.getId() + + " Acq: " + + myInfo + ", " + myLock); + + try { + theTxnState.add(new EntryTxnOp(theLockOp, myInfo, + myLock)); + } catch (TransactionException aTE) { + + /* + If we catch a transactionexception here we're dead, + we will exit with invalid state. If we have the + lock above we know we have the Entry and no-one + can remove it from under us. + + Once we've locked the entry we're done + matching because either we will return it + successfully after tracking our action in the + transaction or we will blow up on the transaction + and release the entry lock and return an exception. + Either way we can stop all searching as soon as + we have the Entry locked. + + Thus we can set theEntry above once we see success + and then exit the sync block rather than holding + it whilst we update the transaction. Before + exiting the sync block we need to check for + conflict as we do below and set the didconflict + flag. + + Currently we stop searches and wakeup blocked + threads in getEntry via setStatus thus setting + theEntry is currently insufficient. However + it would be safe to modify haveCompleted to also + include a test for theEntry being set. Note we + must NEVER use haveCompleted in getEntry() + as a consequence! + */ + myLock.release(theTxnState, theLockOp); + return sendEvent(new CompletionEvent(aTE)); + } + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Succeeded"); + + return sendEvent(new CompletionEvent(myEntry)); + } + } + + return TRY_AGAIN; + } + + public int sendEvent(CompletionEvent anEvent) { + synchronized (this) { + if (haveFinished()) + return STOP; + + theCompletion = anEvent; + + theSearchTask.taint(); + SearchTasks.get().remove(this, wasNotSatisfied()); + + if (needsWakeup) + notify(); + + return STOP; + } + } + + /** + * NEVER, EVER, EVER invoke <code>haveCompleted</code> in this method. + * <code>haveCompleted</code> is an internal function designed to allow + * methods to figure out when parallel searching and other operations can + * safely abort early. + * + * This method must _never_ abort early, it must wait until status is set + * via <code>setStatus</code> - it cannot peek at potentially incomplete + * status which <code>haveCompleted</code> does. + */ + public synchronized MangledEntry getEntry(long aTimeout) + throws TransactionException, + InterruptedException { + + // We only wait the once because we'll only ever wake from this + // if there's a result or we timeout + if (wouldBlock() && (aTimeout != 0)) { + needsWakeup = true; + wait(aTimeout); + needsWakeup = false; + } + + // We're returning - ensure we don't allow any more operations to + // avoid doing a take we'll never return.... + sendEvent(CompletionEvent.COMPLETED); + + if (theCompletion.getException() != null) + throw theCompletion.getException(); + + return theCompletion.getEntry(); + } + + public synchronized boolean wouldBlock() { + return (theCompletion == null); + } + + /** + * NEVER, EVER, EVER invoke <code>haveCompleted</code> unless you are + * attempting to determine that internal parallel functions + * such as parallel searching and other operations can + * safely abort early. + * + * When this method returns <code>true</code> it is stating that some + * thread has determined a final state for this visitor and is in the + * process of resolving it. + */ + private synchronized boolean haveFinished() { + return (theCompletion != null); + } + + private boolean wasNotSatisfied() { + return (theCompletion.getEntry() == null); + } + + public boolean isDeleter() { + return isTaking; + } + + private void resolved() { + sendEvent(new CompletionEvent(new TransactionException( + "Transaction completed with operations still outstanding: " + + (isTaking ? "take" : "read")))); + } + + private class EventGeneratorImpl implements EventGenerator { + private boolean isTainted = false; + private MangledEntry theTemplate; + private OID theOID; + + EventGeneratorImpl(MangledEntry aTemplate) { + theTemplate = aTemplate; + } + + public void assign(OID anOID) { + theOID = anOID; + } + + public long getStartSeqNum() { + return 0; + } + + public OID getId() { + return theOID; + } + + public boolean isPersistent() { + return false; + } + + public long getSourceId() { + return 0; + } + + public void taint() { + synchronized (this) { + // Tainting can only be done once + // + if (isTainted) + return; + + isTainted = true; + } + + try { + EventQueue.get().kill(getId()); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, + "Encountered IOException during kill", anIOE); + } + + /* + try { + Tasks.queue(new CleanTask(getId())); + } catch (InterruptedException anIE) { + theLogger.log(Level.WARNING, + "Failed to lodge cleanup for: " + getId(), anIE); + } + */ + } + + private boolean isTainted() { + synchronized(this) { + return (isTainted); + } + } + + public boolean canSee(QueueEvent anEvent, long aTime) { + if (isTainted()) + return false; + + // Check if it's txn_ended and my txn and call resolved if it is + if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && + (theTxnState.getId().equals(anEvent.getTxn().getId()))) { + resolved(); + return false; + } + + // We want to see new writes from a transaction + // + return (anEvent.getType() == QueueEvent.ENTRY_WRITE); + } + + public boolean matches(MangledEntry anEntry) { + if (isTainted()) + return false; + + return Types.isSubtype(theTemplate.getType(), anEntry.getType()) && + theTemplate.match(anEntry); + } + + public boolean renew(long aTime) { + // Nothing to do as we expire by being tainted by the enclosing + // class only + // + return true; + } + + public void recover(long aSeqNum) { + // Nothing to do + } + + public long jumpSequenceNumber() { + return 0; + } + + public long jumpSequenceNumber(long aMin) { + return 0; + } + + public void ping(QueueEvent anEvent, JavaSpace aSource) { + if (isTainted()) + return; + + LongtermOffer myOffer = null; + + QueueEvent.Context myContext = anEvent.getContext(); + MangledEntry myEntry = myContext.getEntry(); + OID myOID = myContext.getOID(); + + try { + EntryRepository myRepos = + EntryRepositoryFactory.get().find(myEntry.getType()); + + myOffer = myRepos.getOffer(myOID); + + if (myOffer == null) + return; + + myOffer.offer(SearchVisitorImpl.this); + + } catch (IOException anIOE) { + // Nothing can be done + theLogger.log(Level.SEVERE, + "Encountered IOException during write offer", anIOE); + } finally { + if (myOffer != null) { + try { + myOffer.release(); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, + "Encountered IOException during write offer(release)", + anIOE); + } + } + } + } + + public EventGeneratorState getMemento() { + throw new RuntimeException( + "Shouldn't be happening - we're transient"); + } + } + + private class SearchVisitorAdapter implements SearchVisitor { + + public boolean isDeleter() { + return SearchVisitorImpl.this.isDeleter(); + } + + public int offer(SearchOffer anOffer) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Offer"); + + if (haveFinished()) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, theTxnState.getId() + + " Have completed"); + return STOP; + } + + OpInfo myInfo = anOffer.getInfo(); + + if (!Types.isSubtype(theTemplate.getType(), myInfo.getType())) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Not subtype"); + + return TRY_AGAIN; + } + + MangledEntry myEntry = anOffer.getEntry(); + + if (theTemplate.match(myEntry)) { + return SearchVisitorImpl.this.offer(anOffer); + } else + return TRY_AGAIN; + } + } +}