comparison src/org/dancres/blitz/FifoSearchVisitorImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz;
2
3 import java.io.IOException;
4
5 import java.util.LinkedList;
6
7 import java.util.logging.*;
8
9 import net.jini.core.transaction.TransactionException;
10 import net.jini.space.JavaSpace;
11
12 import org.dancres.blitz.mangler.MangledEntry;
13
14 import org.dancres.blitz.entry.*;
15
16 import org.dancres.blitz.txn.TxnState;
17
18 import org.dancres.blitz.txnlock.*;
19
20 import org.dancres.blitz.oid.OID;
21 import org.dancres.blitz.notify.*;
22
23
24 /**
25 <p>All search results obtained by the lower layers are offered to a
26 SearchVisitor instance which can then determine whether the offered Entry
27 is suitable. This includes "deep matching" which requires that we fully
28 compare the fields of template to entry. The lower-layers do not perform
29 this task - they return the entry's that are a probable match. In
30 addition, instances of search visitor check transaction locks etc. which
31 the lower layers know nothing about.</p>
32
33 <p>FifoSearchVisitorImpl enforces some ordering requirements that aren't
34 present in SearchVisitorImpl relating to fairness which is sometimes more
35 important than speed.</p>
36 */
37 class FifoSearchVisitorImpl implements SingleMatchTask,
38 SearchVisitor {
39
40 private static final Logger theLogger =
41 Logging.newLogger("org.dancres.blitz.entry.FifoSearchVisitorImpl");
42
43 private MangledEntry theTemplate;
44 private TxnState theTxnState;
45 private boolean isTaking;
46
47 private EventGeneratorImpl theSearchTask;
48 private SearchVisitor theAdapter = new SearchVisitorAdapter();
49 private BaulkedParty theParty;
50
51 private int theLockOp;
52
53 private boolean needsWakeup = false;
54
55 private CompletionEvent theCompletion;
56
57 private long theStartTime = System.currentTimeMillis();
58
59 private LinkedList theNewWrites = new LinkedList();
60
61 /**
62 @param isTake indicates the kind of txn lock we would need to assert
63 */
64 FifoSearchVisitorImpl(MangledEntry aTemplate, boolean isTake,
65 TxnState aState, VisitorBaulkedPartyFactory aFactory)
66 throws IOException {
67
68 theTemplate = aTemplate;
69 isTaking = isTake;
70 theTxnState = aState;
71 theLockOp = (isTaking == true) ? TxnLock.DELETE : TxnLock.READ;
72 theSearchTask = new EventGeneratorImpl(aTemplate);
73 theParty = aFactory.newParty(this);
74 SearchTasks.get().add(this);
75 EventQueue.get().insert(theSearchTask);
76 }
77
78 public EventGenerator getSearchTask() {
79 return theSearchTask;
80 }
81
82 public SearchVisitor getVisitor() {
83 return theAdapter;
84 }
85
86 /***********************************************************************
87 * SearchVisitor
88 ***********************************************************************/
89
90 public int offer(SearchOffer anOffer) {
91 OpInfo myInfo = anOffer.getInfo();
92
93 MangledEntry myEntry = anOffer.getEntry();
94
95 LockMgr myMgr = TxnLocks.getLockMgr(myInfo.getType());
96 TxnLock myLock = myMgr.getLock(myInfo.getOID());
97
98 synchronized (this) {
99
100 int myResult;
101
102 // Do we need to try and secure this?
103 if (haveCompleted())
104 return STOP;
105
106 VisitorBaulkedPartyFactory.Handback myHandback =
107 new VisitorBaulkedPartyFactory.Handback(myInfo.getType(),
108 myInfo.getOID(), myEntry);
109
110 synchronized (myLock) {
111 myResult = myLock.acquire(theTxnState, theLockOp,
112 theParty, myHandback, false);
113 }
114
115 if (myResult == TxnLock.SUCCESS) {
116
117 try {
118 theTxnState.add(new EntryTxnOp(theLockOp, myInfo,
119 myLock));
120 } catch (TransactionException aTE) {
121 myLock.release(theTxnState, theLockOp);
122 return sendEvent(new CompletionEvent(aTE));
123 }
124
125 if (theLogger.isLoggable(Level.FINE))
126 theLogger.log(Level.FINE, "Succeeded");
127
128 return sendEvent(new CompletionEvent(myEntry));
129 }
130 }
131
132 return TRY_AGAIN;
133 }
134
135 public int sendEvent(CompletionEvent anEvent) {
136 synchronized(this) {
137 if (haveCompleted())
138 return STOP;
139
140 theCompletion = anEvent;
141
142 theSearchTask.taint();
143 SearchTasks.get().remove(this, wasNotSatisfied());
144
145 if (needsWakeup)
146 notify();
147
148 return STOP;
149 }
150 }
151
152 public MangledEntry getEntry(long aTimeout)
153 throws TransactionException,
154 InterruptedException {
155
156 synchronized(this) {
157 needsWakeup = true;
158 }
159
160 while (true) {
161 synchronized(this) {
162 /*
163 If we've completed, throw exception or return Entry
164 accordingly, cleaning up state appropriately
165 */
166 if (haveCompleted()) {
167
168 // We're returning - ensure we don't allow any more
169 // operations to avoid doing a take we'll never return.
170 //
171 needsWakeup = false;
172
173 if (theCompletion.getException() != null)
174 throw theCompletion.getException();
175
176 return theCompletion.getEntry();
177 }
178
179 // We haven't completed, yet, can we process queue elements?
180 if (theNewWrites.size() == 0) {
181
182 long myRemaining = aTimeout - (System.currentTimeMillis() -
183 theStartTime);
184
185 // Is there more time to wait?
186 if (myRemaining > 0)
187 wait(myRemaining);
188 else {
189 // No, force exit
190 needsWakeup = false;
191 sendEvent(CompletionEvent.COMPLETED);
192 }
193 }
194 }
195
196 // We must flush the queue outside of lock
197 try {
198 flushQueue();
199 } catch (IOException anIOE) {
200 TransactionException myTE =
201 new TransactionException("I/O Error whilst processing queue");
202 myTE.initCause(anIOE);
203 sendEvent(new CompletionEvent(myTE));
204 }
205 }
206 }
207
208 private void flushQueue() throws IOException {
209 while (true) {
210 // Something has caused us to stop, give up now
211 //
212 if (haveCompleted())
213 break;
214
215 SpaceEntryUID myUID = null;
216
217 synchronized(this) {
218 if (theNewWrites.size() != 0) {
219 myUID = (SpaceEntryUID) theNewWrites.removeFirst();
220 }
221 }
222
223 // Nothing else in queue?
224 //
225 if (myUID == null)
226 break;
227
228 EntryRepository myRepos =
229 EntryRepositoryFactory.get().find(myUID.getType());
230
231 myRepos.find(this, myUID.getOID(), null);
232 }
233 }
234
235 public synchronized boolean wouldBlock() {
236 return (theCompletion == null);
237 }
238
239 private synchronized boolean haveCompleted() {
240 return (theCompletion != null);
241 }
242
243 private boolean wasNotSatisfied() {
244 return (theCompletion.getEntry() == null);
245 }
246
247 public boolean isDeleter() {
248 return isTaking;
249 }
250
251 private void resolved() {
252 sendEvent(new CompletionEvent(new TransactionException(
253 "Transaction completed with operations still outstanding: " +
254 (isTaking ? "take" : "read"))));
255 }
256
257 private class SearchVisitorAdapter implements SearchVisitor {
258
259 public boolean isDeleter() {
260 return FifoSearchVisitorImpl.this.isDeleter();
261 }
262
263 public int offer(SearchOffer anOffer) {
264 if (theLogger.isLoggable(Level.FINE))
265 theLogger.log(Level.FINE, "Offer");
266
267 synchronized (FifoSearchVisitorImpl.this) {
268 if (haveCompleted()) {
269 if (theLogger.isLoggable(Level.FINE))
270 theLogger.log(Level.FINE, theTxnState.getId() +
271 " Have completed");
272 return STOP;
273 }
274 }
275
276 OpInfo myInfo = anOffer.getInfo();
277
278 if (!Types.isSubtype(theTemplate.getType(), myInfo.getType())) {
279 if (theLogger.isLoggable(Level.FINE))
280 theLogger.log(Level.FINE, "Not subtype");
281
282 return TRY_AGAIN;
283 }
284
285 MangledEntry myEntry = anOffer.getEntry();
286
287 if (theTemplate.match(myEntry)) {
288 return FifoSearchVisitorImpl.this.offer(anOffer);
289 } else
290 return TRY_AGAIN;
291 }
292 }
293
294 private class EventGeneratorImpl implements EventGenerator {
295 private boolean isTainted = false;
296 private MangledEntry theTemplate;
297 private OID theOID;
298
299 EventGeneratorImpl(MangledEntry aTemplate) {
300 theTemplate = aTemplate;
301 }
302
303 public void assign(OID anOID) {
304 theOID = anOID;
305 }
306
307 public long getStartSeqNum() {
308 return 0;
309 }
310
311 public OID getId() {
312 return theOID;
313 }
314
315 public boolean isPersistent() {
316 return false;
317 }
318
319 public long getSourceId() {
320 return 0;
321 }
322
323 public void taint() {
324 synchronized (this) {
325 // Tainting can only be done once
326 //
327 if (isTainted)
328 return;
329
330 isTainted = true;
331 }
332
333 try {
334 EventQueue.get().kill(getId());
335 } catch (IOException anIOE) {
336 theLogger.log(Level.SEVERE,
337 "Encountered IOException during kill", anIOE);
338 }
339
340 /*
341 try {
342 Tasks.queue(new CleanTask(getId()));
343 } catch (InterruptedException anIE) {
344 theLogger.log(Level.WARNING,
345 "Failed to lodge cleanup for: " + getId(), anIE);
346 }
347 */
348 }
349
350 private boolean isTainted() {
351 synchronized (this) {
352 return (isTainted);
353 }
354 }
355
356 public boolean canSee(QueueEvent anEvent, long aTime) {
357 if (isTainted())
358 return false;
359
360 // Check if it's txn_ended and my txn and call resolved if it is
361 if ((anEvent.getType() == QueueEvent.TRANSACTION_ENDED) &&
362 (theTxnState.getId().equals(anEvent.getTxn().getId()))) {
363 resolved();
364 return false;
365 }
366
367 // We want to see new writes from a transaction
368 //
369 return (anEvent.getType() == QueueEvent.ENTRY_WRITE);
370 }
371
372 public boolean matches(MangledEntry anEntry) {
373 if (isTainted())
374 return false;
375
376 return Types.isSubtype(theTemplate.getType(), anEntry.getType()) &&
377 theTemplate.match(anEntry);
378 }
379
380 public boolean renew(long aTime) {
381 // Nothing to do as we expire by being tainted by the enclosing
382 // class only
383 //
384 return true;
385 }
386
387 public void recover(long aSeqNum) {
388 // Nothing to do
389 }
390
391 public long jumpSequenceNumber() {
392 return 0;
393 }
394
395 public long jumpSequenceNumber(long aMin) {
396 return 0;
397 }
398
399 public void ping(QueueEvent anEvent, JavaSpace aSource) {
400 /*
401 Queue the write for later consideration unless we're done.
402 Later consideration will only be after we've invoked getEntry
403 which will only occur after we've performed searching of storage
404 */
405 synchronized (FifoSearchVisitorImpl.this) {
406 if (haveCompleted())
407 return;
408
409 QueueEvent.Context myContext = anEvent.getContext();
410 MangledEntry myEntry = myContext.getEntry();
411 OID myOID = myContext.getOID();
412
413 theNewWrites.add(new SpaceEntryUID(myEntry.getType(), myOID));
414
415 if (needsWakeup)
416 FifoSearchVisitorImpl.this.notify();
417 }
418 }
419
420 public EventGeneratorState getMemento() {
421 throw new RuntimeException(
422 "Shouldn't be happening - we're transient");
423 }
424 }
425 }