Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/SpaceImpl.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz; | |
2 | |
3 import java.io.IOException; | |
4 | |
5 import java.rmi.MarshalledObject; | |
6 | |
7 import java.util.List; | |
8 import java.util.ArrayList; | |
9 | |
10 import java.util.logging.*; | |
11 | |
12 import net.jini.core.event.RemoteEventListener; | |
13 | |
14 import net.jini.core.transaction.Transaction; | |
15 import net.jini.core.transaction.UnknownTransactionException; | |
16 import net.jini.core.transaction.TransactionException; | |
17 | |
18 import org.dancres.blitz.mangler.MangledEntry; | |
19 | |
20 import org.dancres.blitz.entry.EntryRepositoryFactory; | |
21 import org.dancres.blitz.entry.EntryRepository; | |
22 import org.dancres.blitz.entry.OpInfo; | |
23 | |
24 import org.dancres.blitz.disk.Disk; | |
25 import org.dancres.blitz.disk.DiskTxn; | |
26 | |
27 import org.dancres.blitz.oid.OID; | |
28 | |
29 import org.dancres.blitz.util.Time; | |
30 | |
31 import org.dancres.blitz.task.Tasks; | |
32 | |
33 import org.dancres.blitz.notify.EventQueue; | |
34 import org.dancres.blitz.notify.QueueEvent; | |
35 | |
36 import org.dancres.blitz.lease.SpaceUID; | |
37 import org.dancres.blitz.lease.LeaseBounds; | |
38 | |
39 import org.dancres.blitz.txn.TxnManager; | |
40 import org.dancres.blitz.txn.TxnState; | |
41 import org.dancres.blitz.txn.TxnGateway; | |
42 | |
43 import org.dancres.blitz.config.ConfigurationFactory; | |
44 import org.dancres.blitz.config.Fifo; | |
45 | |
46 import org.dancres.blitz.stats.StatsBoard; | |
47 import org.dancres.blitz.stats.Stat; | |
48 | |
49 /** | |
50 <p>The core back-end implementation of a space. </p> | |
51 | |
52 <p>One of the more subtle responsibilities of this class is that it handles | |
53 the negotiation of lease durations. This has implications for remote layers | |
54 etc. which should expect to pass down an unadulterated lease duration | |
55 (including <code>Lease.FOREVER</code> and <code>Lease.ANY</code>) and have | |
56 the space return the actual resultant lease time. This is true for | |
57 both initial writes of entry's or notify registrations and future | |
58 renews.</p> | |
59 | |
60 <p>Another responsibility of this class is to ensure that it only | |
61 returns <code>SpaceUID</code>s which are space-global unique identifiers. | |
62 There are a number of different resources managed by the space core which | |
63 each have their own locally unique identifiers. Thus the core wraps these | |
64 in appropriate <code>SpaceUID</code> implementations before returning | |
65 them.</p> | |
66 | |
67 <p>The various <code>SpaceUID</code> implementations can be used as the | |
68 target of lease renewal and cancel operations. Each implementation, | |
69 typically has it's own <code>LeaseHandler</code> implementation which is | |
70 registered with <code>LeaseHandlers</code> and has the renewal/cancel | |
71 operations delegated to it.</p> | |
72 | |
73 <p><code>Entry</code>s are managed by EntryRepository instances.</p> | |
74 | |
75 <p><code>notify</code>s are delegated to EventQueue.</p> | |
76 | |
77 @see org.dancres.blitz.lease.SpaceUID | |
78 @see org.dancres.blitz.lease.LeaseHandler | |
79 @see org.dancres.blitz.lease.LeaseHandlers | |
80 @see org.dancres.blitz.entry.EntryRepository | |
81 @see org.dancres.blitz.notify.EventQueue | |
82 */ | |
83 public class SpaceImpl { | |
84 | |
85 static final Logger theLogger = | |
86 Logging.newLogger("org.dancres.blitz.SpaceImpl", Level.INFO); | |
87 | |
88 private TxnControl theTxnController = new TxnControlImpl(); | |
89 private LeaseControl theLeaseController = new LeaseControlImpl(); | |
90 | |
91 private long theStartTime; | |
92 | |
93 private boolean isSynchronousNotify = false; | |
94 | |
95 public SpaceImpl(TxnGateway aGateway) throws Exception { | |
96 VersionInfo.dump(); | |
97 | |
98 theStartTime = System.currentTimeMillis(); | |
99 | |
100 long myDebugCycle = | |
101 ((Long) | |
102 ConfigurationFactory.getEntry("statsDump", | |
103 long.class, | |
104 new Long(0))).longValue(); | |
105 | |
106 isSynchronousNotify = ((Boolean) | |
107 ConfigurationFactory.getEntry("syncNotifyOnWrite", | |
108 boolean.class, | |
109 new Boolean(false))).booleanValue(); | |
110 | |
111 theLogger.info("Synchrounous Notifies: " + isSynchronousNotify); | |
112 | |
113 StatsDumper.start(myDebugCycle); | |
114 | |
115 TxnManager.init(aGateway); | |
116 | |
117 // Make sure EventQueue is hooked up | |
118 EventQueue.get(); | |
119 | |
120 // Activate threads etc. only after TxnManager is initialised so | |
121 // state has been recovered/is stable. | |
122 // | |
123 ActiveObjectRegistry.startAll(); | |
124 | |
125 // HACK: Ensure root repository is loaded because that publishes | |
126 // the known types stats (we shouldn't know this!) | |
127 EntryRepositoryFactory.get().get(EntryRepository.ROOT_TYPE); | |
128 } | |
129 | |
130 public WriteTicket write(MangledEntry anEntry, Transaction aTxn, | |
131 long aLeaseTime) | |
132 throws IOException, TransactionException { | |
133 | |
134 TxnState myJiniTxn = TxnManager.get().resolve(aTxn); | |
135 | |
136 long myLeaseTime = | |
137 Time.getAbsoluteTime(LeaseBounds.boundWrite(aLeaseTime)); | |
138 | |
139 EntryRepository myRepos = | |
140 EntryRepositoryFactory.get().get(anEntry.getType()); | |
141 | |
142 if (myRepos.noSchemaDefined()) { | |
143 DiskTxn myTxn = DiskTxn.newTxn(); | |
144 | |
145 myRepos.setFields(anEntry.getFields()); | |
146 | |
147 /* | |
148 Update parent repositories - each parent needs to know about | |
149 this subtype | |
150 */ | |
151 String[] myParents = anEntry.tearOffParents(); | |
152 | |
153 for (int i = 0; i < myParents.length; i++) { | |
154 EntryRepository myParentRepos = | |
155 EntryRepositoryFactory.get().get(myParents[i]); | |
156 myParentRepos.addSubtype(anEntry.getType()); | |
157 } | |
158 | |
159 myTxn.commit(); | |
160 } | |
161 | |
162 /* | |
163 Invoke write on the repository and then bundle up the OID into | |
164 an appropriate Lease form? | |
165 */ | |
166 OID myOID; | |
167 | |
168 WriteEscortImpl myEscort = new WriteEscortImpl(myJiniTxn); | |
169 | |
170 myRepos.write(anEntry, myLeaseTime, myEscort); | |
171 | |
172 OpInfo myResult = myEscort.getInfo(); | |
173 | |
174 myOID = myResult.getOID(); | |
175 | |
176 /* | |
177 Post an event for notifies and blockers | |
178 */ | |
179 QueueEvent myEvent = | |
180 new QueueEvent(QueueEvent.ENTRY_WRITE, | |
181 myJiniTxn, | |
182 new QueueEvent.Context(anEntry, myOID)); | |
183 | |
184 if (isSynchronousNotify) | |
185 EventQueue.get().add(myEvent, true); | |
186 else | |
187 EventQueue.get().add(myEvent); | |
188 | |
189 // For null transactions we must issue the commit | |
190 if (myJiniTxn.isNull()) | |
191 TxnManager.get().prepareAndCommit(myJiniTxn); | |
192 | |
193 SpaceUID mySUID = new SpaceEntryUID(anEntry.getType(), myOID); | |
194 return new WriteTicketImpl(mySUID, myLeaseTime); | |
195 } | |
196 | |
197 private MangledEntry find(MangledEntry anEntry, Transaction aTxn, | |
198 long aWaitTime, boolean doTake, boolean ifExists) | |
199 throws IOException, TransactionException { | |
200 | |
201 MangledEntry myEntry = null; | |
202 | |
203 TxnState myJiniTxn = TxnManager.get().resolve(aTxn); | |
204 | |
205 EntryRepository myRepos = | |
206 EntryRepositoryFactory.get().find(anEntry.getType()); | |
207 | |
208 SingleMatchTask myTask; | |
209 VisitorBaulkedPartyFactory myFactory; | |
210 | |
211 if (ifExists) | |
212 myFactory = new ExistsFactory(); | |
213 else | |
214 myFactory = new SearchFactory(); | |
215 | |
216 if ((myRepos == null) || | |
217 (myRepos.getConstraints().get(Fifo.class) == null)) { | |
218 | |
219 /* | |
220 If repos doesn't yet exist, we're running on a first come | |
221 first served basis - it only makes sense to perform enforce | |
222 fifo when we've accrued some live Entry's in storage where | |
223 we wish to select those ahead of incoming matches. | |
224 | |
225 In terms of priority for incoming matches, SearchTasks natural | |
226 order will ensure that the oldest blocking task gets the incoming | |
227 matches before anyone else | |
228 */ | |
229 myTask = new SearchVisitorImpl(anEntry, doTake, | |
230 myJiniTxn, myFactory); | |
231 } else { | |
232 myTask = new FifoSearchVisitorImpl(anEntry, doTake, | |
233 myJiniTxn, myFactory); | |
234 } | |
235 | |
236 if (myRepos != null) { | |
237 myRepos.find(anEntry, myTask.getVisitor()); | |
238 | |
239 if (myTask.wouldBlock()) { | |
240 | |
241 // Try subtypes | |
242 String[] mySubtypes = myRepos.getSubtypes(); | |
243 | |
244 for (int i = 0; i < mySubtypes.length; i++) { | |
245 myRepos = EntryRepositoryFactory.get().find(mySubtypes[i]); | |
246 | |
247 if (myRepos != null) { | |
248 myRepos.find(anEntry, myTask.getVisitor()); | |
249 | |
250 // If we got a match from the search (or it could | |
251 // have come from a recent write) | |
252 if (!myTask.wouldBlock()) | |
253 break; | |
254 } | |
255 } | |
256 } | |
257 } else { | |
258 // Don't bother as the only sensible thing to do is wait | |
259 // which will happen below | |
260 } | |
261 | |
262 try { | |
263 // Result waiting? | |
264 if (!myTask.wouldBlock()) | |
265 myEntry = myTask.getEntry(0); | |
266 else { | |
267 // Will optionally force early exit if there were no conflicts | |
268 myFactory.enableResolutionSignal(); | |
269 | |
270 myEntry = | |
271 myTask.getEntry(Time.getWaitTime(aWaitTime)); | |
272 } | |
273 } catch (InterruptedException anIE) { | |
274 throw new TransactionException("Search interrupted"); | |
275 } catch (TransactionException aTE) { | |
276 if (myJiniTxn.isNull()) | |
277 TxnManager.get().abort(myJiniTxn); | |
278 | |
279 throw aTE; | |
280 } | |
281 | |
282 // If we started the txn we MUST finish it, entry or not | |
283 if (myJiniTxn.isNull()) { | |
284 TxnManager.get().prepareAndCommit(myJiniTxn); | |
285 } | |
286 | |
287 return myEntry; | |
288 } | |
289 | |
290 public MangledEntry take(MangledEntry anEntry, Transaction aTxn, | |
291 long aWaitTime) | |
292 throws IOException, TransactionException { | |
293 | |
294 return find(anEntry, aTxn, aWaitTime, true, false); | |
295 } | |
296 | |
297 public MangledEntry read(MangledEntry anEntry, Transaction aTxn, | |
298 long aWaitTime) | |
299 throws IOException, TransactionException { | |
300 | |
301 return find(anEntry, aTxn, aWaitTime, false, false); | |
302 } | |
303 | |
304 public MangledEntry takeIfExists(MangledEntry anEntry, Transaction aTxn, | |
305 long aWaitTime) | |
306 throws IOException, TransactionException { | |
307 | |
308 return find(anEntry, aTxn, aWaitTime, true, true); | |
309 } | |
310 | |
311 public MangledEntry readIfExists(MangledEntry anEntry, Transaction aTxn, | |
312 long aWaitTime) | |
313 throws IOException, TransactionException { | |
314 | |
315 return find(anEntry, aTxn, aWaitTime, false, true); | |
316 } | |
317 | |
318 public RegTicket notify(MangledEntry aTemplate, Transaction aTxn, | |
319 RemoteEventListener aListener, long aLeaseTime, | |
320 MarshalledObject aHandback) | |
321 throws IOException, TransactionException { | |
322 | |
323 TxnState myState = null; | |
324 | |
325 if (aTxn != null) { | |
326 try { | |
327 myState = TxnManager.get().getTxnFor(aTxn, false); | |
328 } catch (UnknownTransactionException aUTE) { | |
329 throw new TransactionException(); | |
330 } | |
331 } | |
332 | |
333 long myLeaseTime = | |
334 Time.getAbsoluteTime(LeaseBounds.boundNotify(aLeaseTime)); | |
335 | |
336 RegTicketImpl myTicket = | |
337 new RegTicketImpl(myLeaseTime); | |
338 | |
339 EventQueue.get().register(aTemplate, myState, aListener, | |
340 myLeaseTime, aHandback, myTicket); | |
341 return myTicket; | |
342 } | |
343 | |
344 public RegTicket visibility(MangledEntry[] aTemplates, Transaction aTxn, | |
345 RemoteEventListener aListener, long aLeaseTime, | |
346 MarshalledObject aHandback, boolean visibleOnly) | |
347 throws IOException, TransactionException { | |
348 | |
349 TxnState myState = null; | |
350 | |
351 if (aTxn != null) { | |
352 try { | |
353 myState = TxnManager.get().getTxnFor(aTxn, false); | |
354 } catch (UnknownTransactionException aUTE) { | |
355 throw new TransactionException(); | |
356 } | |
357 } | |
358 | |
359 long myLeaseTime = | |
360 Time.getAbsoluteTime(LeaseBounds.boundNotify(aLeaseTime)); | |
361 | |
362 RegTicketImpl myTicket = | |
363 new RegTicketImpl(myLeaseTime); | |
364 | |
365 EventQueue.get().registerVisibility(aTemplates, myState, aListener, | |
366 myLeaseTime, aHandback, myTicket, | |
367 visibleOnly); | |
368 return myTicket; | |
369 } | |
370 | |
371 /** | |
372 * Call this method to obtain a collection of matches available within the | |
373 * currently active instance against the specified templates. | |
374 * | |
375 * @param holdLocks if <code>true</code> indicates that read locks should | |
376 * be held against the specified transaction rather than just tested | |
377 * @param shouldUpdate if <code>true</code> will cause this view to be | |
378 * dynamically updated with new writes after initial scan of contents | |
379 * @param aMax is the maximum number of Entry's to return | |
380 */ | |
381 public EntryView getView(MangledEntry[] aTemplates, Transaction aTxn, | |
382 boolean holdLocks, boolean shouldUpdate, long aMax) | |
383 throws IOException, TransactionException { | |
384 | |
385 return new EntryViewImpl(aTxn, aTemplates, holdLocks, | |
386 shouldUpdate, aMax); | |
387 } | |
388 | |
389 /** | |
390 * Call this method to obtain a collection of matches available within the | |
391 * currently active instance against the specified templates. The view | |
392 * returned will be dynamically updated. | |
393 * | |
394 * @param holdLocks if <code>true</code> indicates that read locks should | |
395 * be held against the specified transaction rather than just tested | |
396 * @param aMax is the maximum number of Entry's to return | |
397 * | |
398 */ | |
399 public EntryView getView(MangledEntry[] aTemplates, Transaction aTxn, | |
400 boolean holdLocks, long aMax) | |
401 throws IOException, TransactionException { | |
402 | |
403 return new EntryViewImpl(aTxn, aTemplates, holdLocks, true, aMax); | |
404 } | |
405 | |
406 public List write(List aMangledEntries, | |
407 Transaction aTxn, | |
408 List aLeaseTimes) | |
409 throws IOException, TransactionException { | |
410 | |
411 ArrayList myTickets = new ArrayList(); | |
412 | |
413 TxnState myJiniTxn = TxnManager.get().resolve(aTxn); | |
414 | |
415 WriteEscortImpl myEscort = new WriteEscortImpl(myJiniTxn); | |
416 | |
417 int myLast = aMangledEntries.size() - 1; | |
418 | |
419 for (int i = 0; i < aMangledEntries.size(); i++) { | |
420 MangledEntry myEntry = (MangledEntry) aMangledEntries.get(i); | |
421 long myLongLease = ((Long) aLeaseTimes.get(i)).longValue(); | |
422 | |
423 EntryRepository myRepos = | |
424 EntryRepositoryFactory.get().get(myEntry.getType()); | |
425 | |
426 if (myRepos.noSchemaDefined()) { | |
427 DiskTxn myTxn = DiskTxn.newTxn(); | |
428 | |
429 myRepos.setFields(myEntry.getFields()); | |
430 | |
431 /* | |
432 Update parent repositories - each parent needs to know about | |
433 this subtype | |
434 */ | |
435 String[] myParents = myEntry.tearOffParents(); | |
436 | |
437 for (int j = 0; j < myParents.length; j++) { | |
438 EntryRepository myParentRepos = | |
439 EntryRepositoryFactory.get().get(myParents[j]); | |
440 myParentRepos.addSubtype(myEntry.getType()); | |
441 } | |
442 | |
443 myTxn.commit(); | |
444 } | |
445 | |
446 /* | |
447 Invoke write on the repository and then bundle up the OID into | |
448 an appropriate Lease form? | |
449 */ | |
450 OID myOID; | |
451 | |
452 long myLeaseTime = | |
453 Time.getAbsoluteTime(LeaseBounds.boundWrite(myLongLease)); | |
454 | |
455 myRepos.write(myEntry, myLeaseTime, myEscort); | |
456 | |
457 OpInfo myResult = myEscort.getInfo(); | |
458 | |
459 myOID = myResult.getOID(); | |
460 | |
461 /* | |
462 Post an event for notifies and blockers | |
463 */ | |
464 QueueEvent myEvent = | |
465 new QueueEvent(QueueEvent.ENTRY_WRITE, | |
466 myJiniTxn, | |
467 new QueueEvent.Context(myEntry, myOID)); | |
468 | |
469 /* | |
470 Only need to post synchronously on the last write | |
471 */ | |
472 if ((isSynchronousNotify) && (i == myLast)) | |
473 EventQueue.get().add(myEvent, true); | |
474 else | |
475 EventQueue.get().add(myEvent); | |
476 | |
477 SpaceUID mySUID = new SpaceEntryUID(myEntry.getType(), myOID); | |
478 myTickets.add(new WriteTicketImpl(mySUID, myLeaseTime)); | |
479 } | |
480 | |
481 if (myJiniTxn.isNull()) { | |
482 TxnManager.get().prepareAndCommit(myJiniTxn); | |
483 } | |
484 | |
485 return myTickets; | |
486 } | |
487 | |
488 public List take(MangledEntry[] aTemplates, | |
489 Transaction aTxn, | |
490 long aWaitTime, | |
491 long aLimit) | |
492 throws TransactionException, IOException { | |
493 | |
494 TxnState myJiniTxn = TxnManager.get().resolve(aTxn); | |
495 | |
496 BulkMatchTask myVisitor = | |
497 new BulkTakeVisitor(aTemplates, myJiniTxn, aLimit, | |
498 new SearchFactory()); | |
499 | |
500 EntryRepository myRepos; | |
501 | |
502 // For each template | |
503 for (int i = 0; i < aTemplates.length; i++) { | |
504 myRepos = | |
505 EntryRepositoryFactory.get().find(aTemplates[i].getType()); | |
506 | |
507 // If we have a repository rooted at the type | |
508 if (myRepos != null) { | |
509 myRepos.find(aTemplates[i], myVisitor.getVisitor()); | |
510 | |
511 // Did we satisfy the visitor already? | |
512 if (myVisitor.wantsMore()) { | |
513 | |
514 // If not, try subtypes of the current type | |
515 String[] mySubtypes = myRepos.getSubtypes(); | |
516 | |
517 for (int j = 0; j < mySubtypes.length; j++) { | |
518 myRepos = | |
519 EntryRepositoryFactory.get().find(mySubtypes[j]); | |
520 | |
521 if (myRepos != null) { | |
522 myRepos.find(aTemplates[i], myVisitor.getVisitor()); | |
523 | |
524 if (!myVisitor.wantsMore()) | |
525 break; | |
526 } | |
527 } | |
528 } | |
529 } | |
530 | |
531 if (!myVisitor.wantsMore()) | |
532 break; | |
533 } | |
534 | |
535 try { | |
536 if (!myVisitor.wouldBlock()) | |
537 return myVisitor.getEntries(0); | |
538 else { | |
539 return myVisitor.getEntries(Time.getWaitTime(aWaitTime)); | |
540 } | |
541 } catch (InterruptedException anIE) { | |
542 throw new TransactionException("Search interrupted"); | |
543 } finally { | |
544 if (myJiniTxn.isNull()) | |
545 TxnManager.get().prepareAndCommit(myJiniTxn); | |
546 } | |
547 } | |
548 | |
549 public LeaseControl getLeaseControl() { | |
550 return theLeaseController; | |
551 } | |
552 | |
553 public TxnControl getTxnControl() { | |
554 return theTxnController; | |
555 } | |
556 | |
557 public void stop() throws Exception { | |
558 ActiveObjectRegistry.stopAll(); | |
559 | |
560 Disk.sync(); | |
561 | |
562 Disk.stop(); | |
563 | |
564 theLogger.log(Level.INFO, "Dumping stats"); | |
565 Stat[] myStats = StatsBoard.get().getStats(); | |
566 for (int i = 0; i < myStats.length; i++) { | |
567 theLogger.log(Level.INFO, myStats[i].getId() + ", " + myStats[i]); | |
568 } | |
569 | |
570 theLogger.log(Level.INFO, "Blitz core halted after: " + | |
571 (System.currentTimeMillis() - theStartTime) + | |
572 " ms"); | |
573 } | |
574 | |
575 /** | |
576 Clear out all Entry's including schema information. | |
577 This is necessarily very destructive as it aborts operations and | |
578 open transactions and deletes a lot of underlying database state. | |
579 | |
580 @todo Need to replace the use of SearchTasks.destroy() - should be | |
581 possible via the transaction abort all route - need to check | |
582 */ | |
583 public void empty() throws IOException { | |
584 // Kill outstanding search tasks because we plan to drop schema | |
585 // and thus their templates are rendered invalid | |
586 // SearchTasks.get().destroy(); | |
587 | |
588 // Abort all transactions to release locks | |
589 TxnManager.get().abortAll(); | |
590 | |
591 EntryRepositoryFactory.get().deleteAllEntrys(); | |
592 | |
593 // Checkpoint | |
594 TxnManager.get().requestSyncCheckpoint(); | |
595 | |
596 EntryRepositoryFactory.get().deleteAllRepos(); | |
597 } | |
598 | |
599 /** | |
600 Triggers a manual reap in a new thread | |
601 */ | |
602 public void reap() { | |
603 Thread myReaper = | |
604 new Thread(new Runnable() { | |
605 public void run() { | |
606 EntryRepositoryFactory.reap(); | |
607 } | |
608 }); | |
609 | |
610 myReaper.start(); | |
611 } | |
612 } |