Mercurial > hg > blitz_condensed
diff src/org/dancres/blitz/notify/EventQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/notify/EventQueue.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,287 @@ +package org.dancres.blitz.notify; + +import java.io.IOException; +import java.rmi.MarshalledObject; +import java.rmi.RemoteException; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.Iterator; + +import net.jini.config.ConfigurationException; +import net.jini.core.event.RemoteEventListener; + +import org.dancres.blitz.ActiveObject; +import org.dancres.blitz.ActiveObjectRegistry; +import org.dancres.blitz.Logging; +import org.dancres.blitz.config.ConfigurationFactory; +import org.dancres.blitz.mangler.MangledEntry; +import org.dancres.blitz.oid.OID; +import org.dancres.blitz.stats.StatsBoard; +import org.dancres.blitz.txn.TxnState; +import org.dancres.blitz.txn.TxnManager; +import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; +import org.dancres.blitz.util.QueueStatGenerator; + +/** + The heart of the notify implementation. Events are passed to here and + processed against templates. Resulting events are then passed into the + notification pool for dispatch. <P> + + <p>For a transaction end event, we remove any associated registrations held + in EventGeneratorFactory.</p> + */ +public class EventQueue implements ActiveObject { + static Logger theLogger = + Logging.newLogger("org.dancres.blitz.notify.EventQueue"); + + private static final EventQueue theEventQueue = new EventQueue(); + + public static EventQueue get() { + return theEventQueue; + } + + private PooledExecutor theProcessors; + + private long theEventCount = 0; + + private RemoteEventDispatcher theDispatcher = new RemoteEventDispatcher(); + + private int NUM_PROCESSORS = 10; + + private int QUEUE_BOUND = 100; + + private EventQueue() { + ActiveObjectRegistry.add(this); + + try { + NUM_PROCESSORS = ((Integer) + ConfigurationFactory.getEntry("maxEventProcessors", + int.class, + new Integer(1))).intValue(); + QUEUE_BOUND = ((Integer) + ConfigurationFactory.getEntry("eventQueueBound", + int.class, + new Integer(0))).intValue(); + + theLogger.log(Level.INFO, "Max Event Processors: " + + NUM_PROCESSORS); + theLogger.log(Level.INFO, "Event Queue Bound: " + QUEUE_BOUND); + + } catch (ConfigurationException aCE) { + theLogger.log(Level.SEVERE, "Failed to load config", aCE); + } + } + + public void add(QueueEvent anEvent) { + add(anEvent, false); + } + + public void add(QueueEvent anEvent, boolean aWaitIndicator) { + if (TxnManager.get().isRecovery()) + return; + + // No-one listening, no point in doing work + // + if (EventGeneratorFactory.get().getCount() == 0) + return; + + if (theLogger.isLoggable(Level.FINEST)) + theLogger.log(Level.FINEST, "Event: " + anEvent.getType() + ", " + + anEvent.getTxn() + ", " + anEvent.getContext()); + try { + /* + * WTF? For some reason, javac can't resolve the two types below to + * DispatchTask without explicit casting - suspecting cursed generics + * and type reasoning grrrrr + */ + DispatchTask aTask = (aWaitIndicator == true) ? + (DispatchTask) new BlockingDispatchImpl(this, anEvent) : + (DispatchTask) new NonblockingDispatchImpl(this, anEvent); + + theProcessors.execute(aTask); + + aTask.block(); + } catch (InterruptedException anIE) { + theLogger.log(Level.FINEST, "Failed to queue event"); + } + } + + public void insert(EventGenerator anEventGenerator) throws IOException { + EventGeneratorFactory.get().addTemporary(anEventGenerator); + } + + /** + @todo Lock txn during adding of reg for non-null txns. + */ + public void register(MangledEntry aTemplate, TxnState aTxn, + RemoteEventListener aListener, long aLeaseTime, + MarshalledObject aHandback, + Registrar aRegistrar) + throws IOException, RemoteException { + + + if (aTxn == null) { + EventGenerator myGenerator = + EventGeneratorFactory.get().newPersistentGenerator(aTemplate, + aListener, + aLeaseTime, + aHandback); + + long myFirstSeqNum = myGenerator.getStartSeqNum(); + + aRegistrar.newRegistration(myGenerator.getSourceId(), + myFirstSeqNum, + new SpaceNotifyUID(myGenerator.getId())); + } else { + EventGenerator myGenerator = + EventGeneratorFactory.get().newTransientGenerator(aTemplate, + aListener, + aLeaseTime, + aHandback, + aTxn); + + long myFirstSeqNum = myGenerator.getStartSeqNum(); + + aRegistrar.newRegistration(myGenerator.getSourceId(), + myFirstSeqNum, + new SpaceNotifyUID(myGenerator.getId())); + } + } + + public void registerVisibility(MangledEntry[] aTemplates, TxnState aTxn, + RemoteEventListener aListener, + long aLeaseTime, + MarshalledObject aHandback, + Registrar aRegistrar, boolean visibleOnly) + throws IOException, RemoteException { + long myFirstSeqNum; + EventGenerator myGenerator; + + if (aTxn == null) { + myGenerator = + EventGeneratorFactory.get().newPersistentVisibility(aTemplates, + aListener, + aLeaseTime, + aHandback, + visibleOnly); + } else { + myGenerator = + EventGeneratorFactory.get().newTransientVisibility(aTemplates, + aListener, + aLeaseTime, + aHandback, + aTxn, + visibleOnly); + + } + + myFirstSeqNum = myGenerator.getStartSeqNum(); + aRegistrar.newRegistration(myGenerator.getSourceId(), + myFirstSeqNum, + new SpaceNotifyUID(myGenerator.getId())); + } + + public void begin() { + try { + /* + * Apply the jump before processing any events. This will be + * performed on the in-memory copy and placed on disk at the + * next checkpoint. In the meantime, we make sure correct state + * is preserved using log records + */ + EventGeneratorFactory.get().jumpSequenceNumbers(); + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, "Failed to apply restart jump", anIOE); + } + + BoundedLinkedQueue myQueue; + + if (QUEUE_BOUND == 0) { + theLogger.log(Level.INFO, "Event queue bounding disabled"); + myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE); + theProcessors = + new PooledExecutor(myQueue, NUM_PROCESSORS); + } else { + theLogger.log(Level.INFO, "Event queue bounding enabled"); + myQueue = new BoundedLinkedQueue(QUEUE_BOUND); + theProcessors = + new PooledExecutor(myQueue, NUM_PROCESSORS); + } + + theProcessors.setMinimumPoolSize(NUM_PROCESSORS); + // theProcessors.waitWhenBlocked(); + + StatsBoard.get().add(new QueueStatGenerator("Events", myQueue)); + } + + public void halt() { + theProcessors.shutdownNow(); + + synchronized(this) { + theLogger.log(Level.INFO, "Processed: " + theEventCount); + } + } + + void dispatchImpl(DispatchTask aTask) { + synchronized(this) { + ++theEventCount; + } + + switch (aTask.getEvent().getType()) { + case QueueEvent.TRANSACTION_ENDED : + case QueueEvent.ENTRY_WRITE : + case QueueEvent.ENTRY_WRITTEN : + case QueueEvent.ENTRY_VISIBLE : + case QueueEvent.ENTRY_NOT_CONFLICTED : { + iterateMatches(aTask); + break; + } + } + } + + private void iterateMatches(DispatchTask aTask) { + QueueEvent myEvent = aTask.getEvent(); + long myCurrentTime = System.currentTimeMillis(); + QueueEvent.Context myContext = myEvent.getContext(); + MangledEntry myEntry = null; + + if (myContext != null) + myEntry = myEvent.getContext().getEntry(); + + try { + Iterator myGenerators = + EventGeneratorFactory.get().getGenerators(); + + while (myGenerators.hasNext()) { + EventGenerator myGenerator = (EventGenerator) myGenerators.next(); + + if ((myGenerator.canSee(myEvent, myCurrentTime)) && + (myGenerator.matches(myEntry))) { + theDispatcher.sendEvent(aTask, myGenerator); + } + } + } catch (IOException anIOE) { + theLogger.log(Level.SEVERE, "Couldn't recover generators from factory"); + } finally { + aTask.enableResolve(); + } + } + + + public boolean renew(OID aOID, long anExpiry) + throws IOException { + + return EventGeneratorFactory.get().renew(aOID, anExpiry); + } + + public boolean cancel(OID aOID) + throws IOException { + + return EventGeneratorFactory.get().cancel(aOID); + } + + public void kill(OID anOID) throws IOException { + EventGeneratorFactory.get().killTemplate(anOID); + } +}