comparison src/org/dancres/blitz/notify/EventGeneratorImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.notify;
2
3 import java.io.IOException;
4
5 import java.rmi.MarshalledObject;
6 import java.rmi.RemoteException;
7 import java.rmi.NoSuchObjectException;
8
9 import java.util.logging.Level;
10
11 import net.jini.core.event.RemoteEventListener;
12 import net.jini.core.event.RemoteEvent;
13 import net.jini.core.event.UnknownEventException;
14
15 import net.jini.core.transaction.TransactionException;
16
17 import net.jini.space.JavaSpace;
18
19 import org.dancres.blitz.txn.TxnId;
20 import org.dancres.blitz.txn.TxnManager;
21
22 import org.dancres.blitz.mangler.MangledEntry;
23
24 import org.dancres.blitz.entry.Types;
25
26 import org.dancres.blitz.oid.OID;
27
28 import org.dancres.blitz.task.Tasks;
29
30 /**
31 <p> Represents a single registration via notify. If it's transactional,
32 it will never be saved as it's considered temporary. If it's transactional
33 it will be saved to disk. In addition, it's state will be saved to the
34 log periodically (on a number of allocations basis) to preserve data
35 in case of a crash. <p>
36
37 <p> This class uses notifyPreparer to do initial preparation of the
38 passed RemoteEventListener when constructed from scratch (i.e. it's a new
39 registration). It uses recoveredNotifyPreparer in those cases where it
40 has been de-serialized from storage and is about to be used for the first
41 time post a restart </p>
42
43 <p> Instances of this object can be "tainted" for various reasons including
44 certain kinds of exception from the remote notify implementation, lease
45 expiry or transaction commit. Tainting prevents any further operations
46 against this instance such as event dispatch, logging changes to disk and
47 lease renewal. Tainting also results in cleanup for the registration being
48 scheduled.</p>
49 */
50 public class EventGeneratorImpl implements EventGenerator {
51 private OID theOID;
52 private MangledEntry theTemplate;
53 private MarshalledObject theHandback;
54 private MarshalledObject theMarshalledListener;
55 private long theSourceId;
56 private long theSeqNum;
57 private long theLeaseTime;
58 private long theStartSeqNum;
59
60 private int thePingCount;
61
62 /**
63 Indicates if we've done proxy preparation on the RemoteEventListener
64 */
65 private boolean isPrepared;
66
67 /**
68 Indicates when we're no longer generating events either because we
69 couldn't get an appropriate response from the client or because
70 we're being deleted.
71 */
72 private boolean isTainted;
73
74 private RemoteEventListener theListener;
75
76 private TxnId theTxnId;
77
78 static EventGeneratorImpl restoreGenerator(EventGeneratorImplState aState) {
79 return new EventGeneratorImpl(aState);
80 }
81
82 private EventGeneratorImpl(EventGeneratorImplState aState) {
83 theOID = aState.getOID();
84 theTemplate = aState.getTemplate();
85 theHandback = aState.getHandback();
86 theMarshalledListener = aState.getListener();
87 theSourceId = aState.getSourceId();
88 theSeqNum = aState.getSeqNum();
89 theLeaseTime = aState.getLeaseTime();
90 }
91
92 EventGeneratorImpl(MangledEntry aTemplate,
93 MarshalledObject anObject,
94 RemoteEventListener aDest, long aSeqNum,
95 long aLeaseTime) throws RemoteException {
96 this(aTemplate, anObject, aDest, aSeqNum, aLeaseTime, null);
97 }
98
99 EventGeneratorImpl(MangledEntry aTemplate,
100 MarshalledObject anObject,
101 RemoteEventListener aDest, long aSeqNum,
102 long aLeaseTime, TxnId aTxnId) throws RemoteException {
103 theTemplate = aTemplate;
104 theHandback = anObject;
105 theSeqNum = aSeqNum;
106 theStartSeqNum = aSeqNum;
107 theTxnId = aTxnId;
108 theLeaseTime = aLeaseTime;
109 theListener = (RemoteEventListener)
110 GeneratorConfig.getPreparer().prepareProxy(aDest);
111
112 try {
113 theMarshalledListener = new MarshalledObject(aDest);
114 } catch (IOException anIOE) {
115 throw new RemoteException("Failed to marshall listener", anIOE);
116 }
117
118 isPrepared = true;
119 }
120
121 public void assign(OID anOID) {
122 theOID = anOID;
123
124 if (theTxnId == null)
125 theSourceId = theOID.getId();
126 else
127 theSourceId = theTxnId.getId();
128 }
129
130 /**
131 Return the first sequence number that this event generator ever
132 produced.
133 */
134 public long getStartSeqNum() {
135 return theStartSeqNum;
136 }
137
138 public OID getId() {
139 return theOID;
140 }
141
142 public boolean isPersistent() {
143 return (theTxnId == null);
144 }
145
146 public long getSourceId() {
147 return theSourceId;
148 }
149
150 /**
151 Once an EventGenerator becomes tainted it will generate no more events
152 and schedule cleanup.
153
154 @todo This should maybe straight call
155 EventGeneratorFactory.get().killTemplate(theId); outside the sync
156 block - no need to use another thread
157 */
158 public void taint() {
159 synchronized(this) {
160 // Tainting can only be done once
161 //
162 if (isTainted)
163 return;
164
165 isTainted = true;
166 }
167
168 try {
169 Tasks.queue(new CleanTask(getId()));
170 } catch (InterruptedException anIE) {
171 EventQueue.theLogger.log(Level.WARNING,
172 "Failed to lodge cleanup for: " + getId(), anIE);
173 }
174 }
175
176 /* ********************************************************************
177 Event filtering starts here
178 **********************************************************************/
179
180 /**
181 Determines whether the passed QueueEvent is "visible" to this
182 EventGenerator. This is determind by checking transaction id's, expiry
183 and that the generator isn't "tainted"
184 */
185 public boolean canSee(QueueEvent anEvent, long aTime) {
186 if (anEvent.getType() == QueueEvent.ENTRY_VISIBLE)
187 return false;
188
189 synchronized(this) {
190 if (isTainted)
191 return false;
192
193 if (aTime > theLeaseTime) {
194 taint();
195 return false;
196 }
197
198 // If we're associated with a transaction.....
199 if (theTxnId != null) {
200
201 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) &&
202 (theTxnId.equals(anEvent.getTxn().getId()))) {
203 taint();
204 return false;
205 }
206
207 // We see all Written's including those generated by us
208 if (anEvent.getType() == QueueEvent.ENTRY_WRITTEN)
209 return true;
210
211 // If it's not a written we can only see it if we originated it
212 return (anEvent.getTxn().getId().equals(theTxnId));
213 } else {
214 // Only see ENTRY_WRITTENS
215 return (anEvent.getType() == QueueEvent.ENTRY_WRITTEN);
216 }
217 }
218 }
219
220 public boolean matches(MangledEntry anEntry) {
221 return Types.isSubtype(theTemplate.getType(), anEntry.getType()) &&
222 theTemplate.match(anEntry);
223 }
224
225 /* ********************************************************************
226 Lease management starts here
227 **********************************************************************/
228
229 public boolean renew(long aTime) {
230 synchronized(this) {
231 if (isTainted)
232 return false;
233
234 if (System.currentTimeMillis() > theLeaseTime)
235 return false;
236
237 theLeaseTime = aTime;
238 return true;
239 }
240 }
241
242 /* ********************************************************************
243 Recovery starts here
244 **********************************************************************/
245
246 /**
247 * Sequence number intervals can come out of order such that a greater
248 * interval may be applied before a lesser one. With that being the case
249 * we test the size of the sequence number and assign if it's greater
250 * rather than just a straight assignment
251 */
252 public void recover(long aSeqNum) {
253 synchronized(this) {
254 if (aSeqNum > theSeqNum) {
255 theSeqNum = aSeqNum;
256 }
257 }
258 }
259
260 /**
261 Jumps the sequence number by the RESTART_JUMP
262 */
263 public long jumpSequenceNumber() {
264 synchronized(this) {
265 theSeqNum += (GeneratorConfig.getRestartJump() +
266 GeneratorConfig.getSaveInterval());
267
268 return theSeqNum;
269 }
270 }
271
272 /**
273 Jumps the sequence number if it's not already at this minimum.
274 */
275 public long jumpSequenceNumber(long aMin) {
276 synchronized(this) {
277 if (theSeqNum < aMin) {
278 theSeqNum = aMin + GeneratorConfig.getSaveInterval();
279 }
280
281 return theSeqNum;
282 }
283 }
284
285 /* ********************************************************************
286 Event Generation starts here
287 **********************************************************************/
288
289 /**
290 Dispatches a remote event to a client.
291 */
292 public void ping(QueueEvent anEvent, JavaSpace aSource) {
293 SeqNumInterval mySnapshot = null;
294
295 synchronized(this) {
296 if (isTainted)
297 return;
298
299 RemoteEventListener myTarget = null;
300
301 try {
302
303 RemoteEvent myEvent = newEvent(aSource);
304 myTarget = getDest();
305
306 myTarget.notify(myEvent);
307
308 mySnapshot = shouldLog();
309
310 } catch (UnknownEventException aUEE) {
311 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
312 "Couldn't send event [Trash]" +
313 myTarget, aUEE);
314 taint();
315 } catch (NoSuchObjectException anNSOE) {
316 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
317 "Couldn't send event [Trash]" +
318 myTarget, anNSOE);
319 taint();
320 } catch (RemoteException anRE) {
321 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
322 "Couldn't send event " +
323 myTarget, anRE);
324 }
325 }
326
327 try {
328 /*
329 See recover() for details on why we can log this outside of the
330 lock
331 */
332 if (mySnapshot != null)
333 TxnManager.get().log(mySnapshot);
334 } catch (TransactionException aTE) {
335 RemoteEventDispatcher.theLogger.log(Level.SEVERE,
336 "Couldn't update EventGenerator", aTE);
337 }
338 }
339
340 private RemoteEvent newEvent(Object aSource) {
341 ++thePingCount;
342
343 return new RemoteEvent(aSource, theSourceId, theSeqNum++,
344 theHandback);
345 }
346
347 private RemoteEventListener getDest()
348 throws RemoteException {
349
350 // This flag is only set in the constructor or by us.
351 if (!isPrepared) {
352 try {
353 Object myListener = theMarshalledListener.get();
354
355 theListener = (RemoteEventListener)
356 GeneratorConfig.getRecoveryPreparer().prepareProxy(myListener);
357 isPrepared = true;
358 } catch (IOException anIOE) {
359 throw new RemoteException("Failed to unmarshall listener",
360 anIOE);
361 } catch (ClassNotFoundException aCNFE) {
362 throw new RemoteException("Failed to load class for listener",
363 aCNFE);
364 }
365 }
366
367 return theListener;
368 }
369
370 /* ********************************************************************
371 State save/recovery starts here
372 **********************************************************************/
373
374 public EventGeneratorState getMemento() {
375 synchronized(this) {
376 return new EventGeneratorImplState(theOID, theTemplate,
377 theHandback,
378 theMarshalledListener,
379 theSourceId,
380 theSeqNum, theLeaseTime,
381 theTxnId);
382 }
383 }
384
385 /**
386 Call this with synchronized asserted on the Generator
387 */
388 private SeqNumInterval shouldLog() {
389 // Never log transactional notify registrations
390 //
391 if (theTxnId != null) {
392 thePingCount = 0;
393 return null;
394 }
395
396 if (thePingCount == GeneratorConfig.getSaveInterval()) {
397 thePingCount = 0;
398 return new SeqNumInterval(theOID, theSeqNum);
399 } else
400 return null;
401 }
402 }