Mercurial > hg > blitz_stable
comparison src/org/dancres/blitz/FifoSearchVisitorImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz; | |
2 | |
3 import java.io.IOException; | |
4 | |
5 import java.util.LinkedList; | |
6 | |
7 import java.util.logging.*; | |
8 | |
9 import net.jini.core.transaction.TransactionException; | |
10 import net.jini.space.JavaSpace; | |
11 | |
12 import org.dancres.blitz.mangler.MangledEntry; | |
13 | |
14 import org.dancres.blitz.entry.*; | |
15 | |
16 import org.dancres.blitz.txn.TxnState; | |
17 | |
18 import org.dancres.blitz.txnlock.*; | |
19 | |
20 import org.dancres.blitz.oid.OID; | |
21 import org.dancres.blitz.notify.*; | |
22 | |
23 | |
24 /** | |
25 <p>All search results obtained by the lower layers are offered to a | |
26 SearchVisitor instance which can then determine whether the offered Entry | |
27 is suitable. This includes "deep matching" which requires that we fully | |
28 compare the fields of template to entry. The lower-layers do not perform | |
29 this task - they return the entry's that are a probable match. In | |
30 addition, instances of search visitor check transaction locks etc. which | |
31 the lower layers know nothing about.</p> | |
32 | |
33 <p>FifoSearchVisitorImpl enforces some ordering requirements that aren't | |
34 present in SearchVisitorImpl relating to fairness which is sometimes more | |
35 important than speed.</p> | |
36 */ | |
37 class FifoSearchVisitorImpl implements SingleMatchTask, | |
38 SearchVisitor { | |
39 | |
40 private static final Logger theLogger = | |
41 Logging.newLogger("org.dancres.blitz.entry.FifoSearchVisitorImpl"); | |
42 | |
43 private MangledEntry theTemplate; | |
44 private TxnState theTxnState; | |
45 private boolean isTaking; | |
46 | |
47 private EventGeneratorImpl theSearchTask; | |
48 private SearchVisitor theAdapter = new SearchVisitorAdapter(); | |
49 private BaulkedParty theParty; | |
50 | |
51 private int theLockOp; | |
52 | |
53 private boolean needsWakeup = false; | |
54 | |
55 private CompletionEvent theCompletion; | |
56 | |
57 private long theStartTime = System.currentTimeMillis(); | |
58 | |
59 private LinkedList theNewWrites = new LinkedList(); | |
60 | |
61 /** | |
62 @param isTake indicates the kind of txn lock we would need to assert | |
63 */ | |
64 FifoSearchVisitorImpl(MangledEntry aTemplate, boolean isTake, | |
65 TxnState aState, VisitorBaulkedPartyFactory aFactory) | |
66 throws IOException { | |
67 | |
68 theTemplate = aTemplate; | |
69 isTaking = isTake; | |
70 theTxnState = aState; | |
71 theLockOp = (isTaking == true) ? TxnLock.DELETE : TxnLock.READ; | |
72 theSearchTask = new EventGeneratorImpl(aTemplate); | |
73 theParty = aFactory.newParty(this); | |
74 SearchTasks.get().add(this); | |
75 EventQueue.get().insert(theSearchTask); | |
76 } | |
77 | |
78 public EventGenerator getSearchTask() { | |
79 return theSearchTask; | |
80 } | |
81 | |
82 public SearchVisitor getVisitor() { | |
83 return theAdapter; | |
84 } | |
85 | |
86 /*********************************************************************** | |
87 * SearchVisitor | |
88 ***********************************************************************/ | |
89 | |
90 public int offer(SearchOffer anOffer) { | |
91 OpInfo myInfo = anOffer.getInfo(); | |
92 | |
93 MangledEntry myEntry = anOffer.getEntry(); | |
94 | |
95 LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType()); | |
96 TxnLock myLock = myMgr.getLock(myInfo.getOID()); | |
97 | |
98 synchronized (this) { | |
99 | |
100 int myResult; | |
101 | |
102 // Do we need to try and secure this? | |
103 if (haveCompleted()) | |
104 return STOP; | |
105 | |
106 VisitorBaulkedPartyFactory.Handback myHandback = | |
107 new VisitorBaulkedPartyFactory.Handback(myInfo.getType(), | |
108 myInfo.getOID(), myEntry); | |
109 | |
110 synchronized (myLock) { | |
111 myResult = myLock.acquire(theTxnState, theLockOp, | |
112 theParty, myHandback, false); | |
113 } | |
114 | |
115 if (myResult == TxnLock.SUCCESS) { | |
116 | |
117 try { | |
118 theTxnState.add(new EntryTxnOp(theLockOp, myInfo, | |
119 myLock)); | |
120 } catch (TransactionException aTE) { | |
121 myLock.release(theTxnState, theLockOp); | |
122 return sendEvent(new CompletionEvent(aTE)); | |
123 } | |
124 | |
125 if (theLogger.isLoggable(Level.FINE)) | |
126 theLogger.log(Level.FINE, "Succeeded"); | |
127 | |
128 return sendEvent(new CompletionEvent(myEntry)); | |
129 } | |
130 } | |
131 | |
132 return TRY_AGAIN; | |
133 } | |
134 | |
135 public int sendEvent(CompletionEvent anEvent) { | |
136 synchronized(this) { | |
137 if (haveCompleted()) | |
138 return STOP; | |
139 | |
140 theCompletion = anEvent; | |
141 | |
142 theSearchTask.taint(); | |
143 SearchTasks.get().remove(this, wasNotSatisfied()); | |
144 | |
145 if (needsWakeup) | |
146 notify(); | |
147 | |
148 return STOP; | |
149 } | |
150 } | |
151 | |
152 public MangledEntry getEntry(long aTimeout) | |
153 throws TransactionException, | |
154 InterruptedException { | |
155 | |
156 synchronized(this) { | |
157 needsWakeup = true; | |
158 } | |
159 | |
160 while (true) { | |
161 synchronized(this) { | |
162 /* | |
163 If we've completed, throw exception or return Entry | |
164 accordingly, cleaning up state appropriately | |
165 */ | |
166 if (haveCompleted()) { | |
167 | |
168 // We're returning - ensure we don't allow any more | |
169 // operations to avoid doing a take we'll never return. | |
170 // | |
171 needsWakeup = false; | |
172 | |
173 if (theCompletion.getException() != null) | |
174 throw theCompletion.getException(); | |
175 | |
176 return theCompletion.getEntry(); | |
177 } | |
178 | |
179 // We haven't completed, yet, can we process queue elements? | |
180 if (theNewWrites.size() == 0) { | |
181 | |
182 long myRemaining = aTimeout - (System.currentTimeMillis() - | |
183 theStartTime); | |
184 | |
185 // Is there more time to wait? | |
186 if (myRemaining > 0) | |
187 wait(myRemaining); | |
188 else { | |
189 // No, force exit | |
190 needsWakeup = false; | |
191 sendEvent(CompletionEvent.COMPLETED); | |
192 } | |
193 } | |
194 } | |
195 | |
196 // We must flush the queue outside of lock | |
197 try { | |
198 flushQueue(); | |
199 } catch (IOException anIOE) { | |
200 TransactionException myTE = | |
201 new TransactionException("I/O Error whilst processing queue"); | |
202 myTE.initCause(anIOE); | |
203 sendEvent(new CompletionEvent(myTE)); | |
204 } | |
205 } | |
206 } | |
207 | |
208 private void flushQueue() throws IOException { | |
209 while (true) { | |
210 // Something has caused us to stop, give up now | |
211 // | |
212 if (haveCompleted()) | |
213 break; | |
214 | |
215 SpaceEntryUID myUID = null; | |
216 | |
217 synchronized(this) { | |
218 if (theNewWrites.size() != 0) { | |
219 myUID = (SpaceEntryUID) theNewWrites.removeFirst(); | |
220 } | |
221 } | |
222 | |
223 // Nothing else in queue? | |
224 // | |
225 if (myUID == null) | |
226 break; | |
227 | |
228 EntryRepository myRepos = | |
229 EntryRepositoryFactory.get().find(myUID.getType()); | |
230 | |
231 myRepos.find(this, myUID.getOID(), null); | |
232 } | |
233 } | |
234 | |
235 public synchronized boolean wouldBlock() { | |
236 return (theCompletion == null); | |
237 } | |
238 | |
239 private synchronized boolean haveCompleted() { | |
240 return (theCompletion != null); | |
241 } | |
242 | |
243 private boolean wasNotSatisfied() { | |
244 return (theCompletion.getEntry() == null); | |
245 } | |
246 | |
247 public boolean isDeleter() { | |
248 return isTaking; | |
249 } | |
250 | |
251 private void resolved() { | |
252 sendEvent(new CompletionEvent(new TransactionException( | |
253 "Transaction completed with operations still outstanding: " + | |
254 (isTaking ? "take" : "read")))); | |
255 } | |
256 | |
257 private class SearchVisitorAdapter implements SearchVisitor { | |
258 | |
259 public boolean isDeleter() { | |
260 return FifoSearchVisitorImpl.this.isDeleter(); | |
261 } | |
262 | |
263 public int offer(SearchOffer anOffer) { | |
264 if (theLogger.isLoggable(Level.FINE)) | |
265 theLogger.log(Level.FINE, "Offer"); | |
266 | |
267 synchronized (FifoSearchVisitorImpl.this) { | |
268 if (haveCompleted()) { | |
269 if (theLogger.isLoggable(Level.FINE)) | |
270 theLogger.log(Level.FINE, theTxnState.getId() + | |
271 " Have completed"); | |
272 return STOP; | |
273 } | |
274 } | |
275 | |
276 OpInfo myInfo = anOffer.getInfo(); | |
277 | |
278 if (!Types.isSubtype(theTemplate.getType(), myInfo.getType())) { | |
279 if (theLogger.isLoggable(Level.FINE)) | |
280 theLogger.log(Level.FINE, "Not subtype"); | |
281 | |
282 return TRY_AGAIN; | |
283 } | |
284 | |
285 MangledEntry myEntry = anOffer.getEntry(); | |
286 | |
287 if (theTemplate.match(myEntry)) { | |
288 return FifoSearchVisitorImpl.this.offer(anOffer); | |
289 } else | |
290 return TRY_AGAIN; | |
291 } | |
292 } | |
293 | |
294 private class EventGeneratorImpl implements EventGenerator { | |
295 private boolean isTainted = false; | |
296 private MangledEntry theTemplate; | |
297 private OID theOID; | |
298 | |
299 EventGeneratorImpl(MangledEntry aTemplate) { | |
300 theTemplate = aTemplate; | |
301 } | |
302 | |
303 public void assign(OID anOID) { | |
304 theOID = anOID; | |
305 } | |
306 | |
307 public long getStartSeqNum() { | |
308 return 0; | |
309 } | |
310 | |
311 public OID getId() { | |
312 return theOID; | |
313 } | |
314 | |
315 public boolean isPersistent() { | |
316 return false; | |
317 } | |
318 | |
319 public long getSourceId() { | |
320 return 0; | |
321 } | |
322 | |
323 public void taint() { | |
324 synchronized (this) { | |
325 // Tainting can only be done once | |
326 // | |
327 if (isTainted) | |
328 return; | |
329 | |
330 isTainted = true; | |
331 } | |
332 | |
333 try { | |
334 EventQueue.get().kill(getId()); | |
335 } catch (IOException anIOE) { | |
336 theLogger.log(Level.SEVERE, | |
337 "Encountered IOException during kill", anIOE); | |
338 } | |
339 | |
340 /* | |
341 try { | |
342 Tasks.queue(new CleanTask(getId())); | |
343 } catch (InterruptedException anIE) { | |
344 theLogger.log(Level.WARNING, | |
345 "Failed to lodge cleanup for: " + getId(), anIE); | |
346 } | |
347 */ | |
348 } | |
349 | |
350 private boolean isTainted() { | |
351 synchronized (this) { | |
352 return (isTainted); | |
353 } | |
354 } | |
355 | |
356 public boolean canSee(QueueEvent anEvent, long aTime) { | |
357 if (isTainted()) | |
358 return false; | |
359 | |
360 // Check if it's txn_ended and my txn and call resolved if it is | |
361 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && | |
362 (theTxnState.getId().equals(anEvent.getTxn().getId()))) { | |
363 resolved(); | |
364 return false; | |
365 } | |
366 | |
367 // We want to see new writes from a transaction | |
368 // | |
369 return (anEvent.getType() == QueueEvent.ENTRY_WRITE); | |
370 } | |
371 | |
372 public boolean matches(MangledEntry anEntry) { | |
373 if (isTainted()) | |
374 return false; | |
375 | |
376 return Types.isSubtype(theTemplate.getType(), anEntry.getType()) && | |
377 theTemplate.match(anEntry); | |
378 } | |
379 | |
380 public boolean renew(long aTime) { | |
381 // Nothing to do as we expire by being tainted by the enclosing | |
382 // class only | |
383 // | |
384 return true; | |
385 } | |
386 | |
387 public void recover(long aSeqNum) { | |
388 // Nothing to do | |
389 } | |
390 | |
391 public long jumpSequenceNumber() { | |
392 return 0; | |
393 } | |
394 | |
395 public long jumpSequenceNumber(long aMin) { | |
396 return 0; | |
397 } | |
398 | |
399 public void ping(QueueEvent anEvent, JavaSpace aSource) { | |
400 /* | |
401 Queue the write for later consideration unless we're done. | |
402 Later consideration will only be after we've invoked getEntry | |
403 which will only occur after we've performed searching of storage | |
404 */ | |
405 synchronized (FifoSearchVisitorImpl.this) { | |
406 if (haveCompleted()) | |
407 return; | |
408 | |
409 QueueEvent.Context myContext = anEvent.getContext(); | |
410 MangledEntry myEntry = myContext.getEntry(); | |
411 OID myOID = myContext.getOID(); | |
412 | |
413 theNewWrites.add(new SpaceEntryUID(myEntry.getType(), myOID)); | |
414 | |
415 if (needsWakeup) | |
416 FifoSearchVisitorImpl.this.notify(); | |
417 } | |
418 } | |
419 | |
420 public EventGeneratorState getMemento() { | |
421 throw new RuntimeException( | |
422 "Shouldn't be happening - we're transient"); | |
423 } | |
424 } | |
425 } |