comparison src/org/dancres/blitz/BulkTakeVisitor.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz;
2
3 import java.io.IOException;
4
5 import java.util.ArrayList;
6 import java.util.List;
7
8 import java.util.logging.*;
9
10 import net.jini.core.transaction.TransactionException;
11 import net.jini.space.JavaSpace;
12
13 import org.dancres.blitz.mangler.MangledEntry;
14
15 import org.dancres.blitz.entry.*;
16
17 import org.dancres.blitz.txn.TxnState;
18
19 import org.dancres.blitz.txnlock.*;
20
21 import org.dancres.blitz.oid.OID;
22
23 import org.dancres.blitz.notify.EventGenerator;
24 import org.dancres.blitz.notify.EventQueue;
25 import org.dancres.blitz.notify.QueueEvent;
26 import org.dancres.blitz.notify.EventGeneratorState;
27
28 class BulkTakeVisitor implements BulkMatchTask, SearchVisitor {
29 private static final Logger theLogger =
30 Logging.newLogger("org.dancres.blitz.BulkTakeVisitor");
31
32 private MangledEntry[] theTemplates;
33 private TxnState theTxnState;
34 private long theLimit;
35 private EventGeneratorImpl theSearchTask;
36 private SearchVisitorAdapter theAdapter = new SearchVisitorAdapter();
37 private BaulkedParty theParty;
38
39 private ArrayList theEntries = new ArrayList();
40
41 private int theStatus = SearchVisitor.TRY_AGAIN;
42
43 private TransactionException theException;
44
45 private boolean needsWakeup = false;
46
47 BulkTakeVisitor(MangledEntry[] aTemplates, TxnState aTxnState,
48 long aLimit, VisitorBaulkedPartyFactory aFactory)
49 throws IOException {
50
51 theTemplates = aTemplates;
52 theTxnState = aTxnState;
53 theLimit = aLimit;
54 theSearchTask = new EventGeneratorImpl(theTemplates);
55 theParty = aFactory.newParty(this);
56 SearchTasks.get().add(this);
57 EventQueue.get().insert(theSearchTask);
58 }
59
60 private void resolved() {
61 setStatus(STOP, new TransactionException());
62 }
63
64 public SearchVisitor getVisitor() {
65 return theAdapter;
66 }
67
68 public int offer(SearchOffer anOffer) {
69 theLogger.log(Level.FINE, "offer");
70
71 synchronized(this) {
72 if (haveCompleted()) {
73 theLogger.log(Level.FINE, "Have completed");
74 return STOP;
75 }
76 }
77
78 OpInfo myInfo = anOffer.getInfo();
79 MangledEntry myEntry = anOffer.getEntry();
80
81 LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType());
82 TxnLock myLock = myMgr.getLock(myInfo.getOID());
83
84 synchronized (this) {
85 int myResult;
86
87 // Picked up enough matches in the meantime? Quit...
88 if (haveCompleted())
89 return STOP;
90
91 VisitorBaulkedPartyFactory.Handback myHandback =
92 new VisitorBaulkedPartyFactory.Handback(myInfo.getType(),
93 myInfo.getOID(), myEntry);
94
95 synchronized (myLock) {
96 myResult = myLock.acquire(theTxnState,
97 TxnLock.DELETE,
98 theParty, myHandback, false);
99 }
100
101 if (myResult == TxnLock.SUCCESS) {
102 // Got the lock
103 try {
104 theTxnState.add(new EntryTxnOp(TxnLock.DELETE,
105 myInfo,
106 myLock));
107 } catch (TransactionException aTE) {
108 synchronized (myLock) {
109 myLock.release(theTxnState, TxnLock.DELETE);
110 }
111 return setStatus(STOP, aTE);
112 }
113
114 // Add the Entry to our list of matches
115 theEntries.add(myEntry);
116
117 /*
118 Picked up enough matches? Quit else carry on
119 */
120 if (haveCompleted())
121 return setStatus(STOP, null);
122 else
123 return TRY_AGAIN;
124 } else {
125 /*
126 One of our templates matched but we didn't
127 get a lock. No point in trying other templates
128 because even if they match we might get another
129 conflict. We'll leave it to settle instead and
130 look for other matches.
131 */
132 return TRY_AGAIN;
133 }
134 }
135 }
136
137 public boolean isDeleter() {
138 return true;
139 }
140
141 private boolean haveCompleted() {
142 /*
143 If we're blocking, then we'll unblock as soon as we get one
144 match, otherwise, we want to be greedy and grab as many entries
145 as our limit allows
146 */
147 if (needsWakeup) {
148 return (theEntries.size() != 0) || (theStatus == STOP);
149 } else {
150 return (theEntries.size() == theLimit) || (theStatus == STOP);
151 }
152 }
153
154 public synchronized List getEntries(long aTimeout)
155 throws TransactionException, InterruptedException {
156
157 // We only wait the once because we'll only ever wake from this
158 // if there's a result or we timeout
159 if ((theEntries.size() == 0) && (aTimeout != 0)) {
160 needsWakeup = true;
161 wait(aTimeout);
162 needsWakeup = false;
163 }
164
165 // We're returning - ensure we don't allow any more operations to
166 // avoid doing a take we'll never return....
167 setStatus(STOP, null);
168
169 if (theException != null)
170 throw theException;
171
172 return theEntries;
173 }
174
175 /**
176 We're greedy and always want more if some might be available.
177 i.e. If we haven't got to the point where we've scanned the entire
178 space (in which case user should be calling wouldBlock()) we want more!
179 */
180 public synchronized boolean wantsMore() {
181 return (theEntries.size() < theLimit);
182 }
183
184 /**
185 We only request block if we have no entries.
186 */
187 public synchronized boolean wouldBlock() {
188 return (theEntries.size() == 0);
189 }
190
191 private boolean wasNotSatisfied() {
192 return (theEntries.size() == 0);
193 }
194
195 private synchronized int setStatus(int aState, TransactionException aTE) {
196 /*
197 * Test only for STOP, do not use haveCompleted in this case
198 * which can declare us completed and cause us to axit before
199 * we actually set STOP.
200 */
201 if (theStatus == STOP)
202 return STOP;
203
204 theStatus = aState;
205 theException = aTE;
206
207 theSearchTask.taint(false);
208 SearchTasks.get().remove(this, wasNotSatisfied());
209
210 if (needsWakeup)
211 notify();
212
213 return theStatus;
214 }
215
216 private class EventGeneratorImpl implements EventGenerator {
217 private boolean isTainted = false;
218 private MangledEntry[] theTemplates;
219 private OID theOID;
220
221 EventGeneratorImpl(MangledEntry[] aTemplates) {
222 theTemplates = aTemplates;
223 }
224
225 public void assign(OID anOID) {
226 theOID = anOID;
227 }
228
229 public long getStartSeqNum() {
230 return 0;
231 }
232
233 public OID getId() {
234 return theOID;
235 }
236
237 public boolean isPersistent() {
238 return false;
239 }
240
241 public long getSourceId() {
242 return 0;
243 }
244
245 void taint(boolean signal) {
246 synchronized (this) {
247 // Tainting can only be done once
248 //
249 if (isTainted)
250 return;
251
252 isTainted = true;
253 }
254
255 if (signal)
256 setStatus(STOP, new TransactionException("Destroyed"));
257
258 try {
259 EventQueue.get().kill(getId());
260 } catch (IOException anIOE) {
261 theLogger.log(Level.SEVERE,
262 "Encountered IOException during kill", anIOE);
263 }
264
265 /*
266 try {
267 Tasks.queue(new CleanTask(getId()));
268 } catch (InterruptedException anIE) {
269 theLogger.log(Level.WARNING,
270 "Failed to lodge cleanup for: " + getId(), anIE);
271 }
272 */
273 }
274
275 public void taint() {
276 taint(true);
277 }
278
279 public boolean canSee(QueueEvent anEvent, long aTime) {
280 synchronized (this) {
281 if (isTainted) {
282 return false;
283 }
284 }
285
286 // Check if it's txn_ended and my txn and call resolved if it is
287 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) &&
288 (theTxnState.getId().equals(anEvent.getTxn().getId()))) {
289 resolved();
290 return false;
291 }
292
293 // We want to see new writes from a transaction
294 //
295 return (anEvent.getType() == QueueEvent.ENTRY_WRITE);
296 }
297
298 public boolean matches(MangledEntry anEntry) {
299 synchronized (this) {
300 if (isTainted) {
301 return false;
302 }
303 }
304
305 for (int i = 0; i < theTemplates.length; i++) {
306 MangledEntry myTemplate = theTemplates[i];
307
308 if (Types.isSubtype(myTemplate.getType(), anEntry.getType())) {
309 if (myTemplate.match(anEntry))
310 return true;
311 }
312 }
313
314 return false;
315 }
316
317 public boolean renew(long aTime) {
318 // Nothing to do as we expire by being tainted by the enclosing
319 // class only
320 //
321 return true;
322 }
323
324 public void recover(long aSeqNum) {
325 // Nothing to do
326 }
327
328 public long jumpSequenceNumber() {
329 return 0;
330 }
331
332 public long jumpSequenceNumber(long aMin) {
333 return 0;
334 }
335
336 public void ping(QueueEvent anEvent, JavaSpace aSource) {
337 synchronized (this) {
338 if (isTainted) {
339 return;
340 }
341 }
342
343 LongtermOffer myOffer = null;
344
345 try {
346 QueueEvent.Context myContext = anEvent.getContext();
347 MangledEntry myEntry = myContext.getEntry();
348 OID myOID = myContext.getOID();
349
350 EntryRepository myRepos =
351 EntryRepositoryFactory.get().find(myEntry.getType());
352
353 myOffer = myRepos.getOffer(myOID);
354
355 if (myOffer == null)
356 return;
357
358 myOffer.offer(BulkTakeVisitor.this);
359
360 } catch (IOException anIOE) {
361 // Nothing can be done
362 theLogger.log(Level.SEVERE,
363 "Encountered IOException during write offer", anIOE);
364 } finally {
365 if (myOffer != null) {
366 try {
367 myOffer.release();
368 } catch (IOException anIOE) {
369 theLogger.log(Level.SEVERE,
370 "Encountered IOException during write offer(release)",
371 anIOE);
372 }
373 }
374 }
375 }
376
377 public EventGeneratorState getMemento() {
378 throw new RuntimeException(
379 "Shouldn't be happening - we're transient");
380 }
381 }
382
383 private class SearchVisitorAdapter implements SearchVisitor {
384
385 public boolean isDeleter() {
386 return BulkTakeVisitor.this.isDeleter();
387 }
388
389 public int offer(SearchOffer anOffer) {
390 if (theLogger.isLoggable(Level.FINE))
391 theLogger.log(Level.FINE, "Offer");
392
393 synchronized (this) {
394 if (haveCompleted()) {
395 theLogger.log(Level.FINE, "Have completed");
396 return STOP;
397 }
398 }
399
400 OpInfo myInfo = anOffer.getInfo();
401 MangledEntry myEntry = anOffer.getEntry();
402
403 for (int i = 0; i < theTemplates.length; i++) {
404 MangledEntry myTemplate = theTemplates[i];
405
406 if (Types.isSubtype(myTemplate.getType(), myInfo.getType())) {
407
408 if ((myTemplate.isWildcard()) ||
409 (myTemplate.match(myEntry))) {
410
411 // If we get a match, we only need to try offer
412 // once to see if we can lock the entry so we can
413 // give up after the first match & offer.
414 //
415 return BulkTakeVisitor.this.offer(anOffer);
416 }
417 }
418 }
419
420 return TRY_AGAIN;
421 }
422 }
423 }