Mercurial > hg > blitz_condensed
view src/org/dancres/blitz/notify/EventQueue.java @ 34:6f68e94c1fb8 default tip
Add CondensedStats monitoring utility, equivalent to vmstat
author | Dominic Cleal <dominic-cleal@cdo2.com> |
---|---|
date | Thu, 05 Aug 2010 11:07:25 +0100 |
parents | 3dc0c5604566 |
children |
line wrap: on
line source
package org.dancres.blitz.notify; import java.io.IOException; import java.rmi.MarshalledObject; import java.rmi.RemoteException; import java.util.logging.Level; import java.util.logging.Logger; import java.util.Iterator; import net.jini.config.ConfigurationException; import net.jini.core.event.RemoteEventListener; import org.dancres.blitz.ActiveObject; import org.dancres.blitz.ActiveObjectRegistry; import org.dancres.blitz.Logging; import org.dancres.blitz.config.ConfigurationFactory; import org.dancres.blitz.mangler.MangledEntry; import org.dancres.blitz.oid.OID; import org.dancres.blitz.stats.StatsBoard; import org.dancres.blitz.txn.TxnState; import org.dancres.blitz.txn.TxnManager; import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; import org.dancres.blitz.util.QueueStatGenerator; /** The heart of the notify implementation. Events are passed to here and processed against templates. Resulting events are then passed into the notification pool for dispatch. <P> <p>For a transaction end event, we remove any associated registrations held in EventGeneratorFactory.</p> */ public class EventQueue implements ActiveObject { static Logger theLogger = Logging.newLogger("org.dancres.blitz.notify.EventQueue"); private static final EventQueue theEventQueue = new EventQueue(); public static EventQueue get() { return theEventQueue; } private PooledExecutor theProcessors; private long theEventCount = 0; private RemoteEventDispatcher theDispatcher = new RemoteEventDispatcher(); private int NUM_PROCESSORS = 10; private int QUEUE_BOUND = 100; private EventQueue() { ActiveObjectRegistry.add(this); try { NUM_PROCESSORS = ((Integer) ConfigurationFactory.getEntry("maxEventProcessors", int.class, new Integer(1))).intValue(); QUEUE_BOUND = ((Integer) ConfigurationFactory.getEntry("eventQueueBound", int.class, new Integer(0))).intValue(); theLogger.log(Level.INFO, "Max Event Processors: " + NUM_PROCESSORS); theLogger.log(Level.INFO, "Event Queue Bound: " + QUEUE_BOUND); } catch (ConfigurationException aCE) { theLogger.log(Level.SEVERE, "Failed to load config", aCE); } } public void add(QueueEvent anEvent) { add(anEvent, false); } public void add(QueueEvent anEvent, boolean aWaitIndicator) { if (TxnManager.get().isRecovery()) return; // No-one listening, no point in doing work // if (EventGeneratorFactory.get().getCount() == 0) return; if (theLogger.isLoggable(Level.FINEST)) theLogger.log(Level.FINEST, "Event: " + anEvent.getType() + ", " + anEvent.getTxn() + ", " + anEvent.getContext()); try { /* * WTF? For some reason, javac can't resolve the two types below to * DispatchTask without explicit casting - suspecting cursed generics * and type reasoning grrrrr */ DispatchTask aTask = (aWaitIndicator == true) ? (DispatchTask) new BlockingDispatchImpl(this, anEvent) : (DispatchTask) new NonblockingDispatchImpl(this, anEvent); theProcessors.execute(aTask); aTask.block(); } catch (InterruptedException anIE) { theLogger.log(Level.FINEST, "Failed to queue event"); } } public void insert(EventGenerator anEventGenerator) throws IOException { EventGeneratorFactory.get().addTemporary(anEventGenerator); } /** @todo Lock txn during adding of reg for non-null txns. */ public void register(MangledEntry aTemplate, TxnState aTxn, RemoteEventListener aListener, long aLeaseTime, MarshalledObject aHandback, Registrar aRegistrar) throws IOException, RemoteException { if (aTxn == null) { EventGenerator myGenerator = EventGeneratorFactory.get().newPersistentGenerator(aTemplate, aListener, aLeaseTime, aHandback); long myFirstSeqNum = myGenerator.getStartSeqNum(); aRegistrar.newRegistration(myGenerator.getSourceId(), myFirstSeqNum, new SpaceNotifyUID(myGenerator.getId())); } else { EventGenerator myGenerator = EventGeneratorFactory.get().newTransientGenerator(aTemplate, aListener, aLeaseTime, aHandback, aTxn); long myFirstSeqNum = myGenerator.getStartSeqNum(); aRegistrar.newRegistration(myGenerator.getSourceId(), myFirstSeqNum, new SpaceNotifyUID(myGenerator.getId())); } } public void registerVisibility(MangledEntry[] aTemplates, TxnState aTxn, RemoteEventListener aListener, long aLeaseTime, MarshalledObject aHandback, Registrar aRegistrar, boolean visibleOnly) throws IOException, RemoteException { long myFirstSeqNum; EventGenerator myGenerator; if (aTxn == null) { myGenerator = EventGeneratorFactory.get().newPersistentVisibility(aTemplates, aListener, aLeaseTime, aHandback, visibleOnly); } else { myGenerator = EventGeneratorFactory.get().newTransientVisibility(aTemplates, aListener, aLeaseTime, aHandback, aTxn, visibleOnly); } myFirstSeqNum = myGenerator.getStartSeqNum(); aRegistrar.newRegistration(myGenerator.getSourceId(), myFirstSeqNum, new SpaceNotifyUID(myGenerator.getId())); } public void begin() { try { /* * Apply the jump before processing any events. This will be * performed on the in-memory copy and placed on disk at the * next checkpoint. In the meantime, we make sure correct state * is preserved using log records */ EventGeneratorFactory.get().jumpSequenceNumbers(); } catch (IOException anIOE) { theLogger.log(Level.SEVERE, "Failed to apply restart jump", anIOE); } BoundedLinkedQueue myQueue; if (QUEUE_BOUND == 0) { theLogger.log(Level.INFO, "Event queue bounding disabled"); myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE); theProcessors = new PooledExecutor(myQueue, NUM_PROCESSORS); } else { theLogger.log(Level.INFO, "Event queue bounding enabled"); myQueue = new BoundedLinkedQueue(QUEUE_BOUND); theProcessors = new PooledExecutor(myQueue, NUM_PROCESSORS); } theProcessors.setMinimumPoolSize(NUM_PROCESSORS); // theProcessors.waitWhenBlocked(); StatsBoard.get().add(new QueueStatGenerator("Events", myQueue)); } public void halt() { theProcessors.shutdownNow(); synchronized(this) { theLogger.log(Level.INFO, "Processed: " + theEventCount); } } void dispatchImpl(DispatchTask aTask) { synchronized(this) { ++theEventCount; } switch (aTask.getEvent().getType()) { case QueueEvent.TRANSACTION_ENDED : case QueueEvent.ENTRY_WRITE : case QueueEvent.ENTRY_WRITTEN : case QueueEvent.ENTRY_VISIBLE : case QueueEvent.ENTRY_NOT_CONFLICTED : { iterateMatches(aTask); break; } } } private void iterateMatches(DispatchTask aTask) { QueueEvent myEvent = aTask.getEvent(); long myCurrentTime = System.currentTimeMillis(); QueueEvent.Context myContext = myEvent.getContext(); MangledEntry myEntry = null; if (myContext != null) myEntry = myEvent.getContext().getEntry(); try { Iterator myGenerators = EventGeneratorFactory.get().getGenerators(); while (myGenerators.hasNext()) { EventGenerator myGenerator = (EventGenerator) myGenerators.next(); if ((myGenerator.canSee(myEvent, myCurrentTime)) && (myGenerator.matches(myEntry))) { theDispatcher.sendEvent(aTask, myGenerator); } } } catch (IOException anIOE) { theLogger.log(Level.SEVERE, "Couldn't recover generators from factory"); } finally { aTask.enableResolve(); } } public boolean renew(OID aOID, long anExpiry) throws IOException { return EventGeneratorFactory.get().renew(aOID, anExpiry); } public boolean cancel(OID aOID) throws IOException { return EventGeneratorFactory.get().cancel(aOID); } public void kill(OID anOID) throws IOException { EventGeneratorFactory.get().killTemplate(anOID); } }