Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/notify/VisibilityImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.notify; | |
2 | |
3 import java.io.IOException; | |
4 import java.io.Serializable; | |
5 | |
6 import java.rmi.MarshalledObject; | |
7 import java.rmi.RemoteException; | |
8 import java.rmi.NoSuchObjectException; | |
9 | |
10 import java.util.logging.Level; | |
11 | |
12 import net.jini.core.event.RemoteEventListener; | |
13 import net.jini.core.event.RemoteEvent; | |
14 import net.jini.core.event.UnknownEventException; | |
15 | |
16 import net.jini.core.transaction.TransactionException; | |
17 | |
18 import net.jini.space.AvailabilityEvent; | |
19 import net.jini.space.JavaSpace; | |
20 | |
21 import org.dancres.blitz.txn.TxnId; | |
22 import org.dancres.blitz.txn.TxnManager; | |
23 | |
24 import org.dancres.blitz.mangler.MangledEntry; | |
25 | |
26 import org.dancres.blitz.entry.Types; | |
27 | |
28 import org.dancres.blitz.oid.OID; | |
29 | |
30 import org.dancres.blitz.task.Tasks; | |
31 | |
32 /** | |
33 <p> Represents a single registration via registerForVisibility. If it's | |
34 transactional, it will never be saved as it's considered temporary. If | |
35 it's not transactional it will be saved to disk. In addition, its state will | |
36 be saved to the log periodically (on a number of allocations basis) to | |
37 preserve data in case of a crash. </p> | |
38 | |
39 <p> This class uses notifyPreparer to do initial preparation of the | |
40 passed RemoteEventListener when constructed from scratch (i.e. it's a new | |
41 registration). It uses recoveredNotifyPreparer in those cases where it | |
42 has been de-serialized from storage and is about to be used for the first | |
43 time post a restart </p> | |
44 | |
45 <p> Instances of this object can be "tainted" for various reasons including | |
46 certain kinds of exception from the remote notify implementation, lease | |
47 expiry or transaction commit. Tainting prevents any further operations | |
48 against this instance such as event dispatch, logging changes to disk and | |
49 lease renewal. Tainting also results in cleanup for the registration being | |
50 scheduled.</p> | |
51 | |
52 @todo Refactor the common stuff out of here and EventGeneratorImpl | |
53 */ | |
54 public class VisibilityImpl implements EventGenerator { | |
55 private OID theOID; | |
56 private MangledEntry[] theTemplates; | |
57 private MarshalledObject theHandback; | |
58 private MarshalledObject theMarshalledListener; | |
59 private long theSourceId; | |
60 private long theSeqNum; | |
61 private long theLeaseTime; | |
62 private long theStartSeqNum; | |
63 private boolean doVisible; | |
64 | |
65 private int thePingCount; | |
66 | |
67 /** | |
68 Indicates if we've done proxy preparation on the RemoteEventListener | |
69 */ | |
70 private boolean isPrepared; | |
71 | |
72 /** | |
73 Indicates when we're no longer generating events either because we | |
74 couldn't get an appropriate response from the client or because | |
75 we're being deleted. | |
76 */ | |
77 private boolean isTainted; | |
78 | |
79 private RemoteEventListener theListener; | |
80 | |
81 /* | |
82 VisibilityImpls against a specific transaction are never saved so | |
83 we make this transient to help in picking up code which hasn't | |
84 taken account of this fact..... | |
85 */ | |
86 private TxnId theTxnId; | |
87 | |
88 static VisibilityImpl restoreGenerator(VisibilityImplState aState) { | |
89 return new VisibilityImpl(aState); | |
90 } | |
91 | |
92 private VisibilityImpl(VisibilityImplState aState) { | |
93 theOID = aState.getOID(); | |
94 theTemplates = aState.getTemplates(); | |
95 theHandback = aState.getHandback(); | |
96 theMarshalledListener = aState.getListener(); | |
97 theSourceId = aState.getSourceId(); | |
98 theSeqNum = aState.getSeqNum(); | |
99 theLeaseTime = aState.getLeaseTime(); | |
100 doVisible = aState.getJustVisible(); | |
101 } | |
102 | |
103 VisibilityImpl(MangledEntry[] aTemplates, | |
104 MarshalledObject anObject, | |
105 RemoteEventListener aDest, long aSeqNum, | |
106 long aLeaseTime, boolean justVisible) | |
107 throws RemoteException { | |
108 this(aTemplates, anObject, aDest, aSeqNum, | |
109 aLeaseTime, null, justVisible); | |
110 } | |
111 | |
112 VisibilityImpl(MangledEntry[] aTemplates, | |
113 MarshalledObject anObject, | |
114 RemoteEventListener aDest, long aSeqNum, | |
115 long aLeaseTime, TxnId aTxnId, boolean justVisible) | |
116 throws RemoteException { | |
117 | |
118 theTemplates = aTemplates; | |
119 theHandback = anObject; | |
120 theSeqNum = aSeqNum; | |
121 theStartSeqNum = aSeqNum; | |
122 theTxnId = aTxnId; | |
123 theLeaseTime = aLeaseTime; | |
124 theListener = (RemoteEventListener) | |
125 GeneratorConfig.getPreparer().prepareProxy(aDest); | |
126 doVisible = justVisible; | |
127 | |
128 try { | |
129 theMarshalledListener = new MarshalledObject(aDest); | |
130 } catch (IOException anIOE) { | |
131 throw new RemoteException("Failed to marshall listener", anIOE); | |
132 } | |
133 | |
134 isPrepared = true; | |
135 } | |
136 | |
137 public void assign(OID anOID) { | |
138 theOID = anOID; | |
139 | |
140 if (theTxnId == null) | |
141 theSourceId = theOID.getId(); | |
142 else | |
143 theSourceId = theTxnId.getId(); | |
144 } | |
145 | |
146 /** | |
147 Return the first sequence number that this event generator ever | |
148 produced. | |
149 */ | |
150 public long getStartSeqNum() { | |
151 return theStartSeqNum; | |
152 } | |
153 | |
154 public OID getId() { | |
155 return theOID; | |
156 } | |
157 | |
158 public boolean isPersistent() { | |
159 return (theTxnId == null); | |
160 } | |
161 | |
162 public long getSourceId() { | |
163 return theSourceId; | |
164 } | |
165 | |
166 /** | |
167 Once an Visibility becomes tainted it will generate no more events | |
168 and schedule cleanup. | |
169 */ | |
170 public void taint() { | |
171 synchronized(this) { | |
172 // Tainting can only be done once | |
173 // | |
174 if (isTainted) | |
175 return; | |
176 | |
177 isTainted = true; | |
178 } | |
179 | |
180 try { | |
181 Tasks.queue(new CleanTask(getId())); | |
182 } catch (InterruptedException anIE) { | |
183 EventQueue.theLogger.log(Level.SEVERE, | |
184 "Failed to lodge cleanup for: " + getId(), anIE); | |
185 } | |
186 } | |
187 | |
188 /* ******************************************************************** | |
189 Event filtering starts here | |
190 **********************************************************************/ | |
191 | |
192 /** | |
193 Determines whether the passed QueueEvent is "visible" to this | |
194 Visibility. This is determind by checking transaction id's, expiry | |
195 and that the generator isn't "tainted" | |
196 */ | |
197 public boolean canSee(QueueEvent anEvent, long aTime) { | |
198 synchronized(this) { | |
199 if (isTainted) | |
200 return false; | |
201 | |
202 if (aTime > theLeaseTime) { | |
203 taint(); | |
204 return false; | |
205 } | |
206 | |
207 // If it's a visibility event, it's the result of something | |
208 // that was previously written and committed (and generated events | |
209 // previously) which has now been marked taken and now is not | |
210 // taken | |
211 if ((anEvent.getType() == QueueEvent.ENTRY_VISIBLE) || | |
212 // Only post this one if user has said they're interested | |
213 // in visible operations | |
214 ((anEvent.getType() == QueueEvent.ENTRY_NOT_CONFLICTED) && | |
215 (!doVisible))) | |
216 return true; | |
217 | |
218 // If we're associated with a transaction..... | |
219 if (theTxnId != null) { | |
220 | |
221 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && | |
222 (theTxnId.equals(anEvent.getTxn().getId()))) { | |
223 taint(); | |
224 return false; | |
225 } | |
226 | |
227 // We see all Written's including those generated by us | |
228 if (anEvent.getType() == QueueEvent.ENTRY_WRITTEN) | |
229 return true; | |
230 | |
231 // If it's not a written we can only see it if we originated it | |
232 return (anEvent.getTxn().getId().equals(theTxnId)); | |
233 } else { | |
234 // Only see ENTRY_WRITTENS | |
235 return (anEvent.getType() == QueueEvent.ENTRY_WRITTEN); | |
236 } | |
237 } | |
238 } | |
239 | |
240 public boolean matches(MangledEntry anEntry) { | |
241 for (int i = 0; i < theTemplates.length; i++) { | |
242 MangledEntry myTemplate = theTemplates[i]; | |
243 | |
244 if (Types.isSubtype(myTemplate.getType(), anEntry.getType())) { | |
245 if (myTemplate.match(anEntry)) | |
246 return true; | |
247 } | |
248 } | |
249 | |
250 return false; | |
251 } | |
252 | |
253 /* ******************************************************************** | |
254 Lease management starts here | |
255 **********************************************************************/ | |
256 | |
257 public boolean renew(long aTime) { | |
258 synchronized(this) { | |
259 if (isTainted) | |
260 return false; | |
261 | |
262 if (System.currentTimeMillis() > theLeaseTime) | |
263 return false; | |
264 | |
265 theLeaseTime = aTime; | |
266 return true; | |
267 } | |
268 } | |
269 | |
270 /* ******************************************************************** | |
271 Recovery starts here | |
272 **********************************************************************/ | |
273 | |
274 public void recover(long aSeqNum) { | |
275 synchronized(this) { | |
276 if (aSeqNum > theSeqNum) { | |
277 theSeqNum = aSeqNum; | |
278 } | |
279 } | |
280 } | |
281 | |
282 /** | |
283 Jumps the sequence number by the RESTART_JUMP | |
284 */ | |
285 public long jumpSequenceNumber() { | |
286 synchronized(this) { | |
287 theSeqNum += (GeneratorConfig.getRestartJump() + | |
288 GeneratorConfig.getSaveInterval()); | |
289 | |
290 return theSeqNum; | |
291 } | |
292 } | |
293 | |
294 /** | |
295 Jumps the sequence number if it's not already at this minimum. | |
296 */ | |
297 public long jumpSequenceNumber(long aMin) { | |
298 synchronized(this) { | |
299 if (theSeqNum < aMin) { | |
300 theSeqNum = aMin + GeneratorConfig.getSaveInterval(); | |
301 } | |
302 | |
303 return theSeqNum; | |
304 } | |
305 } | |
306 | |
307 /* ******************************************************************** | |
308 Event Generation starts here | |
309 **********************************************************************/ | |
310 | |
311 /** | |
312 Dispatches a remote event to a client. | |
313 */ | |
314 public void ping(QueueEvent anEvent, JavaSpace aSource) { | |
315 SeqNumInterval mySnapshot = null; | |
316 | |
317 synchronized(this) { | |
318 if (isTainted) | |
319 return; | |
320 | |
321 RemoteEventListener myTarget = null; | |
322 | |
323 try { | |
324 RemoteEvent myEvent = | |
325 newEvent(aSource, anEvent.getContext().getEntry(), | |
326 (anEvent.getType() != QueueEvent.ENTRY_NOT_CONFLICTED)); | |
327 | |
328 myTarget = getDest(); | |
329 | |
330 myTarget.notify(myEvent); | |
331 | |
332 mySnapshot = shouldLog(); | |
333 | |
334 } catch (UnknownEventException aUEE) { | |
335 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
336 "Couldn't send event [Trash]" + | |
337 myTarget, aUEE); | |
338 taint(); | |
339 } catch (NoSuchObjectException anNSOE) { | |
340 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
341 "Couldn't send event [Trash]" + | |
342 myTarget, anNSOE); | |
343 taint(); | |
344 } catch (RemoteException anRE) { | |
345 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
346 "Couldn't send event " + | |
347 myTarget, anRE); | |
348 } | |
349 } | |
350 | |
351 try { | |
352 /* | |
353 See recover() for details on why we can log this outside of the | |
354 lock | |
355 */ | |
356 if (mySnapshot != null) | |
357 TxnManager.get().log(mySnapshot); | |
358 } catch (TransactionException aTE) { | |
359 RemoteEventDispatcher.theLogger.log(Level.SEVERE, | |
360 "Couldn't update EventGenerator", aTE); | |
361 } | |
362 } | |
363 | |
364 private RemoteEvent newEvent(JavaSpace aSource, MangledEntry anEntry, | |
365 boolean isVisible) { | |
366 ++thePingCount; | |
367 | |
368 return new AvailabilityEventImpl(aSource, theSourceId, theSeqNum++, | |
369 theHandback, anEntry, isVisible); | |
370 } | |
371 | |
372 private RemoteEventListener getDest() | |
373 throws RemoteException { | |
374 | |
375 // This flag is only set in the constructor or by us. | |
376 if (!isPrepared) { | |
377 try { | |
378 Object myListener = theMarshalledListener.get(); | |
379 | |
380 theListener = (RemoteEventListener) | |
381 GeneratorConfig.getRecoveryPreparer().prepareProxy(myListener); | |
382 isPrepared = true; | |
383 } catch (IOException anIOE) { | |
384 throw new RemoteException("Failed to unmarshall listener", | |
385 anIOE); | |
386 } catch (ClassNotFoundException aCNFE) { | |
387 throw new RemoteException("Failed to load class for listener", | |
388 aCNFE); | |
389 } | |
390 } | |
391 | |
392 return theListener; | |
393 } | |
394 | |
395 /* ******************************************************************** | |
396 State save/recovery starts here | |
397 **********************************************************************/ | |
398 | |
399 public EventGeneratorState getMemento() { | |
400 synchronized(this) { | |
401 return new VisibilityImplState(theOID, theTemplates, | |
402 theHandback, | |
403 theMarshalledListener, | |
404 theSourceId, | |
405 theSeqNum, theLeaseTime, | |
406 theTxnId, doVisible); | |
407 } | |
408 } | |
409 | |
410 /** | |
411 * Call this with synchronized asserted on the Generator | |
412 */ | |
413 private SeqNumInterval shouldLog() { | |
414 // Never log transactional notify registrations | |
415 // | |
416 if (theTxnId != null) { | |
417 thePingCount = 0; | |
418 return null; | |
419 } | |
420 | |
421 if (thePingCount == GeneratorConfig.getSaveInterval()) { | |
422 thePingCount = 0; | |
423 return new SeqNumInterval(theOID, theSeqNum); | |
424 } else | |
425 return null; | |
426 } | |
427 } |