comparison src/org/dancres/blitz/notify/VisibilityImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.notify;
2
3 import java.io.IOException;
4 import java.io.Serializable;
5
6 import java.rmi.MarshalledObject;
7 import java.rmi.RemoteException;
8 import java.rmi.NoSuchObjectException;
9
10 import java.util.logging.Level;
11
12 import net.jini.core.event.RemoteEventListener;
13 import net.jini.core.event.RemoteEvent;
14 import net.jini.core.event.UnknownEventException;
15
16 import net.jini.core.transaction.TransactionException;
17
18 import net.jini.space.AvailabilityEvent;
19 import net.jini.space.JavaSpace;
20
21 import org.dancres.blitz.txn.TxnId;
22 import org.dancres.blitz.txn.TxnManager;
23
24 import org.dancres.blitz.mangler.MangledEntry;
25
26 import org.dancres.blitz.entry.Types;
27
28 import org.dancres.blitz.oid.OID;
29
30 import org.dancres.blitz.task.Tasks;
31
32 /**
33 <p> Represents a single registration via registerForVisbility. If it's
34 transactional, it will never be saved as it's considered temporary. If
35 it's transactional it will be saved to disk. In addition, it's state will
36 be saved to the log periodically (on a number of allocations basis) to
37 preserve data in case of a crash. <p>
38
39 <p> This class uses notifyPreparer to do initial preparation of the
40 passed RemoteEventListener when constructed from scratch (i.e. it's a new
41 registration). It uses recoveredNotifyPreparer in those cases where it
42 has been de-serialized from storage and is about to be used for the first
43 time post a restart </p>
44
45 <p> Instances of this object can be "tainted" for various reasons including
46 certain kinds of exception from the remote notify implementation, lease
47 expiry or transaction commit. Tainting prevents any further operations
48 against this instance such as event dispatch, logging changes to disk and
49 lease renewal. Tainting also results in cleanup for the registration being
50 scheduled.</p>
51
52 @todo Refactor the commonet stuff out of here and EventGeneratorImpl
53 */
54 public class VisibilityImpl implements EventGenerator {
55 private OID theOID;
56 private MangledEntry[] theTemplates;
57 private MarshalledObject theHandback;
58 private MarshalledObject theMarshalledListener;
59 private long theSourceId;
60 private long theSeqNum;
61 private long theLeaseTime;
62 private long theStartSeqNum;
63 private boolean doVisible;
64
65 private int thePingCount;
66
67 /**
68 Indicates if we've done proxy preparation on the RemoteEventListener
69 */
70 private boolean isPrepared;
71
72 /**
73 Indicates when we're no longer generating events either because we
74 couldn't get an appropriate response from the client or because
75 we're being deleted.
76 */
77 private boolean isTainted;
78
79 private RemoteEventListener theListener;
80
81 /*
82 VisibilityImpls against a specific transaction are never saved so
83 we make this transient to help in picking up code which hasn't
84 taken account of this fact.....
85 */
86 private TxnId theTxnId;
87
88 static VisibilityImpl restoreGenerator(VisibilityImplState aState) {
89 return new VisibilityImpl(aState);
90 }
91
92 private VisibilityImpl(VisibilityImplState aState) {
93 theOID = aState.getOID();
94 theTemplates = aState.getTemplates();
95 theHandback = aState.getHandback();
96 theMarshalledListener = aState.getListener();
97 theSourceId = aState.getSourceId();
98 theSeqNum = aState.getSeqNum();
99 theLeaseTime = aState.getLeaseTime();
100 doVisible = aState.getJustVisible();
101 }
102
103 VisibilityImpl(MangledEntry[] aTemplates,
104 MarshalledObject anObject,
105 RemoteEventListener aDest, long aSeqNum,
106 long aLeaseTime, boolean justVisible)
107 throws RemoteException {
108 this(aTemplates, anObject, aDest, aSeqNum,
109 aLeaseTime, null, justVisible);
110 }
111
112 VisibilityImpl(MangledEntry[] aTemplates,
113 MarshalledObject anObject,
114 RemoteEventListener aDest, long aSeqNum,
115 long aLeaseTime, TxnId aTxnId, boolean justVisible)
116 throws RemoteException {
117
118 theTemplates = aTemplates;
119 theHandback = anObject;
120 theSeqNum = aSeqNum;
121 theStartSeqNum = aSeqNum;
122 theTxnId = aTxnId;
123 theLeaseTime = aLeaseTime;
124 theListener = (RemoteEventListener)
125 GeneratorConfig.getPreparer().prepareProxy(aDest);
126 doVisible = justVisible;
127
128 try {
129 theMarshalledListener = new MarshalledObject(aDest);
130 } catch (IOException anIOE) {
131 throw new RemoteException("Failed to marshall listener", anIOE);
132 }
133
134 isPrepared = true;
135 }
136
137 public void assign(OID anOID) {
138 theOID = anOID;
139
140 if (theTxnId == null)
141 theSourceId = theOID.getId();
142 else
143 theSourceId = theTxnId.getId();
144 }
145
146 /**
147 Return the first sequence number that this event generator ever
148 produced.
149 */
150 public long getStartSeqNum() {
151 return theStartSeqNum;
152 }
153
154 public OID getId() {
155 return theOID;
156 }
157
158 public boolean isPersistent() {
159 return (theTxnId == null);
160 }
161
162 public long getSourceId() {
163 return theSourceId;
164 }
165
166 /**
167 Once an Visibility becomes tainted it will generate no more events
168 and schedule cleanup.
169 */
170 public void taint() {
171 synchronized(this) {
172 // Tainting can only be done once
173 //
174 if (isTainted)
175 return;
176
177 isTainted = true;
178 }
179
180 try {
181 Tasks.queue(new CleanTask(getId()));
182 } catch (InterruptedException anIE) {
183 EventQueue.theLogger.log(Level.SEVERE,
184 "Failed to lodge cleanup for: " + getId(), anIE);
185 }
186 }
187
188 /* ********************************************************************
189 Event filtering starts here
190 **********************************************************************/
191
192 /**
193 Determines whether the passed QueueEvent is "visible" to this
194 Visibility. This is determind by checking transaction id's, expiry
195 and that the generator isn't "tainted"
196 */
197 public boolean canSee(QueueEvent anEvent, long aTime) {
198 synchronized(this) {
199 if (isTainted)
200 return false;
201
202 if (aTime > theLeaseTime) {
203 taint();
204 return false;
205 }
206
207 // If it's a visibility event, it's the result of something
208 // that was previously written and committed (and generated events
209 // previously) which has now been marked taken and now is not
210 // taken
211 if ((anEvent.getType() == QueueEvent.ENTRY_VISIBLE) ||
212 // Only post this one if user has said they're interested
213 // in visible operations
214 ((anEvent.getType() == QueueEvent.ENTRY_NOT_CONFLICTED) &&
215 (!doVisible)))
216 return true;
217
218 // If we're associated with a transaction.....
219 if (theTxnId != null) {
220
221 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) &&
222 (theTxnId.equals(anEvent.getTxn().getId()))) {
223 taint();
224 return false;
225 }
226
227 // We see all Written's including those generated by us
228 if (anEvent.getType() == QueueEvent.ENTRY_WRITTEN)
229 return true;
230
231 // If it's not a written we can only see it if we originated it
232 return (anEvent.getTxn().getId().equals(theTxnId));
233 } else {
234 // Only see ENTRY_WRITTENS
235 return (anEvent.getType() == QueueEvent.ENTRY_WRITTEN);
236 }
237 }
238 }
239
240 public boolean matches(MangledEntry anEntry) {
241 for (int i = 0; i < theTemplates.length; i++) {
242 MangledEntry myTemplate = theTemplates[i];
243
244 if (Types.isSubtype(myTemplate.getType(), anEntry.getType())) {
245 if (myTemplate.match(anEntry))
246 return true;
247 }
248 }
249
250 return false;
251 }
252
253 /* ********************************************************************
254 Lease management starts here
255 **********************************************************************/
256
257 public boolean renew(long aTime) {
258 synchronized(this) {
259 if (isTainted)
260 return false;
261
262 if (System.currentTimeMillis() > theLeaseTime)
263 return false;
264
265 theLeaseTime = aTime;
266 return true;
267 }
268 }
269
270 /* ********************************************************************
271 Recovery starts here
272 **********************************************************************/
273
274 public void recover(long aSeqNum) {
275 synchronized(this) {
276 if (aSeqNum > theSeqNum) {
277 theSeqNum = aSeqNum;
278 }
279 }
280 }
281
282 /**
283 Jumps the sequence number by the RESTART_JUMP
284 */
285 public long jumpSequenceNumber() {
286 synchronized(this) {
287 theSeqNum += (GeneratorConfig.getRestartJump() +
288 GeneratorConfig.getSaveInterval());
289
290 return theSeqNum;
291 }
292 }
293
294 /**
295 Jumps the sequence number if it's not already at this minimum.
296 */
297 public long jumpSequenceNumber(long aMin) {
298 synchronized(this) {
299 if (theSeqNum < aMin) {
300 theSeqNum = aMin + GeneratorConfig.getSaveInterval();
301 }
302
303 return theSeqNum;
304 }
305 }
306
307 /* ********************************************************************
308 Event Generation starts here
309 **********************************************************************/
310
311 /**
312 Dispatches a remote event to a client.
313 */
314 public void ping(QueueEvent anEvent, JavaSpace aSource) {
315 SeqNumInterval mySnapshot = null;
316
317 synchronized(this) {
318 if (isTainted)
319 return;
320
321 RemoteEventListener myTarget = null;
322
323 try {
324 RemoteEvent myEvent =
325 newEvent(aSource, anEvent.getContext().getEntry(),
326 (anEvent.getType() != QueueEvent.ENTRY_NOT_CONFLICTED));
327
328 myTarget = getDest();
329
330 myTarget.notify(myEvent);
331
332 mySnapshot = shouldLog();
333
334 } catch (UnknownEventException aUEE) {
335 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
336 "Couldn't send event [Trash]" +
337 myTarget, aUEE);
338 taint();
339 } catch (NoSuchObjectException anNSOE) {
340 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
341 "Couldn't send event [Trash]" +
342 myTarget, anNSOE);
343 taint();
344 } catch (RemoteException anRE) {
345 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
346 "Couldn't send event " +
347 myTarget, anRE);
348 }
349 }
350
351 try {
352 /*
353 See recover() for details on why we can log this outside of the
354 lock
355 */
356 if (mySnapshot != null)
357 TxnManager.get().log(mySnapshot);
358 } catch (TransactionException aTE) {
359 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
360 "Couldn't update EventGenerator", aTE);
361 }
362 }
363
364 private RemoteEvent newEvent(JavaSpace aSource, MangledEntry anEntry,
365 boolean isVisible) {
366 ++thePingCount;
367
368 return new AvailabilityEventImpl(aSource, theSourceId, theSeqNum++,
369 theHandback, anEntry, isVisible);
370 }
371
372 private RemoteEventListener getDest()
373 throws RemoteException {
374
375 // This flag is only set in the constructor or by us.
376 if (!isPrepared) {
377 try {
378 Object myListener = theMarshalledListener.get();
379
380 theListener = (RemoteEventListener)
381 GeneratorConfig.getRecoveryPreparer().prepareProxy(myListener);
382 isPrepared = true;
383 } catch (IOException anIOE) {
384 throw new RemoteException("Failed to unmarshall listener",
385 anIOE);
386 } catch (ClassNotFoundException aCNFE) {
387 throw new RemoteException("Failed to load class for listener",
388 aCNFE);
389 }
390 }
391
392 return theListener;
393 }
394
395 /* ********************************************************************
396 State save/recovery starts here
397 **********************************************************************/
398
399 public EventGeneratorState getMemento() {
400 synchronized(this) {
401 return new VisibilityImplState(theOID, theTemplates,
402 theHandback,
403 theMarshalledListener,
404 theSourceId,
405 theSeqNum, theLeaseTime,
406 theTxnId, doVisible);
407 }
408 }
409
410 /**
411 * Call this with synchronized asserted on the Generator
412 */
413 private SeqNumInterval shouldLog() {
414 // Never log transactional notify registrations
415 //
416 if (theTxnId != null) {
417 thePingCount = 0;
418 return null;
419 }
420
421 if (thePingCount == GeneratorConfig.getSaveInterval()) {
422 thePingCount = 0;
423 return new SeqNumInterval(theOID, theSeqNum);
424 } else
425 return null;
426 }
427 }