Mercurial > hg > blitz_stable
comparison src/org/dancres/blitz/SearchVisitorImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz; | |
2 | |
3 import java.io.IOException; | |
4 import java.util.logging.Level; | |
5 import java.util.logging.Logger; | |
6 | |
7 import net.jini.core.transaction.TransactionException; | |
8 import net.jini.space.JavaSpace; | |
9 | |
10 import org.dancres.blitz.entry.*; | |
11 import org.dancres.blitz.mangler.MangledEntry; | |
12 import org.dancres.blitz.notify.EventGenerator; | |
13 import org.dancres.blitz.notify.EventGeneratorState; | |
14 import org.dancres.blitz.notify.EventQueue; | |
15 import org.dancres.blitz.notify.QueueEvent; | |
16 import org.dancres.blitz.oid.OID; | |
17 import org.dancres.blitz.txn.TxnState; | |
18 import org.dancres.blitz.txnlock.LockMgr; | |
19 import org.dancres.blitz.txnlock.TxnLock; | |
20 import org.dancres.blitz.txnlock.TxnLocks; | |
21 import org.dancres.blitz.txnlock.BaulkedParty; | |
22 | |
23 /** | |
24 All search results obtained by the lower layers are offered to a | |
25 SearchVisitor instance which can then determine whether the offered Entry | |
26 is suitable. This includes "deep matching" which requires that we fully | |
27 compare the fields of template to entry. The lower-layers do not perform | |
28 this task - they return the entries that are a probable match. In | |
29 addition, instances of search visitor check transaction locks etc. which | |
30 the lower layers know nothing about. | |
31 */ | |
32 class SearchVisitorImpl implements SingleMatchTask, | |
33 SearchVisitor { | |
34 | |
35 private static final Logger theLogger = | |
36 Logging.newLogger("org.dancres.blitz.entry.SearchVisitorImpl"); | |
37 | |
38 private MangledEntry theTemplate; | |
39 private TxnState theTxnState; | |
40 private boolean isTaking; | |
41 private int theLockOp; | |
42 | |
43 private EventGeneratorImpl theSearchTask; | |
44 private SearchVisitor theAdapter = new SearchVisitorAdapter(); | |
45 private BaulkedParty theParty; | |
46 | |
47 private CompletionEvent theCompletion; | |
48 | |
49 private boolean needsWakeup = false; | |
50 | |
/**
 * Builds a visitor for one take or read and registers it with both
 * <code>SearchTasks</code> and the <code>EventQueue</code>.
 *
 * @param aTemplate the template offered entries must match
 * @param isTake indicates the kind of txn lock we would need to assert
 *        (delete for a take, read otherwise)
 * @param aState the transaction the operation runs under
 * @param aFactory supplies the BaulkedParty passed to lock acquires
 * @throws IOException presumably raised by the EventQueue insert - confirm
 */
SearchVisitorImpl(MangledEntry aTemplate, boolean isTake,
                  TxnState aState, VisitorBaulkedPartyFactory aFactory)
    throws IOException {

    theTxnState = aState;
    theTemplate = aTemplate;
    isTaking = isTake;

    if (isTake) {
        theLockOp = TxnLock.DELETE;
    } else {
        theLockOp = TxnLock.READ;
    }

    theSearchTask = new EventGeneratorImpl(aTemplate);

    // All state the party might consult is assigned before "this" is
    // handed out.
    theParty = aFactory.newParty(this);

    SearchTasks.get().add(this);
    EventQueue.get().insert(theSearchTask);
}
67 | |
68 public EventGenerator getSearchTask() { | |
69 return theSearchTask; | |
70 } | |
71 | |
72 public SearchVisitor getVisitor() { | |
73 return theAdapter; | |
74 } | |
75 | |
76 /*********************************************************************** | |
77 * SearchVisitor | |
78 ***********************************************************************/ | |
79 | |
80 /** | |
81 * This is an unfiltered offer method which only checks to see if it can | |
82 * perform an acquire. Actual matching is done in the Adapter class | |
83 * or in the satellite EventGenerator implementation. i.e. By the time | |
84 * we get to this method we need to ensure that matching has been done. | |
85 * | |
86 * @todo Consider re-arranging this method as described in the comment | |
87 * below. | |
88 */ | |
89 public int offer(SearchOffer anOffer) { | |
90 OpInfo myInfo = anOffer.getInfo(); | |
91 | |
92 MangledEntry myEntry = anOffer.getEntry(); | |
93 | |
94 LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType()); | |
95 TxnLock myLock = myMgr.getLock(myInfo.getOID()); | |
96 | |
97 synchronized (this) { | |
98 | |
99 int myResult; | |
100 | |
101 // Do we need to try and secure this? | |
102 if (haveFinished()) | |
103 return STOP; | |
104 | |
105 VisitorBaulkedPartyFactory.Handback myHandback = | |
106 new VisitorBaulkedPartyFactory.Handback(myInfo.getType(), | |
107 myInfo.getOID(), myEntry); | |
108 | |
109 synchronized (myLock) { | |
110 myResult = myLock.acquire(theTxnState, theLockOp, | |
111 theParty, myHandback, false); | |
112 } | |
113 | |
114 if (myResult == TxnLock.SUCCESS) { | |
115 | |
116 if (theLogger.isLoggable(Level.FINE)) | |
117 theLogger.log(Level.FINE, theTxnState.getId() + | |
118 " Acq: " + | |
119 myInfo + ", " + myLock); | |
120 | |
121 try { | |
122 theTxnState.add(new EntryTxnOp(theLockOp, myInfo, | |
123 myLock)); | |
124 } catch (TransactionException aTE) { | |
125 | |
126 /* | |
127 If we catch a transactionexception here we're dead, | |
128 we will exit with invalid state. If we have the | |
129 lock above we know we have the Entry and no-one | |
130 can remove it from under us. | |
131 | |
132 Once we've locked the entry we're done | |
133 matching because either we will return it | |
134 successfully after tracking our action in the | |
135 transaction or we will blow up on the transaction | |
136 and release the entry lock and return an exception. | |
137 Either way we can stop all searching as soon as | |
138 we have the Entry locked. | |
139 | |
140 Thus we can set theEntry above once we see success | |
141 and then exit the sync block rather than holding | |
142 it whilst we update the transaction. Before | |
143 exiting the sync block we need to check for | |
144 conflict as we do below and set the didconflict | |
145 flag. | |
146 | |
147 Currently we stop searches and wakeup blocked | |
148 threads in getEntry via setStatus thus setting | |
149 theEntry is currently insufficient. However | |
150 it would be safe to modify haveCompleted to also | |
151 include a test for theEntry being set. Note we | |
152 must NEVER use haveCompleted in getEntry() | |
153 as a consequence! | |
154 */ | |
155 myLock.release(theTxnState, theLockOp); | |
156 return sendEvent(new CompletionEvent(aTE)); | |
157 } | |
158 | |
159 if (theLogger.isLoggable(Level.FINE)) | |
160 theLogger.log(Level.FINE, "Succeeded"); | |
161 | |
162 return sendEvent(new CompletionEvent(myEntry)); | |
163 } | |
164 } | |
165 | |
166 return TRY_AGAIN; | |
167 } | |
168 | |
169 public int sendEvent(CompletionEvent anEvent) { | |
170 synchronized (this) { | |
171 if (haveFinished()) | |
172 return STOP; | |
173 | |
174 theCompletion = anEvent; | |
175 | |
176 theSearchTask.taint(); | |
177 SearchTasks.get().remove(this, wasNotSatisfied()); | |
178 | |
179 if (needsWakeup) | |
180 notify(); | |
181 | |
182 return STOP; | |
183 } | |
184 } | |
185 | |
186 /** | |
187 * NEVER, EVER, EVER invoke <code>haveCompleted</code> in this method. | |
188 * <code>haveCompleted</code> is an internal function designed to allow | |
189 * methods to figure out when parallel searching and other operations can | |
190 * safely abort early. | |
191 * | |
192 * This method must _never_ abort early, it must wait until status is set | |
193 * via <code>setStatus</code> - it cannot peek at potentially incomplete | |
194 * status which <code>haveCompleted</code> does. | |
195 */ | |
196 public synchronized MangledEntry getEntry(long aTimeout) | |
197 throws TransactionException, | |
198 InterruptedException { | |
199 | |
200 // We only wait the once because we'll only ever wake from this | |
201 // if there's a result or we timeout | |
202 if (wouldBlock() && (aTimeout != 0)) { | |
203 needsWakeup = true; | |
204 wait(aTimeout); | |
205 needsWakeup = false; | |
206 } | |
207 | |
208 // We're returning - ensure we don't allow any more operations to | |
209 // avoid doing a take we'll never return.... | |
210 sendEvent(CompletionEvent.COMPLETED); | |
211 | |
212 if (theCompletion.getException() != null) | |
213 throw theCompletion.getException(); | |
214 | |
215 return theCompletion.getEntry(); | |
216 } | |
217 | |
218 public synchronized boolean wouldBlock() { | |
219 return (theCompletion == null); | |
220 } | |
221 | |
222 /** | |
223 * NEVER, EVER, EVER invoke <code>haveCompleted</code> unless you are | |
224 * attempting to determine that internal parallel functions | |
225 * such as parallel searching and other operations can | |
226 * safely abort early. | |
227 * | |
228 * When this method returns <code>true</code> it is stating that some | |
229 * thread has determined a final state for this visitor and is in the | |
230 * process of resolving it. | |
231 */ | |
232 private synchronized boolean haveFinished() { | |
233 return (theCompletion != null); | |
234 } | |
235 | |
236 private boolean wasNotSatisfied() { | |
237 return (theCompletion.getEntry() == null); | |
238 } | |
239 | |
240 public boolean isDeleter() { | |
241 return isTaking; | |
242 } | |
243 | |
244 private void resolved() { | |
245 sendEvent(new CompletionEvent(new TransactionException( | |
246 "Transaction completed with operations still outstanding: " + | |
247 (isTaking ? "take" : "read")))); | |
248 } | |
249 | |
250 private class EventGeneratorImpl implements EventGenerator { | |
251 private boolean isTainted = false; | |
252 private MangledEntry theTemplate; | |
253 private OID theOID; | |
254 | |
255 EventGeneratorImpl(MangledEntry aTemplate) { | |
256 theTemplate = aTemplate; | |
257 } | |
258 | |
259 public void assign(OID anOID) { | |
260 theOID = anOID; | |
261 } | |
262 | |
263 public long getStartSeqNum() { | |
264 return 0; | |
265 } | |
266 | |
267 public OID getId() { | |
268 return theOID; | |
269 } | |
270 | |
271 public boolean isPersistent() { | |
272 return false; | |
273 } | |
274 | |
275 public long getSourceId() { | |
276 return 0; | |
277 } | |
278 | |
279 public void taint() { | |
280 synchronized (this) { | |
281 // Tainting can only be done once | |
282 // | |
283 if (isTainted) | |
284 return; | |
285 | |
286 isTainted = true; | |
287 } | |
288 | |
289 try { | |
290 EventQueue.get().kill(getId()); | |
291 } catch (IOException anIOE) { | |
292 theLogger.log(Level.SEVERE, | |
293 "Encountered IOException during kill", anIOE); | |
294 } | |
295 | |
296 /* | |
297 try { | |
298 Tasks.queue(new CleanTask(getId())); | |
299 } catch (InterruptedException anIE) { | |
300 theLogger.log(Level.WARNING, | |
301 "Failed to lodge cleanup for: " + getId(), anIE); | |
302 } | |
303 */ | |
304 } | |
305 | |
306 private boolean isTainted() { | |
307 synchronized(this) { | |
308 return (isTainted); | |
309 } | |
310 } | |
311 | |
312 public boolean canSee(QueueEvent anEvent, long aTime) { | |
313 if (isTainted()) | |
314 return false; | |
315 | |
316 // Check if it's txn_ended and my txn and call resolved if it is | |
317 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && | |
318 (theTxnState.getId().equals(anEvent.getTxn().getId()))) { | |
319 resolved(); | |
320 return false; | |
321 } | |
322 | |
323 // We want to see new writes from a transaction | |
324 // | |
325 return (anEvent.getType() == QueueEvent.ENTRY_WRITE); | |
326 } | |
327 | |
328 public boolean matches(MangledEntry anEntry) { | |
329 if (isTainted()) | |
330 return false; | |
331 | |
332 return Types.isSubtype(theTemplate.getType(), anEntry.getType()) && | |
333 theTemplate.match(anEntry); | |
334 } | |
335 | |
336 public boolean renew(long aTime) { | |
337 // Nothing to do as we expire by being tainted by the enclosing | |
338 // class only | |
339 // | |
340 return true; | |
341 } | |
342 | |
343 public void recover(long aSeqNum) { | |
344 // Nothing to do | |
345 } | |
346 | |
347 public long jumpSequenceNumber() { | |
348 return 0; | |
349 } | |
350 | |
351 public long jumpSequenceNumber(long aMin) { | |
352 return 0; | |
353 } | |
354 | |
355 public void ping(QueueEvent anEvent, JavaSpace aSource) { | |
356 if (isTainted()) | |
357 return; | |
358 | |
359 LongtermOffer myOffer = null; | |
360 | |
361 QueueEvent.Context myContext = anEvent.getContext(); | |
362 MangledEntry myEntry = myContext.getEntry(); | |
363 OID myOID = myContext.getOID(); | |
364 | |
365 try { | |
366 EntryRepository myRepos = | |
367 EntryRepositoryFactory.get().find(myEntry.getType()); | |
368 | |
369 myOffer = myRepos.getOffer(myOID); | |
370 | |
371 if (myOffer == null) | |
372 return; | |
373 | |
374 myOffer.offer(SearchVisitorImpl.this); | |
375 | |
376 } catch (IOException anIOE) { | |
377 // Nothing can be done | |
378 theLogger.log(Level.SEVERE, | |
379 "Encountered IOException during write offer", anIOE); | |
380 } finally { | |
381 if (myOffer != null) { | |
382 try { | |
383 myOffer.release(); | |
384 } catch (IOException anIOE) { | |
385 theLogger.log(Level.SEVERE, | |
386 "Encountered IOException during write offer(release)", | |
387 anIOE); | |
388 } | |
389 } | |
390 } | |
391 } | |
392 | |
393 public EventGeneratorState getMemento() { | |
394 throw new RuntimeException( | |
395 "Shouldn't be happening - we're transient"); | |
396 } | |
397 } | |
398 | |
399 private class SearchVisitorAdapter implements SearchVisitor { | |
400 | |
401 public boolean isDeleter() { | |
402 return SearchVisitorImpl.this.isDeleter(); | |
403 } | |
404 | |
405 public int offer(SearchOffer anOffer) { | |
406 if (theLogger.isLoggable(Level.FINE)) | |
407 theLogger.log(Level.FINE, "Offer"); | |
408 | |
409 if (haveFinished()) { | |
410 if (theLogger.isLoggable(Level.FINE)) | |
411 theLogger.log(Level.FINE, theTxnState.getId() + | |
412 " Have completed"); | |
413 return STOP; | |
414 } | |
415 | |
416 OpInfo myInfo = anOffer.getInfo(); | |
417 | |
418 if (!Types.isSubtype(theTemplate.getType(), myInfo.getType())) { | |
419 if (theLogger.isLoggable(Level.FINE)) | |
420 theLogger.log(Level.FINE, "Not subtype"); | |
421 | |
422 return TRY_AGAIN; | |
423 } | |
424 | |
425 MangledEntry myEntry = anOffer.getEntry(); | |
426 | |
427 if (theTemplate.match(myEntry)) { | |
428 return SearchVisitorImpl.this.offer(anOffer); | |
429 } else | |
430 return TRY_AGAIN; | |
431 } | |
432 } | |
433 } |