Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/notify/EventQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.notify; | |
2 | |
3 import java.io.IOException; | |
4 import java.rmi.MarshalledObject; | |
5 import java.rmi.RemoteException; | |
6 import java.util.logging.Level; | |
7 import java.util.logging.Logger; | |
8 import java.util.Iterator; | |
9 | |
10 import net.jini.config.ConfigurationException; | |
11 import net.jini.core.event.RemoteEventListener; | |
12 | |
13 import org.dancres.blitz.ActiveObject; | |
14 import org.dancres.blitz.ActiveObjectRegistry; | |
15 import org.dancres.blitz.Logging; | |
16 import org.dancres.blitz.config.ConfigurationFactory; | |
17 import org.dancres.blitz.mangler.MangledEntry; | |
18 import org.dancres.blitz.oid.OID; | |
19 import org.dancres.blitz.stats.StatsBoard; | |
20 import org.dancres.blitz.txn.TxnState; | |
21 import org.dancres.blitz.txn.TxnManager; | |
22 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; | |
23 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; | |
24 import org.dancres.blitz.util.QueueStatGenerator; | |
25 | |
26 /** | |
27 The heart of the notify implementation. Events are passed to here and | |
28 processed against templates. Resulting events are then passed into the | |
29 notification pool for dispatch. <P> | |
30 | |
31 <p>For a transaction end event, we remove any associated registrations held | |
32 in EventGeneratorFactory.</p> | |
33 */ | |
34 public class EventQueue implements ActiveObject { | |
35 static Logger theLogger = | |
36 Logging.newLogger("org.dancres.blitz.notify.EventQueue"); | |
37 | |
38 private static final EventQueue theEventQueue = new EventQueue(); | |
39 | |
40 public static EventQueue get() { | |
41 return theEventQueue; | |
42 } | |
43 | |
44 private PooledExecutor theProcessors; | |
45 | |
46 private long theEventCount = 0; | |
47 | |
48 private RemoteEventDispatcher theDispatcher = new RemoteEventDispatcher(); | |
49 | |
50 private int NUM_PROCESSORS = 10; | |
51 | |
52 private int QUEUE_BOUND = 100; | |
53 | |
54 private EventQueue() { | |
55 ActiveObjectRegistry.add(this); | |
56 | |
57 try { | |
58 NUM_PROCESSORS = ((Integer) | |
59 ConfigurationFactory.getEntry("maxEventProcessors", | |
60 int.class, | |
61 new Integer(1))).intValue(); | |
62 QUEUE_BOUND = ((Integer) | |
63 ConfigurationFactory.getEntry("eventQueueBound", | |
64 int.class, | |
65 new Integer(0))).intValue(); | |
66 | |
67 theLogger.log(Level.INFO, "Max Event Processors: " + | |
68 NUM_PROCESSORS); | |
69 theLogger.log(Level.INFO, "Event Queue Bound: " + QUEUE_BOUND); | |
70 | |
71 } catch (ConfigurationException aCE) { | |
72 theLogger.log(Level.SEVERE, "Failed to load config", aCE); | |
73 } | |
74 } | |
75 | |
76 public void add(QueueEvent anEvent) { | |
77 add(anEvent, false); | |
78 } | |
79 | |
80 public void add(QueueEvent anEvent, boolean aWaitIndicator) { | |
81 if (TxnManager.get().isRecovery()) | |
82 return; | |
83 | |
84 // No-one listening, no point in doing work | |
85 // | |
86 if (EventGeneratorFactory.get().getCount() == 0) | |
87 return; | |
88 | |
89 if (theLogger.isLoggable(Level.FINEST)) | |
90 theLogger.log(Level.FINEST, "Event: " + anEvent.getType() + ", " + | |
91 anEvent.getTxn() + ", " + anEvent.getContext()); | |
92 try { | |
93 /* | |
94 * WTF? For some reason, javac can't resolve the two types below to | |
95 * DispatchTask without explicit casting - suspecting cursed generics | |
96 * and type reasoning grrrrr | |
97 */ | |
98 DispatchTask aTask = (aWaitIndicator == true) ? | |
99 (DispatchTask) new BlockingDispatchImpl(this, anEvent) : | |
100 (DispatchTask) new NonblockingDispatchImpl(this, anEvent); | |
101 | |
102 theProcessors.execute(aTask); | |
103 | |
104 aTask.block(); | |
105 } catch (InterruptedException anIE) { | |
106 theLogger.log(Level.FINEST, "Failed to queue event"); | |
107 } | |
108 } | |
109 | |
110 public void insert(EventGenerator anEventGenerator) throws IOException { | |
111 EventGeneratorFactory.get().addTemporary(anEventGenerator); | |
112 } | |
113 | |
114 /** | |
115 @todo Lock txn during adding of reg for non-null txns. | |
116 */ | |
117 public void register(MangledEntry aTemplate, TxnState aTxn, | |
118 RemoteEventListener aListener, long aLeaseTime, | |
119 MarshalledObject aHandback, | |
120 Registrar aRegistrar) | |
121 throws IOException, RemoteException { | |
122 | |
123 | |
124 if (aTxn == null) { | |
125 EventGenerator myGenerator = | |
126 EventGeneratorFactory.get().newPersistentGenerator(aTemplate, | |
127 aListener, | |
128 aLeaseTime, | |
129 aHandback); | |
130 | |
131 long myFirstSeqNum = myGenerator.getStartSeqNum(); | |
132 | |
133 aRegistrar.newRegistration(myGenerator.getSourceId(), | |
134 myFirstSeqNum, | |
135 new SpaceNotifyUID(myGenerator.getId())); | |
136 } else { | |
137 EventGenerator myGenerator = | |
138 EventGeneratorFactory.get().newTransientGenerator(aTemplate, | |
139 aListener, | |
140 aLeaseTime, | |
141 aHandback, | |
142 aTxn); | |
143 | |
144 long myFirstSeqNum = myGenerator.getStartSeqNum(); | |
145 | |
146 aRegistrar.newRegistration(myGenerator.getSourceId(), | |
147 myFirstSeqNum, | |
148 new SpaceNotifyUID(myGenerator.getId())); | |
149 } | |
150 } | |
151 | |
152 public void registerVisibility(MangledEntry[] aTemplates, TxnState aTxn, | |
153 RemoteEventListener aListener, | |
154 long aLeaseTime, | |
155 MarshalledObject aHandback, | |
156 Registrar aRegistrar, boolean visibleOnly) | |
157 throws IOException, RemoteException { | |
158 long myFirstSeqNum; | |
159 EventGenerator myGenerator; | |
160 | |
161 if (aTxn == null) { | |
162 myGenerator = | |
163 EventGeneratorFactory.get().newPersistentVisibility(aTemplates, | |
164 aListener, | |
165 aLeaseTime, | |
166 aHandback, | |
167 visibleOnly); | |
168 } else { | |
169 myGenerator = | |
170 EventGeneratorFactory.get().newTransientVisibility(aTemplates, | |
171 aListener, | |
172 aLeaseTime, | |
173 aHandback, | |
174 aTxn, | |
175 visibleOnly); | |
176 | |
177 } | |
178 | |
179 myFirstSeqNum = myGenerator.getStartSeqNum(); | |
180 aRegistrar.newRegistration(myGenerator.getSourceId(), | |
181 myFirstSeqNum, | |
182 new SpaceNotifyUID(myGenerator.getId())); | |
183 } | |
184 | |
185 public void begin() { | |
186 try { | |
187 /* | |
188 * Apply the jump before processing any events. This will be | |
189 * performed on the in-memory copy and placed on disk at the | |
190 * next checkpoint. In the meantime, we make sure correct state | |
191 * is preserved using log records | |
192 */ | |
193 EventGeneratorFactory.get().jumpSequenceNumbers(); | |
194 } catch (IOException anIOE) { | |
195 theLogger.log(Level.SEVERE, "Failed to apply restart jump", anIOE); | |
196 } | |
197 | |
198 BoundedLinkedQueue myQueue; | |
199 | |
200 if (QUEUE_BOUND == 0) { | |
201 theLogger.log(Level.INFO, "Event queue bounding disabled"); | |
202 myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE); | |
203 theProcessors = | |
204 new PooledExecutor(myQueue, NUM_PROCESSORS); | |
205 } else { | |
206 theLogger.log(Level.INFO, "Event queue bounding enabled"); | |
207 myQueue = new BoundedLinkedQueue(QUEUE_BOUND); | |
208 theProcessors = | |
209 new PooledExecutor(myQueue, NUM_PROCESSORS); | |
210 } | |
211 | |
212 theProcessors.setMinimumPoolSize(NUM_PROCESSORS); | |
213 // theProcessors.waitWhenBlocked(); | |
214 | |
215 StatsBoard.get().add(new QueueStatGenerator("Events", myQueue)); | |
216 } | |
217 | |
218 public void halt() { | |
219 theProcessors.shutdownNow(); | |
220 | |
221 synchronized(this) { | |
222 theLogger.log(Level.INFO, "Processed: " + theEventCount); | |
223 } | |
224 } | |
225 | |
226 void dispatchImpl(DispatchTask aTask) { | |
227 synchronized(this) { | |
228 ++theEventCount; | |
229 } | |
230 | |
231 switch (aTask.getEvent().getType()) { | |
232 case QueueEvent.TRANSACTION_ENDED : | |
233 case QueueEvent.ENTRY_WRITE : | |
234 case QueueEvent.ENTRY_WRITTEN : | |
235 case QueueEvent.ENTRY_VISIBLE : | |
236 case QueueEvent.ENTRY_NOT_CONFLICTED : { | |
237 iterateMatches(aTask); | |
238 break; | |
239 } | |
240 } | |
241 } | |
242 | |
243 private void iterateMatches(DispatchTask aTask) { | |
244 QueueEvent myEvent = aTask.getEvent(); | |
245 long myCurrentTime = System.currentTimeMillis(); | |
246 QueueEvent.Context myContext = myEvent.getContext(); | |
247 MangledEntry myEntry = null; | |
248 | |
249 if (myContext != null) | |
250 myEntry = myEvent.getContext().getEntry(); | |
251 | |
252 try { | |
253 Iterator myGenerators = | |
254 EventGeneratorFactory.get().getGenerators(); | |
255 | |
256 while (myGenerators.hasNext()) { | |
257 EventGenerator myGenerator = (EventGenerator) myGenerators.next(); | |
258 | |
259 if ((myGenerator.canSee(myEvent, myCurrentTime)) && | |
260 (myGenerator.matches(myEntry))) { | |
261 theDispatcher.sendEvent(aTask, myGenerator); | |
262 } | |
263 } | |
264 } catch (IOException anIOE) { | |
265 theLogger.log(Level.SEVERE, "Couldn't recover generators from factory"); | |
266 } finally { | |
267 aTask.enableResolve(); | |
268 } | |
269 } | |
270 | |
271 | |
272 public boolean renew(OID aOID, long anExpiry) | |
273 throws IOException { | |
274 | |
275 return EventGeneratorFactory.get().renew(aOID, anExpiry); | |
276 } | |
277 | |
278 public boolean cancel(OID aOID) | |
279 throws IOException { | |
280 | |
281 return EventGeneratorFactory.get().cancel(aOID); | |
282 } | |
283 | |
284 public void kill(OID anOID) throws IOException { | |
285 EventGeneratorFactory.get().killTemplate(anOID); | |
286 } | |
287 } |