Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/notify/EventGeneratorImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.notify; | |
2 | |
3 import java.io.IOException; | |
4 | |
5 import java.rmi.MarshalledObject; | |
6 import java.rmi.RemoteException; | |
7 import java.rmi.NoSuchObjectException; | |
8 | |
9 import java.util.logging.Level; | |
10 | |
11 import net.jini.core.event.RemoteEventListener; | |
12 import net.jini.core.event.RemoteEvent; | |
13 import net.jini.core.event.UnknownEventException; | |
14 | |
15 import net.jini.core.transaction.TransactionException; | |
16 | |
17 import net.jini.space.JavaSpace; | |
18 | |
19 import org.dancres.blitz.txn.TxnId; | |
20 import org.dancres.blitz.txn.TxnManager; | |
21 | |
22 import org.dancres.blitz.mangler.MangledEntry; | |
23 | |
24 import org.dancres.blitz.entry.Types; | |
25 | |
26 import org.dancres.blitz.oid.OID; | |
27 | |
28 import org.dancres.blitz.task.Tasks; | |
29 | |
30 /** | |
31 <p> Represents a single registration via notify. If it's transactional, | |
32 it will never be saved as it's considered temporary. If it's transactional | |
33 it will be saved to disk. In addition, it's state will be saved to the | |
34 log periodically (on a number of allocations basis) to preserve data | |
35 in case of a crash. <p> | |
36 | |
37 <p> This class uses notifyPreparer to do initial preparation of the | |
38 passed RemoteEventListener when constructed from scratch (i.e. it's a new | |
39 registration). It uses recoveredNotifyPreparer in those cases where it | |
40 has been de-serialized from storage and is about to be used for the first | |
41 time post a restart </p> | |
42 | |
43 <p> Instances of this object can be "tainted" for various reasons including | |
44 certain kinds of exception from the remote notify implementation, lease | |
45 expiry or transaction commit. Tainting prevents any further operations | |
46 against this instance such as event dispatch, logging changes to disk and | |
47 lease renewal. Tainting also results in cleanup for the registration being | |
48 scheduled.</p> | |
49 */ | |
50 public class EventGeneratorImpl implements EventGenerator { | |
51 private OID theOID; | |
52 private MangledEntry theTemplate; | |
53 private MarshalledObject theHandback; | |
54 private MarshalledObject theMarshalledListener; | |
55 private long theSourceId; | |
56 private long theSeqNum; | |
57 private long theLeaseTime; | |
58 private long theStartSeqNum; | |
59 | |
60 private int thePingCount; | |
61 | |
62 /** | |
63 Indicates if we've done proxy preparation on the RemoteEventListener | |
64 */ | |
65 private boolean isPrepared; | |
66 | |
67 /** | |
68 Indicates when we're no longer generating events either because we | |
69 couldn't get an appropriate response from the client or because | |
70 we're being deleted. | |
71 */ | |
72 private boolean isTainted; | |
73 | |
74 private RemoteEventListener theListener; | |
75 | |
76 private TxnId theTxnId; | |
77 | |
78 static EventGeneratorImpl restoreGenerator(EventGeneratorImplState aState) { | |
79 return new EventGeneratorImpl(aState); | |
80 } | |
81 | |
82 private EventGeneratorImpl(EventGeneratorImplState aState) { | |
83 theOID = aState.getOID(); | |
84 theTemplate = aState.getTemplate(); | |
85 theHandback = aState.getHandback(); | |
86 theMarshalledListener = aState.getListener(); | |
87 theSourceId = aState.getSourceId(); | |
88 theSeqNum = aState.getSeqNum(); | |
89 theLeaseTime = aState.getLeaseTime(); | |
90 } | |
91 | |
92 EventGeneratorImpl(MangledEntry aTemplate, | |
93 MarshalledObject anObject, | |
94 RemoteEventListener aDest, long aSeqNum, | |
95 long aLeaseTime) throws RemoteException { | |
96 this(aTemplate, anObject, aDest, aSeqNum, aLeaseTime, null); | |
97 } | |
98 | |
99 EventGeneratorImpl(MangledEntry aTemplate, | |
100 MarshalledObject anObject, | |
101 RemoteEventListener aDest, long aSeqNum, | |
102 long aLeaseTime, TxnId aTxnId) throws RemoteException { | |
103 theTemplate = aTemplate; | |
104 theHandback = anObject; | |
105 theSeqNum = aSeqNum; | |
106 theStartSeqNum = aSeqNum; | |
107 theTxnId = aTxnId; | |
108 theLeaseTime = aLeaseTime; | |
109 theListener = (RemoteEventListener) | |
110 GeneratorConfig.getPreparer().prepareProxy(aDest); | |
111 | |
112 try { | |
113 theMarshalledListener = new MarshalledObject(aDest); | |
114 } catch (IOException anIOE) { | |
115 throw new RemoteException("Failed to marshall listener", anIOE); | |
116 } | |
117 | |
118 isPrepared = true; | |
119 } | |
120 | |
121 public void assign(OID anOID) { | |
122 theOID = anOID; | |
123 | |
124 if (theTxnId == null) | |
125 theSourceId = theOID.getId(); | |
126 else | |
127 theSourceId = theTxnId.getId(); | |
128 } | |
129 | |
130 /** | |
131 Return the first sequence number that this event generator ever | |
132 produced. | |
133 */ | |
134 public long getStartSeqNum() { | |
135 return theStartSeqNum; | |
136 } | |
137 | |
138 public OID getId() { | |
139 return theOID; | |
140 } | |
141 | |
142 public boolean isPersistent() { | |
143 return (theTxnId == null); | |
144 } | |
145 | |
146 public long getSourceId() { | |
147 return theSourceId; | |
148 } | |
149 | |
150 /** | |
151 Once an EventGenerator becomes tainted it will generate no more events | |
152 and schedule cleanup. | |
153 | |
154 @todo This should maybe straight call | |
155 EventGeneratorFactory.get().killTemplate(theId); outside the sync | |
156 block - no need to use another thread | |
157 */ | |
158 public void taint() { | |
159 synchronized(this) { | |
160 // Tainting can only be done once | |
161 // | |
162 if (isTainted) | |
163 return; | |
164 | |
165 isTainted = true; | |
166 } | |
167 | |
168 try { | |
169 Tasks.queue(new CleanTask(getId())); | |
170 } catch (InterruptedException anIE) { | |
171 EventQueue.theLogger.log(Level.WARNING, | |
172 "Failed to lodge cleanup for: " + getId(), anIE); | |
173 } | |
174 } | |
175 | |
176 /* ******************************************************************** | |
177 Event filtering starts here | |
178 **********************************************************************/ | |
179 | |
180 /** | |
181 Determines whether the passed QueueEvent is "visible" to this | |
182 EventGenerator. This is determind by checking transaction id's, expiry | |
183 and that the generator isn't "tainted" | |
184 */ | |
185 public boolean canSee(QueueEvent anEvent, long aTime) { | |
186 if (anEvent.getType() == QueueEvent.ENTRY_VISIBLE) | |
187 return false; | |
188 | |
189 synchronized(this) { | |
190 if (isTainted) | |
191 return false; | |
192 | |
193 if (aTime > theLeaseTime) { | |
194 taint(); | |
195 return false; | |
196 } | |
197 | |
198 // If we're associated with a transaction..... | |
199 if (theTxnId != null) { | |
200 | |
201 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && | |
202 (theTxnId.equals(anEvent.getTxn().getId()))) { | |
203 taint(); | |
204 return false; | |
205 } | |
206 | |
207 // We see all Written's including those generated by us | |
208 if (anEvent.getType() == QueueEvent.ENTRY_WRITTEN) | |
209 return true; | |
210 | |
211 // If it's not a written we can only see it if we originated it | |
212 return (anEvent.getTxn().getId().equals(theTxnId)); | |
213 } else { | |
214 // Only see ENTRY_WRITTENS | |
215 return (anEvent.getType() == QueueEvent.ENTRY_WRITTEN); | |
216 } | |
217 } | |
218 } | |
219 | |
220 public boolean matches(MangledEntry anEntry) { | |
221 return Types.isSubtype(theTemplate.getType(), anEntry.getType()) && | |
222 theTemplate.match(anEntry); | |
223 } | |
224 | |
225 /* ******************************************************************** | |
226 Lease management starts here | |
227 **********************************************************************/ | |
228 | |
229 public boolean renew(long aTime) { | |
230 synchronized(this) { | |
231 if (isTainted) | |
232 return false; | |
233 | |
234 if (System.currentTimeMillis() > theLeaseTime) | |
235 return false; | |
236 | |
237 theLeaseTime = aTime; | |
238 return true; | |
239 } | |
240 } | |
241 | |
242 /* ******************************************************************** | |
243 Recovery starts here | |
244 **********************************************************************/ | |
245 | |
246 /** | |
247 * Sequence number intervals can come out of order such that a greater | |
248 * interval may be applied before a lesser one. With that being the case | |
249 * we test the size of the sequence number and assign if it's greater | |
250 * rather than just a straight assignment | |
251 */ | |
252 public void recover(long aSeqNum) { | |
253 synchronized(this) { | |
254 if (aSeqNum > theSeqNum) { | |
255 theSeqNum = aSeqNum; | |
256 } | |
257 } | |
258 } | |
259 | |
260 /** | |
261 Jumps the sequence number by the RESTART_JUMP | |
262 */ | |
263 public long jumpSequenceNumber() { | |
264 synchronized(this) { | |
265 theSeqNum += (GeneratorConfig.getRestartJump() + | |
266 GeneratorConfig.getSaveInterval()); | |
267 | |
268 return theSeqNum; | |
269 } | |
270 } | |
271 | |
272 /** | |
273 Jumps the sequence number if it's not already at this minimum. | |
274 */ | |
275 public long jumpSequenceNumber(long aMin) { | |
276 synchronized(this) { | |
277 if (theSeqNum < aMin) { | |
278 theSeqNum = aMin + GeneratorConfig.getSaveInterval(); | |
279 } | |
280 | |
281 return theSeqNum; | |
282 } | |
283 } | |
284 | |
285 /* ******************************************************************** | |
286 Event Generation starts here | |
287 **********************************************************************/ | |
288 | |
289 /** | |
290 Dispatches a remote event to a client. | |
291 */ | |
292 public void ping(QueueEvent anEvent, JavaSpace aSource) { | |
293 SeqNumInterval mySnapshot = null; | |
294 | |
295 synchronized(this) { | |
296 if (isTainted) | |
297 return; | |
298 | |
299 RemoteEventListener myTarget = null; | |
300 | |
301 try { | |
302 | |
303 RemoteEvent myEvent = newEvent(aSource); | |
304 myTarget = getDest(); | |
305 | |
306 myTarget.notify(myEvent); | |
307 | |
308 mySnapshot = shouldLog(); | |
309 | |
310 } catch (UnknownEventException aUEE) { | |
311 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
312 "Couldn't send event [Trash]" + | |
313 myTarget, aUEE); | |
314 taint(); | |
315 } catch (NoSuchObjectException anNSOE) { | |
316 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
317 "Couldn't send event [Trash]" + | |
318 myTarget, anNSOE); | |
319 taint(); | |
320 } catch (RemoteException anRE) { | |
321 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
322 "Couldn't send event " + | |
323 myTarget, anRE); | |
324 } | |
325 } | |
326 | |
327 try { | |
328 /* | |
329 See recover() for details on why we can log this outside of the | |
330 lock | |
331 */ | |
332 if (mySnapshot != null) | |
333 TxnManager.get().log(mySnapshot); | |
334 } catch (TransactionException aTE) { | |
335 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
336 "Couldn't update EventGenerator", aTE); | |
337 } | |
338 } | |
339 | |
340 private RemoteEvent newEvent(Object aSource) { | |
341 ++thePingCount; | |
342 | |
343 return new RemoteEvent(aSource, theSourceId, theSeqNum++, | |
344 theHandback); | |
345 } | |
346 | |
347 private RemoteEventListener getDest() | |
348 throws RemoteException { | |
349 | |
350 // This flag is only set in the constructor or by us. | |
351 if (!isPrepared) { | |
352 try { | |
353 Object myListener = theMarshalledListener.get(); | |
354 | |
355 theListener = (RemoteEventListener) | |
356 GeneratorConfig.getRecoveryPreparer().prepareProxy(myListener); | |
357 isPrepared = true; | |
358 } catch (IOException anIOE) { | |
359 throw new RemoteException("Failed to unmarshall listener", | |
360 anIOE); | |
361 } catch (ClassNotFoundException aCNFE) { | |
362 throw new RemoteException("Failed to load class for listener", | |
363 aCNFE); | |
364 } | |
365 } | |
366 | |
367 return theListener; | |
368 } | |
369 | |
370 /* ******************************************************************** | |
371 State save/recovery starts here | |
372 **********************************************************************/ | |
373 | |
374 public EventGeneratorState getMemento() { | |
375 synchronized(this) { | |
376 return new EventGeneratorImplState(theOID, theTemplate, | |
377 theHandback, | |
378 theMarshalledListener, | |
379 theSourceId, | |
380 theSeqNum, theLeaseTime, | |
381 theTxnId); | |
382 } | |
383 } | |
384 | |
385 /** | |
386 Call this with synchronized asserted on the Generator | |
387 */ | |
388 private SeqNumInterval shouldLog() { | |
389 // Never log transactional notify registrations | |
390 // | |
391 if (theTxnId != null) { | |
392 thePingCount = 0; | |
393 return null; | |
394 } | |
395 | |
396 if (thePingCount == GeneratorConfig.getSaveInterval()) { | |
397 thePingCount = 0; | |
398 return new SeqNumInterval(theOID, theSeqNum); | |
399 } else | |
400 return null; | |
401 } | |
402 } |