comparison src/org/dancres/blitz/SearchVisitorImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz;
2
3 import java.io.IOException;
4 import java.util.logging.Level;
5 import java.util.logging.Logger;
6
7 import net.jini.core.transaction.TransactionException;
8 import net.jini.space.JavaSpace;
9
10 import org.dancres.blitz.entry.*;
11 import org.dancres.blitz.mangler.MangledEntry;
12 import org.dancres.blitz.notify.EventGenerator;
13 import org.dancres.blitz.notify.EventGeneratorState;
14 import org.dancres.blitz.notify.EventQueue;
15 import org.dancres.blitz.notify.QueueEvent;
16 import org.dancres.blitz.oid.OID;
17 import org.dancres.blitz.txn.TxnState;
18 import org.dancres.blitz.txnlock.LockMgr;
19 import org.dancres.blitz.txnlock.TxnLock;
20 import org.dancres.blitz.txnlock.TxnLocks;
21 import org.dancres.blitz.txnlock.BaulkedParty;
22
23 /**
24 All search results obtained by the lower layers are offered to a
25 SearchVisitor instance which can then determine whether the offered Entry
26 is suitable. This includes "deep matching" which requires that we fully
27 compare the fields of template to entry. The lower-layers do not perform
28 this task - they return the entry's that are a probable match. In
29 addition, instances of search visitor check transaction locks etc. which
30 the lower layers know nothing about.
31 */
32 class SearchVisitorImpl implements SingleMatchTask,
33 SearchVisitor {
34
35 private static final Logger theLogger =
36 Logging.newLogger("org.dancres.blitz.entry.SearchVisitorImpl");
37
38 private MangledEntry theTemplate;
39 private TxnState theTxnState;
40 private boolean isTaking;
41 private int theLockOp;
42
43 private EventGeneratorImpl theSearchTask;
44 private SearchVisitor theAdapter = new SearchVisitorAdapter();
45 private BaulkedParty theParty;
46
47 private CompletionEvent theCompletion;
48
49 private boolean needsWakeup = false;
50
51 /**
52 @param isTake indicates the kind of txn lock we would need to assert
53 */
54 SearchVisitorImpl(MangledEntry aTemplate, boolean isTake,
55 TxnState aState, VisitorBaulkedPartyFactory aFactory)
56 throws IOException {
57
58 theTemplate = aTemplate;
59 isTaking = isTake;
60 theTxnState = aState;
61 theLockOp = (isTaking == true) ? TxnLock.DELETE : TxnLock.READ;
62 theSearchTask = new EventGeneratorImpl(aTemplate);
63 theParty = aFactory.newParty(this);
64 SearchTasks.get().add(this);
65 EventQueue.get().insert(theSearchTask);
66 }
67
68 public EventGenerator getSearchTask() {
69 return theSearchTask;
70 }
71
72 public SearchVisitor getVisitor() {
73 return theAdapter;
74 }
75
76 /***********************************************************************
77 * SearchVisitor
78 ***********************************************************************/
79
80 /**
81 * This is an unfiltered offer method which only checks to see if it can
82 * perform an acquire. Actual matching is done in the Adapter class
83 * or in the satellite EventGenerator implementation. i.e. By the time
84 * we get to this method we need to ensure that matching has been done.
85 *
86 * @todo Consider re-arranging this method as described in the comment
87 * below.
88 */
89 public int offer(SearchOffer anOffer) {
90 OpInfo myInfo = anOffer.getInfo();
91
92 MangledEntry myEntry = anOffer.getEntry();
93
94 LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType());
95 TxnLock myLock = myMgr.getLock(myInfo.getOID());
96
97 synchronized (this) {
98
99 int myResult;
100
101 // Do we need to try and secure this?
102 if (haveFinished())
103 return STOP;
104
105 VisitorBaulkedPartyFactory.Handback myHandback =
106 new VisitorBaulkedPartyFactory.Handback(myInfo.getType(),
107 myInfo.getOID(), myEntry);
108
109 synchronized (myLock) {
110 myResult = myLock.acquire(theTxnState, theLockOp,
111 theParty, myHandback, false);
112 }
113
114 if (myResult == TxnLock.SUCCESS) {
115
116 if (theLogger.isLoggable(Level.FINE))
117 theLogger.log(Level.FINE, theTxnState.getId() +
118 " Acq: " +
119 myInfo + ", " + myLock);
120
121 try {
122 theTxnState.add(new EntryTxnOp(theLockOp, myInfo,
123 myLock));
124 } catch (TransactionException aTE) {
125
126 /*
127 If we catch a transactionexception here we're dead,
128 we will exit with invalid state. If we have the
129 lock above we know we have the Entry and no-one
130 can remove it from under us.
131
132 Once we've locked the entry we're done
133 matching because either we will return it
134 successfully after tracking our action in the
135 transaction or we will blow up on the transaction
136 and release the entry lock and return an exception.
137 Either way we can stop all searching as soon as
138 we have the Entry locked.
139
140 Thus we can set theEntry above once we see success
141 and then exit the sync block rather than holding
142 it whilst we update the transaction. Before
143 exiting the sync block we need to check for
144 conflict as we do below and set the didconflict
145 flag.
146
147 Currently we stop searches and wakeup blocked
148 threads in getEntry via setStatus thus setting
149 theEntry is currently insufficient. However
150 it would be safe to modify haveCompleted to also
151 include a test for theEntry being set. Note we
152 must NEVER use haveCompleted in getEntry()
153 as a consequence!
154 */
155 myLock.release(theTxnState, theLockOp);
156 return sendEvent(new CompletionEvent(aTE));
157 }
158
159 if (theLogger.isLoggable(Level.FINE))
160 theLogger.log(Level.FINE, "Succeeded");
161
162 return sendEvent(new CompletionEvent(myEntry));
163 }
164 }
165
166 return TRY_AGAIN;
167 }
168
169 public int sendEvent(CompletionEvent anEvent) {
170 synchronized (this) {
171 if (haveFinished())
172 return STOP;
173
174 theCompletion = anEvent;
175
176 theSearchTask.taint();
177 SearchTasks.get().remove(this, wasNotSatisfied());
178
179 if (needsWakeup)
180 notify();
181
182 return STOP;
183 }
184 }
185
186 /**
187 * NEVER, EVER, EVER invoke <code>haveCompleted</code> in this method.
188 * <code>haveCompleted</code> is an internal function designed to allow
189 * methods to figure out when parallel searching and other operations can
190 * safely abort early.
191 *
192 * This method must _never_ abort early, it must wait until status is set
193 * via <code>setStatus</code> - it cannot peek at potentially incomplete
194 * status which <code>haveCompleted</code> does.
195 */
196 public synchronized MangledEntry getEntry(long aTimeout)
197 throws TransactionException,
198 InterruptedException {
199
200 // We only wait the once because we'll only ever wake from this
201 // if there's a result or we timeout
202 if (wouldBlock() && (aTimeout != 0)) {
203 needsWakeup = true;
204 wait(aTimeout);
205 needsWakeup = false;
206 }
207
208 // We're returning - ensure we don't allow any more operations to
209 // avoid doing a take we'll never return....
210 sendEvent(CompletionEvent.COMPLETED);
211
212 if (theCompletion.getException() != null)
213 throw theCompletion.getException();
214
215 return theCompletion.getEntry();
216 }
217
218 public synchronized boolean wouldBlock() {
219 return (theCompletion == null);
220 }
221
222 /**
223 * NEVER, EVER, EVER invoke <code>haveCompleted</code> unless you are
224 * attempting to determine that internal parallel functions
225 * such as parallel searching and other operations can
226 * safely abort early.
227 *
228 * When this method returns <code>true</code> it is stating that some
229 * thread has determined a final state for this visitor and is in the
230 * process of resolving it.
231 */
232 private synchronized boolean haveFinished() {
233 return (theCompletion != null);
234 }
235
236 private boolean wasNotSatisfied() {
237 return (theCompletion.getEntry() == null);
238 }
239
240 public boolean isDeleter() {
241 return isTaking;
242 }
243
244 private void resolved() {
245 sendEvent(new CompletionEvent(new TransactionException(
246 "Transaction completed with operations still outstanding: " +
247 (isTaking ? "take" : "read"))));
248 }
249
250 private class EventGeneratorImpl implements EventGenerator {
251 private boolean isTainted = false;
252 private MangledEntry theTemplate;
253 private OID theOID;
254
255 EventGeneratorImpl(MangledEntry aTemplate) {
256 theTemplate = aTemplate;
257 }
258
259 public void assign(OID anOID) {
260 theOID = anOID;
261 }
262
263 public long getStartSeqNum() {
264 return 0;
265 }
266
267 public OID getId() {
268 return theOID;
269 }
270
271 public boolean isPersistent() {
272 return false;
273 }
274
275 public long getSourceId() {
276 return 0;
277 }
278
279 public void taint() {
280 synchronized (this) {
281 // Tainting can only be done once
282 //
283 if (isTainted)
284 return;
285
286 isTainted = true;
287 }
288
289 try {
290 EventQueue.get().kill(getId());
291 } catch (IOException anIOE) {
292 theLogger.log(Level.SEVERE,
293 "Encountered IOException during kill", anIOE);
294 }
295
296 /*
297 try {
298 Tasks.queue(new CleanTask(getId()));
299 } catch (InterruptedException anIE) {
300 theLogger.log(Level.WARNING,
301 "Failed to lodge cleanup for: " + getId(), anIE);
302 }
303 */
304 }
305
306 private boolean isTainted() {
307 synchronized(this) {
308 return (isTainted);
309 }
310 }
311
312 public boolean canSee(QueueEvent anEvent, long aTime) {
313 if (isTainted())
314 return false;
315
316 // Check if it's txn_ended and my txn and call resolved if it is
317 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) &&
318 (theTxnState.getId().equals(anEvent.getTxn().getId()))) {
319 resolved();
320 return false;
321 }
322
323 // We want to see new writes from a transaction
324 //
325 return (anEvent.getType() == QueueEvent.ENTRY_WRITE);
326 }
327
328 public boolean matches(MangledEntry anEntry) {
329 if (isTainted())
330 return false;
331
332 return Types.isSubtype(theTemplate.getType(), anEntry.getType()) &&
333 theTemplate.match(anEntry);
334 }
335
336 public boolean renew(long aTime) {
337 // Nothing to do as we expire by being tainted by the enclosing
338 // class only
339 //
340 return true;
341 }
342
343 public void recover(long aSeqNum) {
344 // Nothing to do
345 }
346
347 public long jumpSequenceNumber() {
348 return 0;
349 }
350
351 public long jumpSequenceNumber(long aMin) {
352 return 0;
353 }
354
355 public void ping(QueueEvent anEvent, JavaSpace aSource) {
356 if (isTainted())
357 return;
358
359 LongtermOffer myOffer = null;
360
361 QueueEvent.Context myContext = anEvent.getContext();
362 MangledEntry myEntry = myContext.getEntry();
363 OID myOID = myContext.getOID();
364
365 try {
366 EntryRepository myRepos =
367 EntryRepositoryFactory.get().find(myEntry.getType());
368
369 myOffer = myRepos.getOffer(myOID);
370
371 if (myOffer == null)
372 return;
373
374 myOffer.offer(SearchVisitorImpl.this);
375
376 } catch (IOException anIOE) {
377 // Nothing can be done
378 theLogger.log(Level.SEVERE,
379 "Encountered IOException during write offer", anIOE);
380 } finally {
381 if (myOffer != null) {
382 try {
383 myOffer.release();
384 } catch (IOException anIOE) {
385 theLogger.log(Level.SEVERE,
386 "Encountered IOException during write offer(release)",
387 anIOE);
388 }
389 }
390 }
391 }
392
393 public EventGeneratorState getMemento() {
394 throw new RuntimeException(
395 "Shouldn't be happening - we're transient");
396 }
397 }
398
399 private class SearchVisitorAdapter implements SearchVisitor {
400
401 public boolean isDeleter() {
402 return SearchVisitorImpl.this.isDeleter();
403 }
404
405 public int offer(SearchOffer anOffer) {
406 if (theLogger.isLoggable(Level.FINE))
407 theLogger.log(Level.FINE, "Offer");
408
409 if (haveFinished()) {
410 if (theLogger.isLoggable(Level.FINE))
411 theLogger.log(Level.FINE, theTxnState.getId() +
412 " Have completed");
413 return STOP;
414 }
415
416 OpInfo myInfo = anOffer.getInfo();
417
418 if (!Types.isSubtype(theTemplate.getType(), myInfo.getType())) {
419 if (theLogger.isLoggable(Level.FINE))
420 theLogger.log(Level.FINE, "Not subtype");
421
422 return TRY_AGAIN;
423 }
424
425 MangledEntry myEntry = anOffer.getEntry();
426
427 if (theTemplate.match(myEntry)) {
428 return SearchVisitorImpl.this.offer(anOffer);
429 } else
430 return TRY_AGAIN;
431 }
432 }
433 }