Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/BulkTakeVisitor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/BulkTakeVisitor.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,423 @@ +package org.dancres.blitz; + +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; + +import java.util.logging.*; + +import net.jini.core.transaction.TransactionException; +import net.jini.space.JavaSpace; + +import org.dancres.blitz.mangler.MangledEntry; + +import org.dancres.blitz.entry.*; + +import org.dancres.blitz.txn.TxnState; + +import org.dancres.blitz.txnlock.*; + +import org.dancres.blitz.oid.OID; + +import org.dancres.blitz.notify.EventGenerator; +import org.dancres.blitz.notify.EventQueue; +import org.dancres.blitz.notify.QueueEvent; +import org.dancres.blitz.notify.EventGeneratorState; + +class BulkTakeVisitor implements BulkMatchTask, SearchVisitor { + private static final Logger theLogger = + Logging.newLogger("org.dancres.blitz.BulkTakeVisitor"); + + private MangledEntry[] theTemplates; + private TxnState theTxnState; + private long theLimit; + private EventGeneratorImpl theSearchTask; + private SearchVisitorAdapter theAdapter = new SearchVisitorAdapter(); + private BaulkedParty theParty; + + private ArrayList theEntries = new ArrayList(); + + private int theStatus = SearchVisitor.TRY_AGAIN; + + private TransactionException theException; + + private boolean needsWakeup = false; + + BulkTakeVisitor(MangledEntry[] aTemplates, TxnState aTxnState, + long aLimit, VisitorBaulkedPartyFactory aFactory) + throws IOException { + + theTemplates = aTemplates; + theTxnState = aTxnState; + theLimit = aLimit; + theSearchTask = new EventGeneratorImpl(theTemplates); + theParty = aFactory.newParty(this); + SearchTasks.get().add(this); + EventQueue.get().insert(theSearchTask); + } + + private void resolved() { + setStatus(STOP, new TransactionException()); + } + + public SearchVisitor getVisitor() { + return theAdapter; + } + + public int offer(SearchOffer anOffer) { + theLogger.log(Level.FINE, "offer"); + + synchronized(this) { + if (haveCompleted()) { + theLogger.log(Level.FINE, "Have completed"); + return STOP; + } + } + + OpInfo myInfo = anOffer.getInfo(); + MangledEntry myEntry = anOffer.getEntry(); + + LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType()); + TxnLock myLock = myMgr.getLock(myInfo.getOID()); + + synchronized (this) { + int myResult; + + // Picked up enough matches in the meantime? Quit... + if (haveCompleted()) + return STOP; + + VisitorBaulkedPartyFactory.Handback myHandback = + new VisitorBaulkedPartyFactory.Handback(myInfo.getType(), + myInfo.getOID(), myEntry); + + synchronized (myLock) { + myResult = myLock.acquire(theTxnState, + TxnLock.DELETE, + theParty, myHandback, false); + } + + if (myResult == TxnLock.SUCCESS) { + // Got the lock + try { + theTxnState.add(new EntryTxnOp(TxnLock.DELETE, + myInfo, + myLock)); + } catch (TransactionException aTE) { + synchronized (myLock) { + myLock.release(theTxnState, TxnLock.DELETE); + } + return setStatus(STOP, aTE); + } + + // Add the Entry to our list of matches + theEntries.add(myEntry); + + /* + Picked up enough matches? Quit else carry on + */ + if (haveCompleted()) + return setStatus(STOP, null); + else + return TRY_AGAIN; + } else { + /* + One of our templates matched but we didn't + get a lock. No point in trying other templates + because even if they match we might get another + conflict. We'll leave it to settle instead and + look for other matches. + */ + return TRY_AGAIN; + } + } + } + + public boolean isDeleter() { + return true; + } + + private boolean haveCompleted() { + /* + If we're blocking, then we'll unblock as soon as we get one + match, otherwise, we want to be greedy and grab as many entries + as our limit allows + */ + if (needsWakeup) { + return (theEntries.size() != 0) || (theStatus == STOP); + } else { + return (theEntries.size() == theLimit) || (theStatus == STOP); + } + } + + public synchronized List getEntries(long aTimeout) + throws TransactionException, InterruptedException { + + // We only wait the once because we'll only ever wake from this + // if there's a result or we timeout + if ((theEntries.size() == 0) && (aTimeout != 0)) { + needsWakeup = true; + wait(aTimeout); + needsWakeup = false; + } + + // We're returning - ensure we don't allow any more operations to + // avoid doing a take we'll never return.... + setStatus(STOP, null); + + if (theException != null) + throw theException; + + return theEntries; + } + + /** + We're greedy and always want more if some might be available. + i.e. If we haven't got to the point where we've scanned the entire + space (in which case user should be calling wouldBlock()) we want more! + */ + public synchronized boolean wantsMore() { + return (theEntries.size() < theLimit); + } + + /** + We only request block if we have no entries. + */ + public synchronized boolean wouldBlock() { + return (theEntries.size() == 0); + } + + private boolean wasNotSatisfied() { + return (theEntries.size() == 0); + } + + private synchronized int setStatus(int aState, TransactionException aTE) { + /* + * Test only for STOP, do not use haveCompleted in this case + * which can declare us completed and cause us to axit before + * we actually set STOP. + */ + if (theStatus == STOP) + return STOP; + + theStatus = aState; + theException = aTE; + + theSearchTask.taint(false); + SearchTasks.get().remove(this, wasNotSatisfied()); + + if (needsWakeup) + notify(); + + return theStatus; + } + + private class EventGeneratorImpl implements EventGenerator { + private boolean isTainted = false; + private MangledEntry[] theTemplates; + private OID theOID; + + EventGeneratorImpl(MangledEntry[] aTemplates) { + theTemplates = aTemplates; + } + + public void assign(OID anOID) { + theOID = anOID; + } + + public long getStartSeqNum() { + return 0; + } + + public OID getId() { + return theOID; + } + + public boolean isPersistent() { + return false; + } + + public long getSourceId() { + return 0; + } + + void taint(boolean signal) { + synchronized (this) { + // Tainting can only be done once + // + if (isTainted) + return; + + isTainted = true; + } + + if (signal) + setStatus(STOP, new TransactionException("Destroyed")); + + try { + EventQueue.get().kill(getId()); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, + "Encountered IOException during kill", anIOE); + } + + /* + try { + Tasks.queue(new CleanTask(getId())); + } catch (InterruptedException anIE) { + theLogger.log(Level.WARNING, + "Failed to lodge cleanup for: " + getId(), anIE); + } + */ + } + + public void taint() { + taint(true); + } + + public boolean canSee(QueueEvent anEvent, long aTime) { + synchronized (this) { + if (isTainted) { + return false; + } + } + + // Check if it's txn_ended and my txn and call resolved if it is + if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && + (theTxnState.getId().equals(anEvent.getTxn().getId()))) { + resolved(); + return false; + } + + // We want to see new writes from a transaction + // + return (anEvent.getType() == QueueEvent.ENTRY_WRITE); + } + + public boolean matches(MangledEntry anEntry) { + synchronized (this) { + if (isTainted) { + return false; + } + } + + for (int i = 0; i < theTemplates.length; i++) { + MangledEntry myTemplate = theTemplates[i]; + + if (Types.isSubtype(myTemplate.getType(), anEntry.getType())) { + if (myTemplate.match(anEntry)) + return true; + } + } + + return false; + } + + public boolean renew(long aTime) { + // Nothing to do as we expire by being tainted by the enclosing + // class only + // + return true; + } + + public void recover(long aSeqNum) { + // Nothing to do + } + + public long jumpSequenceNumber() { + return 0; + } + + public long jumpSequenceNumber(long aMin) { + return 0; + } + + public void ping(QueueEvent anEvent, JavaSpace aSource) { + synchronized (this) { + if (isTainted) { + return; + } + } + + LongtermOffer myOffer = null; + + try { + QueueEvent.Context myContext = anEvent.getContext(); + MangledEntry myEntry = myContext.getEntry(); + OID myOID = myContext.getOID(); + + EntryRepository myRepos = + EntryRepositoryFactory.get().find(myEntry.getType()); + + myOffer = myRepos.getOffer(myOID); + + if (myOffer == null) + return; + + myOffer.offer(BulkTakeVisitor.this); + + } catch (IOException anIOE) { + // Nothing can be done + theLogger.log(Level.SEVERE, + "Encountered IOException during write offer", anIOE); + } finally { + if (myOffer != null) { + try { + myOffer.release(); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, + "Encountered IOException during write offer(release)", + anIOE); + } + } + } + } + + public EventGeneratorState getMemento() { + throw new RuntimeException( + "Shouldn't be happening - we're transient"); + } + } + + private class SearchVisitorAdapter implements SearchVisitor { + + public boolean isDeleter() { + return BulkTakeVisitor.this.isDeleter(); + } + + public int offer(SearchOffer anOffer) { + if (theLogger.isLoggable(Level.FINE)) + theLogger.log(Level.FINE, "Offer"); + + synchronized (this) { + if (haveCompleted()) { + theLogger.log(Level.FINE, "Have completed"); + return STOP; + } + } + + OpInfo myInfo = anOffer.getInfo(); + MangledEntry myEntry = anOffer.getEntry(); + + for (int i = 0; i < theTemplates.length; i++) { + MangledEntry myTemplate = theTemplates[i]; + + if (Types.isSubtype(myTemplate.getType(), myInfo.getType())) { + + if ((myTemplate.isWildcard()) || + (myTemplate.match(myEntry))) { + + // If we get a match, we only need to try offer + // once to see if we can lock the entry so we can + // give up after the first match & offer. + // + return BulkTakeVisitor.this.offer(anOffer); + } + } + } + + return TRY_AGAIN; + } + } +} \ No newline at end of file