comparison src/org/dancres/blitz/notify/EventQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.notify;
2
3 import java.io.IOException;
4 import java.rmi.MarshalledObject;
5 import java.rmi.RemoteException;
6 import java.util.logging.Level;
7 import java.util.logging.Logger;
8 import java.util.Iterator;
9
10 import net.jini.config.ConfigurationException;
11 import net.jini.core.event.RemoteEventListener;
12
13 import org.dancres.blitz.ActiveObject;
14 import org.dancres.blitz.ActiveObjectRegistry;
15 import org.dancres.blitz.Logging;
16 import org.dancres.blitz.config.ConfigurationFactory;
17 import org.dancres.blitz.mangler.MangledEntry;
18 import org.dancres.blitz.oid.OID;
19 import org.dancres.blitz.stats.StatsBoard;
20 import org.dancres.blitz.txn.TxnState;
21 import org.dancres.blitz.txn.TxnManager;
22 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
23 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
24 import org.dancres.blitz.util.QueueStatGenerator;
25
26 /**
27 The heart of the notify implementation. Events are passed to here and
28 processed against templates. Resulting events are then passed into the
29 notification pool for dispatch. <P>
30
31 <p>For a transaction end event, we remove any associated registrations held
32 in EventGeneratorFactory.</p>
33 */
34 public class EventQueue implements ActiveObject {
35 static Logger theLogger =
36 Logging.newLogger("org.dancres.blitz.notify.EventQueue");
37
38 private static final EventQueue theEventQueue = new EventQueue();
39
40 public static EventQueue get() {
41 return theEventQueue;
42 }
43
44 private PooledExecutor theProcessors;
45
46 private long theEventCount = 0;
47
48 private RemoteEventDispatcher theDispatcher = new RemoteEventDispatcher();
49
50 private int NUM_PROCESSORS = 10;
51
52 private int QUEUE_BOUND = 100;
53
54 private EventQueue() {
55 ActiveObjectRegistry.add(this);
56
57 try {
58 NUM_PROCESSORS = ((Integer)
59 ConfigurationFactory.getEntry("maxEventProcessors",
60 int.class,
61 new Integer(1))).intValue();
62 QUEUE_BOUND = ((Integer)
63 ConfigurationFactory.getEntry("eventQueueBound",
64 int.class,
65 new Integer(0))).intValue();
66
67 theLogger.log(Level.INFO, "Max Event Processors: " +
68 NUM_PROCESSORS);
69 theLogger.log(Level.INFO, "Event Queue Bound: " + QUEUE_BOUND);
70
71 } catch (ConfigurationException aCE) {
72 theLogger.log(Level.SEVERE, "Failed to load config", aCE);
73 }
74 }
75
76 public void add(QueueEvent anEvent) {
77 add(anEvent, false);
78 }
79
80 public void add(QueueEvent anEvent, boolean aWaitIndicator) {
81 if (TxnManager.get().isRecovery())
82 return;
83
84 // No-one listening, no point in doing work
85 //
86 if (EventGeneratorFactory.get().getCount() == 0)
87 return;
88
89 if (theLogger.isLoggable(Level.FINEST))
90 theLogger.log(Level.FINEST, "Event: " + anEvent.getType() + ", " +
91 anEvent.getTxn() + ", " + anEvent.getContext());
92 try {
93 /*
94 * WTF? For some reason, javac can't resolve the two types below to
95 * DispatchTask without explicit casting - suspecting cursed generics
96 * and type reasoning grrrrr
97 */
98 DispatchTask aTask = (aWaitIndicator == true) ?
99 (DispatchTask) new BlockingDispatchImpl(this, anEvent) :
100 (DispatchTask) new NonblockingDispatchImpl(this, anEvent);
101
102 theProcessors.execute(aTask);
103
104 aTask.block();
105 } catch (InterruptedException anIE) {
106 theLogger.log(Level.FINEST, "Failed to queue event");
107 }
108 }
109
110 public void insert(EventGenerator anEventGenerator) throws IOException {
111 EventGeneratorFactory.get().addTemporary(anEventGenerator);
112 }
113
114 /**
115 @todo Lock txn during adding of reg for non-null txns.
116 */
117 public void register(MangledEntry aTemplate, TxnState aTxn,
118 RemoteEventListener aListener, long aLeaseTime,
119 MarshalledObject aHandback,
120 Registrar aRegistrar)
121 throws IOException, RemoteException {
122
123
124 if (aTxn == null) {
125 EventGenerator myGenerator =
126 EventGeneratorFactory.get().newPersistentGenerator(aTemplate,
127 aListener,
128 aLeaseTime,
129 aHandback);
130
131 long myFirstSeqNum = myGenerator.getStartSeqNum();
132
133 aRegistrar.newRegistration(myGenerator.getSourceId(),
134 myFirstSeqNum,
135 new SpaceNotifyUID(myGenerator.getId()));
136 } else {
137 EventGenerator myGenerator =
138 EventGeneratorFactory.get().newTransientGenerator(aTemplate,
139 aListener,
140 aLeaseTime,
141 aHandback,
142 aTxn);
143
144 long myFirstSeqNum = myGenerator.getStartSeqNum();
145
146 aRegistrar.newRegistration(myGenerator.getSourceId(),
147 myFirstSeqNum,
148 new SpaceNotifyUID(myGenerator.getId()));
149 }
150 }
151
152 public void registerVisibility(MangledEntry[] aTemplates, TxnState aTxn,
153 RemoteEventListener aListener,
154 long aLeaseTime,
155 MarshalledObject aHandback,
156 Registrar aRegistrar, boolean visibleOnly)
157 throws IOException, RemoteException {
158 long myFirstSeqNum;
159 EventGenerator myGenerator;
160
161 if (aTxn == null) {
162 myGenerator =
163 EventGeneratorFactory.get().newPersistentVisibility(aTemplates,
164 aListener,
165 aLeaseTime,
166 aHandback,
167 visibleOnly);
168 } else {
169 myGenerator =
170 EventGeneratorFactory.get().newTransientVisibility(aTemplates,
171 aListener,
172 aLeaseTime,
173 aHandback,
174 aTxn,
175 visibleOnly);
176
177 }
178
179 myFirstSeqNum = myGenerator.getStartSeqNum();
180 aRegistrar.newRegistration(myGenerator.getSourceId(),
181 myFirstSeqNum,
182 new SpaceNotifyUID(myGenerator.getId()));
183 }
184
185 public void begin() {
186 try {
187 /*
188 * Apply the jump before processing any events. This will be
189 * performed on the in-memory copy and placed on disk at the
190 * next checkpoint. In the meantime, we make sure correct state
191 * is preserved using log records
192 */
193 EventGeneratorFactory.get().jumpSequenceNumbers();
194 } catch (IOException anIOE) {
195 theLogger.log(Level.SEVERE, "Failed to apply restart jump", anIOE);
196 }
197
198 BoundedLinkedQueue myQueue;
199
200 if (QUEUE_BOUND == 0) {
201 theLogger.log(Level.INFO, "Event queue bounding disabled");
202 myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE);
203 theProcessors =
204 new PooledExecutor(myQueue, NUM_PROCESSORS);
205 } else {
206 theLogger.log(Level.INFO, "Event queue bounding enabled");
207 myQueue = new BoundedLinkedQueue(QUEUE_BOUND);
208 theProcessors =
209 new PooledExecutor(myQueue, NUM_PROCESSORS);
210 }
211
212 theProcessors.setMinimumPoolSize(NUM_PROCESSORS);
213 // theProcessors.waitWhenBlocked();
214
215 StatsBoard.get().add(new QueueStatGenerator("Events", myQueue));
216 }
217
218 public void halt() {
219 theProcessors.shutdownNow();
220
221 synchronized(this) {
222 theLogger.log(Level.INFO, "Processed: " + theEventCount);
223 }
224 }
225
226 void dispatchImpl(DispatchTask aTask) {
227 synchronized(this) {
228 ++theEventCount;
229 }
230
231 switch (aTask.getEvent().getType()) {
232 case QueueEvent.TRANSACTION_ENDED :
233 case QueueEvent.ENTRY_WRITE :
234 case QueueEvent.ENTRY_WRITTEN :
235 case QueueEvent.ENTRY_VISIBLE :
236 case QueueEvent.ENTRY_NOT_CONFLICTED : {
237 iterateMatches(aTask);
238 break;
239 }
240 }
241 }
242
243 private void iterateMatches(DispatchTask aTask) {
244 QueueEvent myEvent = aTask.getEvent();
245 long myCurrentTime = System.currentTimeMillis();
246 QueueEvent.Context myContext = myEvent.getContext();
247 MangledEntry myEntry = null;
248
249 if (myContext != null)
250 myEntry = myEvent.getContext().getEntry();
251
252 try {
253 Iterator myGenerators =
254 EventGeneratorFactory.get().getGenerators();
255
256 while (myGenerators.hasNext()) {
257 EventGenerator myGenerator = (EventGenerator) myGenerators.next();
258
259 if ((myGenerator.canSee(myEvent, myCurrentTime)) &&
260 (myGenerator.matches(myEntry))) {
261 theDispatcher.sendEvent(aTask, myGenerator);
262 }
263 }
264 } catch (IOException anIOE) {
265 theLogger.log(Level.SEVERE, "Couldn't recover generators from factory");
266 } finally {
267 aTask.enableResolve();
268 }
269 }
270
271
272 public boolean renew(OID aOID, long anExpiry)
273 throws IOException {
274
275 return EventGeneratorFactory.get().renew(aOID, anExpiry);
276 }
277
278 public boolean cancel(OID aOID)
279 throws IOException {
280
281 return EventGeneratorFactory.get().cancel(aOID);
282 }
283
284 public void kill(OID anOID) throws IOException {
285 EventGeneratorFactory.get().killTemplate(anOID);
286 }
287 }