Mercurial > hg > blitz_stable
diff src/org/dancres/blitz/FifoSearchVisitorImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/FifoSearchVisitorImpl.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,425 @@ +package org.dancres.blitz; + +import java.io.IOException; + +import java.util.LinkedList; + +import java.util.logging.*; + +import net.jini.core.transaction.TransactionException; +import net.jini.space.JavaSpace; + +import org.dancres.blitz.mangler.MangledEntry; + +import org.dancres.blitz.entry.*; + +import org.dancres.blitz.txn.TxnState; + +import org.dancres.blitz.txnlock.*; + +import org.dancres.blitz.oid.OID; +import org.dancres.blitz.notify.*; + + +/** + <p>All search results obtained by the lower layers are offered to a + SearchVisitor instance which can then determine whether the offered Entry + is suitable. This includes "deep matching" which requires that we fully + compare the fields of template to entry. The lower-layers do not perform + this task - they return the entry's that are a probable match. In + addition, instances of search visitor check transaction locks etc. which + the lower layers know nothing about.</p> + + <p>FifoSearchVisitorImpl enforces some ordering requirements that aren't + present in SearchVisitorImpl relating to fairness which is sometimes more + important than speed.</p> + */ +class FifoSearchVisitorImpl implements SingleMatchTask, + SearchVisitor { + + private static final Logger theLogger = + Logging.newLogger("org.dancres.blitz.entry.FifoSearchVisitorImpl"); + + private MangledEntry theTemplate; + private TxnState theTxnState; + private boolean isTaking; + + private EventGeneratorImpl theSearchTask; + private SearchVisitor theAdapter = new SearchVisitorAdapter(); + private BaulkedParty theParty; + + private int theLockOp; + + private boolean needsWakeup = false; + + private CompletionEvent theCompletion; + + private long theStartTime = System.currentTimeMillis(); + + private LinkedList theNewWrites = new LinkedList(); + + /** + @param isTake indicates the kind of txn lock we would need to assert + */ + FifoSearchVisitorImpl(MangledEntry aTemplate, boolean isTake, + TxnState aState, VisitorBaulkedPartyFactory aFactory) + throws IOException { + + theTemplate = aTemplate; + isTaking = isTake; + theTxnState = aState; + theLockOp = (isTaking == true) ? TxnLock.DELETE : TxnLock.READ; + theSearchTask = new EventGeneratorImpl(aTemplate); + theParty = aFactory.newParty(this); + SearchTasks.get().add(this); + EventQueue.get().insert(theSearchTask); + } + + public EventGenerator getSearchTask() { + return theSearchTask; + } + + public SearchVisitor getVisitor() { + return theAdapter; + } + + /*********************************************************************** + * SearchVisitor + ***********************************************************************/ + + public int offer(SearchOffer anOffer) { + OpInfo myInfo = anOffer.getInfo(); + + MangledEntry myEntry = anOffer.getEntry(); + + LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType()); + TxnLock myLock = myMgr.getLock(myInfo.getOID()); + + synchronized (this) { + + int myResult; + + // Do we need to try and secure this? + if (haveCompleted()) + return STOP; + + VisitorBaulkedPartyFactory.Handback myHandback = + new VisitorBaulkedPartyFactory.Handback(myInfo.getType(), + myInfo.getOID(), myEntry); + + synchronized (myLock) { + myResult = myLock.acquire(theTxnState, theLockOp, + theParty, myHandback, false); + } + + if (myResult == TxnLock.SUCCESS) { + + try { + theTxnState.add(new EntryTxnOp(theLockOp, myInfo, + myLock)); + } catch (TransactionException aTE) { + myLock.release(theTxnState, theLockOp); + return sendEvent(new CompletionEvent(aTE)); + } + + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Succeeded"); + + return sendEvent(new CompletionEvent(myEntry)); + } + } + + return TRY_AGAIN; + } + + public int sendEvent(CompletionEvent anEvent) { + synchronized(this) { + if (haveCompleted()) + return STOP; + + theCompletion = anEvent; + + theSearchTask.taint(); + SearchTasks.get().remove(this, wasNotSatisfied()); + + if (needsWakeup) + notify(); + + return STOP; + } + } + + public MangledEntry getEntry(long aTimeout) + throws TransactionException, + InterruptedException { + + synchronized(this) { + needsWakeup = true; + } + + while (true) { + synchronized(this) { + /* + If we've completed, throw exception or return Entry + accordingly, cleaning up state appropriately + */ + if (haveCompleted()) { + + // We're returning - ensure we don't allow any more + // operations to avoid doing a take we'll never return. + // + needsWakeup = false; + + if (theCompletion.getException() != null) + throw theCompletion.getException(); + + return theCompletion.getEntry(); + } + + // We haven't completed, yet, can we process queue elements? + if (theNewWrites.size() == 0) { + + long myRemaining = aTimeout - (System.currentTimeMillis() - + theStartTime); + + // Is there more time to wait? + if (myRemaining > 0) + wait(myRemaining); + else { + // No, force exit + needsWakeup = false; + sendEvent(CompletionEvent.COMPLETED); + } + } + } + + // We must flush the queue outside of lock + try { + flushQueue(); + } catch (IOException anIOE) { + TransactionException myTE = + new TransactionException("I/O Error whilst processing queue"); + myTE.initCause(anIOE); + sendEvent(new CompletionEvent(myTE)); + } + } + } + + private void flushQueue() throws IOException { + while (true) { + // Something has caused us to stop, give up now + // + if (haveCompleted()) + break; + + SpaceEntryUID myUID = null; + + synchronized(this) { + if (theNewWrites.size() != 0) { + myUID = (SpaceEntryUID) theNewWrites.removeFirst(); + } + } + + // Nothing else in queue? + // + if (myUID == null) + break; + + EntryRepository myRepos = + EntryRepositoryFactory.get().find(myUID.getType()); + + myRepos.find(this, myUID.getOID(), null); + } + } + + public synchronized boolean wouldBlock() { + return (theCompletion == null); + } + + private synchronized boolean haveCompleted() { + return (theCompletion != null); + } + + private boolean wasNotSatisfied() { + return (theCompletion.getEntry() == null); + } + + public boolean isDeleter() { + return isTaking; + } + + private void resolved() { + sendEvent(new CompletionEvent(new TransactionException( + "Transaction completed with operations still outstanding: " + + (isTaking ? "take" : "read")))); + } + + private class SearchVisitorAdapter implements SearchVisitor { + + public boolean isDeleter() { + return FifoSearchVisitorImpl.this.isDeleter(); + } + + public int offer(SearchOffer anOffer) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Offer"); + + synchronized (FifoSearchVisitorImpl.this) { + if (haveCompleted()) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, theTxnState.getId() + + " Have completed"); + return STOP; + } + } + + OpInfo myInfo = anOffer.getInfo(); + + if (!Types.isSubtype(theTemplate.getType(), myInfo.getType())) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Not subtype"); + + return TRY_AGAIN; + } + + MangledEntry myEntry = anOffer.getEntry(); + + if (theTemplate.match(myEntry)) { + return FifoSearchVisitorImpl.this.offer(anOffer); + } else + return TRY_AGAIN; + } + } + + private class EventGeneratorImpl implements EventGenerator { + private boolean isTainted = false; + private MangledEntry theTemplate; + private OID theOID; + + EventGeneratorImpl(MangledEntry aTemplate) { + theTemplate = aTemplate; + } + + public void assign(OID anOID) { + theOID = anOID; + } + + public long getStartSeqNum() { + return 0; + } + + public OID getId() { + return theOID; + } + + public boolean isPersistent() { + return false; + } + + public long getSourceId() { + return 0; + } + + public void taint() { + synchronized (this) { + // Tainting can only be done once + // + if (isTainted) + return; + + isTainted = true; + } + + try { + EventQueue.get().kill(getId()); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, + "Encountered IOException during kill", anIOE); + } + + /* + try { + Tasks.queue(new CleanTask(getId())); + } catch (InterruptedException anIE) { + theLogger.log(Level.WARNING, + "Failed to lodge cleanup for: " + getId(), anIE); + } + */ + } + + private boolean isTainted() { + synchronized (this) { + return (isTainted); + } + } + + public boolean canSee(QueueEvent anEvent, long aTime) { + if (isTainted()) + return false; + + // Check if it's txn_ended and my txn and call resolved if it is + if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && + (theTxnState.getId().equals(anEvent.getTxn().getId()))) { + resolved(); + return false; + } + + // We want to see new writes from a transaction + // + return (anEvent.getType() == QueueEvent.ENTRY_WRITE); + } + + public boolean matches(MangledEntry anEntry) { + if (isTainted()) + return false; + + return Types.isSubtype(theTemplate.getType(), anEntry.getType()) && + theTemplate.match(anEntry); + } + + public boolean renew(long aTime) { + // Nothing to do as we expire by being tainted by the enclosing + // class only + // + return true; + } + + public void recover(long aSeqNum) { + // Nothing to do + } + + public long jumpSequenceNumber() { + return 0; + } + + public long jumpSequenceNumber(long aMin) { + return 0; + } + + public void ping(QueueEvent anEvent, JavaSpace aSource) { + /* + Queue the write for later consideration unless we're done. + Later consideration will only be after we've invoked getEntry + which will only occur after we've performed searching of storage + */ + synchronized (FifoSearchVisitorImpl.this) { + if (haveCompleted()) + return; + + QueueEvent.Context myContext = anEvent.getContext(); + MangledEntry myEntry = myContext.getEntry(); + OID myOID = myContext.getOID(); + + theNewWrites.add(new SpaceEntryUID(myEntry.getType(), myOID)); + + if (needsWakeup) + FifoSearchVisitorImpl.this.notify(); + } + } + + public EventGeneratorState getMemento() { + throw new RuntimeException( + "Shouldn't be happening - we're transient"); + } + } +}