Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/BulkTakeVisitor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz; | |
2 | |
3 import java.io.IOException; | |
4 | |
5 import java.util.ArrayList; | |
6 import java.util.List; | |
7 | |
8 import java.util.logging.*; | |
9 | |
10 import net.jini.core.transaction.TransactionException; | |
11 import net.jini.space.JavaSpace; | |
12 | |
13 import org.dancres.blitz.mangler.MangledEntry; | |
14 | |
15 import org.dancres.blitz.entry.*; | |
16 | |
17 import org.dancres.blitz.txn.TxnState; | |
18 | |
19 import org.dancres.blitz.txnlock.*; | |
20 | |
21 import org.dancres.blitz.oid.OID; | |
22 | |
23 import org.dancres.blitz.notify.EventGenerator; | |
24 import org.dancres.blitz.notify.EventQueue; | |
25 import org.dancres.blitz.notify.QueueEvent; | |
26 import org.dancres.blitz.notify.EventGeneratorState; | |
27 | |
28 class BulkTakeVisitor implements BulkMatchTask, SearchVisitor { | |
29 private static final Logger theLogger = | |
30 Logging.newLogger("org.dancres.blitz.BulkTakeVisitor"); | |
31 | |
32 private MangledEntry[] theTemplates; | |
33 private TxnState theTxnState; | |
34 private long theLimit; | |
35 private EventGeneratorImpl theSearchTask; | |
36 private SearchVisitorAdapter theAdapter = new SearchVisitorAdapter(); | |
37 private BaulkedParty theParty; | |
38 | |
39 private ArrayList theEntries = new ArrayList(); | |
40 | |
41 private int theStatus = SearchVisitor.TRY_AGAIN; | |
42 | |
43 private TransactionException theException; | |
44 | |
45 private boolean needsWakeup = false; | |
46 | |
/**
   Creates a visitor for a bulk take and registers it for matching.

   The constructor records its arguments, builds the internal event
   generator over the supplied templates, asks the factory for a
   baulked party bound to this visitor, then adds this visitor to
   SearchTasks and inserts the event generator into the EventQueue.

   @param aTemplates the templates entries are matched against
   @param aTxnState the transaction state the takes are performed under
   @param aLimit the value compared against the number of collected
   entries to decide when enough have been taken
   @param aFactory source of the baulked party handed to lock acquisition
   @throws IOException declared by the constructor; the registration
   calls are the only operations here that can raise it
 */
BulkTakeVisitor(MangledEntry[] aTemplates, TxnState aTxnState,
                long aLimit, VisitorBaulkedPartyFactory aFactory)
    throws IOException {

    // Plain state first...
    this.theTemplates = aTemplates;
    this.theTxnState = aTxnState;
    this.theLimit = aLimit;

    // ...then the collaborators built from that state...
    this.theSearchTask = new EventGeneratorImpl(aTemplates);
    this.theParty = aFactory.newParty(this);

    // ...and finally the registrations, in the same order as before.
    SearchTasks.get().add(this);
    EventQueue.get().insert(this.theSearchTask);
}
59 | |
60 private void resolved() { | |
61 setStatus(STOP, new TransactionException()); | |
62 } | |
63 | |
64 public SearchVisitor getVisitor() { | |
65 return theAdapter; | |
66 } | |
67 | |
68 public int offer(SearchOffer anOffer) { | |
69 theLogger.log(Level.FINE, "offer"); | |
70 | |
71 synchronized(this) { | |
72 if (haveCompleted()) { | |
73 theLogger.log(Level.FINE, "Have completed"); | |
74 return STOP; | |
75 } | |
76 } | |
77 | |
78 OpInfo myInfo = anOffer.getInfo(); | |
79 MangledEntry myEntry = anOffer.getEntry(); | |
80 | |
81 LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType()); | |
82 TxnLock myLock = myMgr.getLock(myInfo.getOID()); | |
83 | |
84 synchronized (this) { | |
85 int myResult; | |
86 | |
87 // Picked up enough matches in the meantime? Quit... | |
88 if (haveCompleted()) | |
89 return STOP; | |
90 | |
91 VisitorBaulkedPartyFactory.Handback myHandback = | |
92 new VisitorBaulkedPartyFactory.Handback(myInfo.getType(), | |
93 myInfo.getOID(), myEntry); | |
94 | |
95 synchronized (myLock) { | |
96 myResult = myLock.acquire(theTxnState, | |
97 TxnLock.DELETE, | |
98 theParty, myHandback, false); | |
99 } | |
100 | |
101 if (myResult == TxnLock.SUCCESS) { | |
102 // Got the lock | |
103 try { | |
104 theTxnState.add(new EntryTxnOp(TxnLock.DELETE, | |
105 myInfo, | |
106 myLock)); | |
107 } catch (TransactionException aTE) { | |
108 synchronized (myLock) { | |
109 myLock.release(theTxnState, TxnLock.DELETE); | |
110 } | |
111 return setStatus(STOP, aTE); | |
112 } | |
113 | |
114 // Add the Entry to our list of matches | |
115 theEntries.add(myEntry); | |
116 | |
117 /* | |
118 Picked up enough matches? Quit else carry on | |
119 */ | |
120 if (haveCompleted()) | |
121 return setStatus(STOP, null); | |
122 else | |
123 return TRY_AGAIN; | |
124 } else { | |
125 /* | |
126 One of our templates matched but we didn't | |
127 get a lock. No point in trying other templates | |
128 because even if they match we might get another | |
129 conflict. We'll leave it to settle instead and | |
130 look for other matches. | |
131 */ | |
132 return TRY_AGAIN; | |
133 } | |
134 } | |
135 } | |
136 | |
137 public boolean isDeleter() { | |
138 return true; | |
139 } | |
140 | |
141 private boolean haveCompleted() { | |
142 /* | |
143 If we're blocking, then we'll unblock as soon as we get one | |
144 match, otherwise, we want to be greedy and grab as many entries | |
145 as our limit allows | |
146 */ | |
147 if (needsWakeup) { | |
148 return (theEntries.size() != 0) || (theStatus == STOP); | |
149 } else { | |
150 return (theEntries.size() == theLimit) || (theStatus == STOP); | |
151 } | |
152 } | |
153 | |
154 public synchronized List getEntries(long aTimeout) | |
155 throws TransactionException, InterruptedException { | |
156 | |
157 // We only wait the once because we'll only ever wake from this | |
158 // if there's a result or we timeout | |
159 if ((theEntries.size() == 0) && (aTimeout != 0)) { | |
160 needsWakeup = true; | |
161 wait(aTimeout); | |
162 needsWakeup = false; | |
163 } | |
164 | |
165 // We're returning - ensure we don't allow any more operations to | |
166 // avoid doing a take we'll never return.... | |
167 setStatus(STOP, null); | |
168 | |
169 if (theException != null) | |
170 throw theException; | |
171 | |
172 return theEntries; | |
173 } | |
174 | |
175 /** | |
176 We're greedy and always want more if some might be available. | |
177 i.e. If we haven't got to the point where we've scanned the entire | |
178 space (in which case user should be calling wouldBlock()) we want more! | |
179 */ | |
180 public synchronized boolean wantsMore() { | |
181 return (theEntries.size() < theLimit); | |
182 } | |
183 | |
184 /** | |
185 We only request block if we have no entries. | |
186 */ | |
187 public synchronized boolean wouldBlock() { | |
188 return (theEntries.size() == 0); | |
189 } | |
190 | |
191 private boolean wasNotSatisfied() { | |
192 return (theEntries.size() == 0); | |
193 } | |
194 | |
195 private synchronized int setStatus(int aState, TransactionException aTE) { | |
196 /* | |
197 * Test only for STOP, do not use haveCompleted in this case | |
198 * which can declare us completed and cause us to axit before | |
199 * we actually set STOP. | |
200 */ | |
201 if (theStatus == STOP) | |
202 return STOP; | |
203 | |
204 theStatus = aState; | |
205 theException = aTE; | |
206 | |
207 theSearchTask.taint(false); | |
208 SearchTasks.get().remove(this, wasNotSatisfied()); | |
209 | |
210 if (needsWakeup) | |
211 notify(); | |
212 | |
213 return theStatus; | |
214 } | |
215 | |
216 private class EventGeneratorImpl implements EventGenerator { | |
217 private boolean isTainted = false; | |
218 private MangledEntry[] theTemplates; | |
219 private OID theOID; | |
220 | |
221 EventGeneratorImpl(MangledEntry[] aTemplates) { | |
222 theTemplates = aTemplates; | |
223 } | |
224 | |
225 public void assign(OID anOID) { | |
226 theOID = anOID; | |
227 } | |
228 | |
229 public long getStartSeqNum() { | |
230 return 0; | |
231 } | |
232 | |
233 public OID getId() { | |
234 return theOID; | |
235 } | |
236 | |
237 public boolean isPersistent() { | |
238 return false; | |
239 } | |
240 | |
241 public long getSourceId() { | |
242 return 0; | |
243 } | |
244 | |
245 void taint(boolean signal) { | |
246 synchronized (this) { | |
247 // Tainting can only be done once | |
248 // | |
249 if (isTainted) | |
250 return; | |
251 | |
252 isTainted = true; | |
253 } | |
254 | |
255 if (signal) | |
256 setStatus(STOP, new TransactionException("Destroyed")); | |
257 | |
258 try { | |
259 EventQueue.get().kill(getId()); | |
260 } catch (IOException anIOE) { | |
261 theLogger.log(Level.SEVERE, | |
262 "Encountered IOException during kill", anIOE); | |
263 } | |
264 | |
265 /* | |
266 try { | |
267 Tasks.queue(new CleanTask(getId())); | |
268 } catch (InterruptedException anIE) { | |
269 theLogger.log(Level.WARNING, | |
270 "Failed to lodge cleanup for: " + getId(), anIE); | |
271 } | |
272 */ | |
273 } | |
274 | |
275 public void taint() { | |
276 taint(true); | |
277 } | |
278 | |
279 public boolean canSee(QueueEvent anEvent, long aTime) { | |
280 synchronized (this) { | |
281 if (isTainted) { | |
282 return false; | |
283 } | |
284 } | |
285 | |
286 // Check if it's txn_ended and my txn and call resolved if it is | |
287 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) && | |
288 (theTxnState.getId().equals(anEvent.getTxn().getId()))) { | |
289 resolved(); | |
290 return false; | |
291 } | |
292 | |
293 // We want to see new writes from a transaction | |
294 // | |
295 return (anEvent.getType() == QueueEvent.ENTRY_WRITE); | |
296 } | |
297 | |
298 public boolean matches(MangledEntry anEntry) { | |
299 synchronized (this) { | |
300 if (isTainted) { | |
301 return false; | |
302 } | |
303 } | |
304 | |
305 for (int i = 0; i < theTemplates.length; i++) { | |
306 MangledEntry myTemplate = theTemplates[i]; | |
307 | |
308 if (Types.isSubtype(myTemplate.getType(), anEntry.getType())) { | |
309 if (myTemplate.match(anEntry)) | |
310 return true; | |
311 } | |
312 } | |
313 | |
314 return false; | |
315 } | |
316 | |
317 public boolean renew(long aTime) { | |
318 // Nothing to do as we expire by being tainted by the enclosing | |
319 // class only | |
320 // | |
321 return true; | |
322 } | |
323 | |
324 public void recover(long aSeqNum) { | |
325 // Nothing to do | |
326 } | |
327 | |
328 public long jumpSequenceNumber() { | |
329 return 0; | |
330 } | |
331 | |
332 public long jumpSequenceNumber(long aMin) { | |
333 return 0; | |
334 } | |
335 | |
336 public void ping(QueueEvent anEvent, JavaSpace aSource) { | |
337 synchronized (this) { | |
338 if (isTainted) { | |
339 return; | |
340 } | |
341 } | |
342 | |
343 LongtermOffer myOffer = null; | |
344 | |
345 try { | |
346 QueueEvent.Context myContext = anEvent.getContext(); | |
347 MangledEntry myEntry = myContext.getEntry(); | |
348 OID myOID = myContext.getOID(); | |
349 | |
350 EntryRepository myRepos = | |
351 EntryRepositoryFactory.get().find(myEntry.getType()); | |
352 | |
353 myOffer = myRepos.getOffer(myOID); | |
354 | |
355 if (myOffer == null) | |
356 return; | |
357 | |
358 myOffer.offer(BulkTakeVisitor.this); | |
359 | |
360 } catch (IOException anIOE) { | |
361 // Nothing can be done | |
362 theLogger.log(Level.SEVERE, | |
363 "Encountered IOException during write offer", anIOE); | |
364 } finally { | |
365 if (myOffer != null) { | |
366 try { | |
367 myOffer.release(); | |
368 } catch (IOException anIOE) { | |
369 theLogger.log(Level.SEVERE, | |
370 "Encountered IOException during write offer(release)", | |
371 anIOE); | |
372 } | |
373 } | |
374 } | |
375 } | |
376 | |
377 public EventGeneratorState getMemento() { | |
378 throw new RuntimeException( | |
379 "Shouldn't be happening - we're transient"); | |
380 } | |
381 } | |
382 | |
383 private class SearchVisitorAdapter implements SearchVisitor { | |
384 | |
385 public boolean isDeleter() { | |
386 return BulkTakeVisitor.this.isDeleter(); | |
387 } | |
388 | |
389 public int offer(SearchOffer anOffer) { | |
390 if (theLogger.isLoggable(Level.FINE)) | |
391 theLogger.log(Level.FINE, "Offer"); | |
392 | |
393 synchronized (this) { | |
394 if (haveCompleted()) { | |
395 theLogger.log(Level.FINE, "Have completed"); | |
396 return STOP; | |
397 } | |
398 } | |
399 | |
400 OpInfo myInfo = anOffer.getInfo(); | |
401 MangledEntry myEntry = anOffer.getEntry(); | |
402 | |
403 for (int i = 0; i < theTemplates.length; i++) { | |
404 MangledEntry myTemplate = theTemplates[i]; | |
405 | |
406 if (Types.isSubtype(myTemplate.getType(), myInfo.getType())) { | |
407 | |
408 if ((myTemplate.isWildcard()) || | |
409 (myTemplate.match(myEntry))) { | |
410 | |
411 // If we get a match, we only need to try offer | |
412 // once to see if we can lock the entry so we can | |
413 // give up after the first match & offer. | |
414 // | |
415 return BulkTakeVisitor.this.offer(anOffer); | |
416 } | |
417 } | |
418 } | |
419 | |
420 return TRY_AGAIN; | |
421 } | |
422 } | |
423 } |