comparison src/org/dancres/blitz/SpaceImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz;
2
3 import java.io.IOException;
4
5 import java.rmi.MarshalledObject;
6
7 import java.util.List;
8 import java.util.ArrayList;
9
10 import java.util.logging.*;
11
12 import net.jini.core.event.RemoteEventListener;
13
14 import net.jini.core.transaction.Transaction;
15 import net.jini.core.transaction.UnknownTransactionException;
16 import net.jini.core.transaction.TransactionException;
17
18 import org.dancres.blitz.mangler.MangledEntry;
19
20 import org.dancres.blitz.entry.EntryRepositoryFactory;
21 import org.dancres.blitz.entry.EntryRepository;
22 import org.dancres.blitz.entry.OpInfo;
23
24 import org.dancres.blitz.disk.Disk;
25 import org.dancres.blitz.disk.DiskTxn;
26
27 import org.dancres.blitz.oid.OID;
28
29 import org.dancres.blitz.util.Time;
30
31 import org.dancres.blitz.task.Tasks;
32
33 import org.dancres.blitz.notify.EventQueue;
34 import org.dancres.blitz.notify.QueueEvent;
35
36 import org.dancres.blitz.lease.SpaceUID;
37 import org.dancres.blitz.lease.LeaseBounds;
38
39 import org.dancres.blitz.txn.TxnManager;
40 import org.dancres.blitz.txn.TxnState;
41 import org.dancres.blitz.txn.TxnGateway;
42
43 import org.dancres.blitz.config.ConfigurationFactory;
44 import org.dancres.blitz.config.Fifo;
45
46 import org.dancres.blitz.stats.StatsBoard;
47 import org.dancres.blitz.stats.Stat;
48
49 /**
50 <p>The core back-end implementation of a space. </p>
51
52 <p>One of the more subtle responsibilities of this class is that it handles
53 the negotiation of lease durations. This has implications for remote layers
54 etc. which should expect to pass down an unadulterated lease duration
55 (including <code>Lease.FOREVER</code> and <code>Lease.ANY</code>) and have
56 the space return the actual resultant lease time. This is true for
57 both initial writes of entry's or notify registrations and future
58 renews.</p>
59
60 <p>Another responsibility of this class is to ensure that it only
61 returns <code>SpaceUID</code>s which are space-global unique identifiers.
62 There are a number of different resources managed by the space core which
63 each have their own locally unique identifiers. Thus the core wraps these
64 in appropriate <code>SpaceUID</code> implementations before returning
65 them.</p>
66
67 <p>The various <code>SpaceUID</code> implementations can be used as the
68 target of lease renewal and cancel operations. Each implementation,
69 typically has it's own <code>LeaseHandler</code> implementation which is
70 registered with <code>LeaseHandlers</code> and has the renewal/cancel
71 operations delegated to it.</p>
72
73 <p><code>Entry</code>s are managed by EntryRepository instances.</p>
74
75 <p><code>notify</code>s are delegated to EventQueue.</p>
76
77 @see org.dancres.blitz.lease.SpaceUID
78 @see org.dancres.blitz.lease.LeaseHandler
79 @see org.dancres.blitz.lease.LeaseHandlers
80 @see org.dancres.blitz.entry.EntryRepository
81 @see org.dancres.blitz.notify.EventQueue
82 */
83 public class SpaceImpl {
84
85 static final Logger theLogger =
86 Logging.newLogger("org.dancres.blitz.SpaceImpl", Level.INFO);
87
88 private TxnControl theTxnController = new TxnControlImpl();
89 private LeaseControl theLeaseController = new LeaseControlImpl();
90
91 private long theStartTime;
92
93 private boolean isSynchronousNotify = false;
94
95 public SpaceImpl(TxnGateway aGateway) throws Exception {
96 VersionInfo.dump();
97
98 theStartTime = System.currentTimeMillis();
99
100 long myDebugCycle =
101 ((Long)
102 ConfigurationFactory.getEntry("statsDump",
103 long.class,
104 new Long(0))).longValue();
105
106 isSynchronousNotify = ((Boolean)
107 ConfigurationFactory.getEntry("syncNotifyOnWrite",
108 boolean.class,
109 new Boolean(false))).booleanValue();
110
111 theLogger.info("Synchrounous Notifies: " + isSynchronousNotify);
112
113 StatsDumper.start(myDebugCycle);
114
115 TxnManager.init(aGateway);
116
117 // Make sure EventQueue is hooked up
118 EventQueue.get();
119
120 // Activate threads etc. only after TxnManager is initialised so
121 // state has been recovered/is stable.
122 //
123 ActiveObjectRegistry.startAll();
124
125 // HACK: Ensure root repository is loaded because that publishes
126 // the known types stats (we shouldn't know this!)
127 EntryRepositoryFactory.get().get(EntryRepository.ROOT_TYPE);
128 }
129
130 public WriteTicket write(MangledEntry anEntry, Transaction aTxn,
131 long aLeaseTime)
132 throws IOException, TransactionException {
133
134 TxnState myJiniTxn = TxnManager.get().resolve(aTxn);
135
136 long myLeaseTime =
137 Time.getAbsoluteTime(LeaseBounds.boundWrite(aLeaseTime));
138
139 EntryRepository myRepos =
140 EntryRepositoryFactory.get().get(anEntry.getType());
141
142 if (myRepos.noSchemaDefined()) {
143 DiskTxn myTxn = DiskTxn.newTxn();
144
145 myRepos.setFields(anEntry.getFields());
146
147 /*
148 Update parent repositories - each parent needs to know about
149 this subtype
150 */
151 String[] myParents = anEntry.tearOffParents();
152
153 for (int i = 0; i < myParents.length; i++) {
154 EntryRepository myParentRepos =
155 EntryRepositoryFactory.get().get(myParents[i]);
156 myParentRepos.addSubtype(anEntry.getType());
157 }
158
159 myTxn.commit();
160 }
161
162 /*
163 Invoke write on the repository and then bundle up the OID into
164 an appropriate Lease form?
165 */
166 OID myOID;
167
168 WriteEscortImpl myEscort = new WriteEscortImpl(myJiniTxn);
169
170 myRepos.write(anEntry, myLeaseTime, myEscort);
171
172 OpInfo myResult = myEscort.getInfo();
173
174 myOID = myResult.getOID();
175
176 /*
177 Post an event for notifies and blockers
178 */
179 QueueEvent myEvent =
180 new QueueEvent(QueueEvent.ENTRY_WRITE,
181 myJiniTxn,
182 new QueueEvent.Context(anEntry, myOID));
183
184 if (isSynchronousNotify)
185 EventQueue.get().add(myEvent, true);
186 else
187 EventQueue.get().add(myEvent);
188
189 // For null transactions we must issue the commit
190 if (myJiniTxn.isNull())
191 TxnManager.get().prepareAndCommit(myJiniTxn);
192
193 SpaceUID mySUID = new SpaceEntryUID(anEntry.getType(), myOID);
194 return new WriteTicketImpl(mySUID, myLeaseTime);
195 }
196
197 private MangledEntry find(MangledEntry anEntry, Transaction aTxn,
198 long aWaitTime, boolean doTake, boolean ifExists)
199 throws IOException, TransactionException {
200
201 MangledEntry myEntry = null;
202
203 TxnState myJiniTxn = TxnManager.get().resolve(aTxn);
204
205 EntryRepository myRepos =
206 EntryRepositoryFactory.get().find(anEntry.getType());
207
208 SingleMatchTask myTask;
209 VisitorBaulkedPartyFactory myFactory;
210
211 if (ifExists)
212 myFactory = new ExistsFactory();
213 else
214 myFactory = new SearchFactory();
215
216 if ((myRepos == null) ||
217 (myRepos.getConstraints().get(Fifo.class) == null)) {
218
219 /*
220 If repos doesn't yet exist, we're running on a first come
221 first served basis - it only makes sense to perform enforce
222 fifo when we've accrued some live Entry's in storage where
223 we wish to select those ahead of incoming matches.
224
225 In terms of priority for incoming matches, SearchTasks natural
226 order will ensure that the oldest blocking task gets the incoming
227 matches before anyone else
228 */
229 myTask = new SearchVisitorImpl(anEntry, doTake,
230 myJiniTxn, myFactory);
231 } else {
232 myTask = new FifoSearchVisitorImpl(anEntry, doTake,
233 myJiniTxn, myFactory);
234 }
235
236 if (myRepos != null) {
237 myRepos.find(anEntry, myTask.getVisitor());
238
239 if (myTask.wouldBlock()) {
240
241 // Try subtypes
242 String[] mySubtypes = myRepos.getSubtypes();
243
244 for (int i = 0; i < mySubtypes.length; i++) {
245 myRepos = EntryRepositoryFactory.get().find(mySubtypes[i]);
246
247 if (myRepos != null) {
248 myRepos.find(anEntry, myTask.getVisitor());
249
250 // If we got a match from the search (or it could
251 // have come from a recent write)
252 if (!myTask.wouldBlock())
253 break;
254 }
255 }
256 }
257 } else {
258 // Don't bother as the only sensible thing to do is wait
259 // which will happen below
260 }
261
262 try {
263 // Result waiting?
264 if (!myTask.wouldBlock())
265 myEntry = myTask.getEntry(0);
266 else {
267 // Will optionally force early exit if there were no conflicts
268 myFactory.enableResolutionSignal();
269
270 myEntry =
271 myTask.getEntry(Time.getWaitTime(aWaitTime));
272 }
273 } catch (InterruptedException anIE) {
274 throw new TransactionException("Search interrupted");
275 } catch (TransactionException aTE) {
276 if (myJiniTxn.isNull())
277 TxnManager.get().abort(myJiniTxn);
278
279 throw aTE;
280 }
281
282 // If we started the txn we MUST finish it, entry or not
283 if (myJiniTxn.isNull()) {
284 TxnManager.get().prepareAndCommit(myJiniTxn);
285 }
286
287 return myEntry;
288 }
289
290 public MangledEntry take(MangledEntry anEntry, Transaction aTxn,
291 long aWaitTime)
292 throws IOException, TransactionException {
293
294 return find(anEntry, aTxn, aWaitTime, true, false);
295 }
296
297 public MangledEntry read(MangledEntry anEntry, Transaction aTxn,
298 long aWaitTime)
299 throws IOException, TransactionException {
300
301 return find(anEntry, aTxn, aWaitTime, false, false);
302 }
303
304 public MangledEntry takeIfExists(MangledEntry anEntry, Transaction aTxn,
305 long aWaitTime)
306 throws IOException, TransactionException {
307
308 return find(anEntry, aTxn, aWaitTime, true, true);
309 }
310
311 public MangledEntry readIfExists(MangledEntry anEntry, Transaction aTxn,
312 long aWaitTime)
313 throws IOException, TransactionException {
314
315 return find(anEntry, aTxn, aWaitTime, false, true);
316 }
317
318 public RegTicket notify(MangledEntry aTemplate, Transaction aTxn,
319 RemoteEventListener aListener, long aLeaseTime,
320 MarshalledObject aHandback)
321 throws IOException, TransactionException {
322
323 TxnState myState = null;
324
325 if (aTxn != null) {
326 try {
327 myState = TxnManager.get().getTxnFor(aTxn, false);
328 } catch (UnknownTransactionException aUTE) {
329 throw new TransactionException();
330 }
331 }
332
333 long myLeaseTime =
334 Time.getAbsoluteTime(LeaseBounds.boundNotify(aLeaseTime));
335
336 RegTicketImpl myTicket =
337 new RegTicketImpl(myLeaseTime);
338
339 EventQueue.get().register(aTemplate, myState, aListener,
340 myLeaseTime, aHandback, myTicket);
341 return myTicket;
342 }
343
344 public RegTicket visibility(MangledEntry[] aTemplates, Transaction aTxn,
345 RemoteEventListener aListener, long aLeaseTime,
346 MarshalledObject aHandback, boolean visibleOnly)
347 throws IOException, TransactionException {
348
349 TxnState myState = null;
350
351 if (aTxn != null) {
352 try {
353 myState = TxnManager.get().getTxnFor(aTxn, false);
354 } catch (UnknownTransactionException aUTE) {
355 throw new TransactionException();
356 }
357 }
358
359 long myLeaseTime =
360 Time.getAbsoluteTime(LeaseBounds.boundNotify(aLeaseTime));
361
362 RegTicketImpl myTicket =
363 new RegTicketImpl(myLeaseTime);
364
365 EventQueue.get().registerVisibility(aTemplates, myState, aListener,
366 myLeaseTime, aHandback, myTicket,
367 visibleOnly);
368 return myTicket;
369 }
370
371 /**
372 * Call this method to obtain a collection of matches available within the
373 * currently active instance against the specified templates.
374 *
375 * @param holdLocks if <code>true</code> indicates that read locks should
376 * be held against the specified transaction rather than just tested
377 * @param shouldUpdate if <code>true</code> will cause this view to be
378 * dynamically updated with new writes after initial scan of contents
379 * @param aMax is the maximum number of Entry's to return
380 */
381 public EntryView getView(MangledEntry[] aTemplates, Transaction aTxn,
382 boolean holdLocks, boolean shouldUpdate, long aMax)
383 throws IOException, TransactionException {
384
385 return new EntryViewImpl(aTxn, aTemplates, holdLocks,
386 shouldUpdate, aMax);
387 }
388
389 /**
390 * Call this method to obtain a collection of matches available within the
391 * currently active instance against the specified templates. The view
392 * returned will be dynamically updated.
393 *
394 * @param holdLocks if <code>true</code> indicates that read locks should
395 * be held against the specified transaction rather than just tested
396 * @param aMax is the maximum number of Entry's to return
397 *
398 */
399 public EntryView getView(MangledEntry[] aTemplates, Transaction aTxn,
400 boolean holdLocks, long aMax)
401 throws IOException, TransactionException {
402
403 return new EntryViewImpl(aTxn, aTemplates, holdLocks, true, aMax);
404 }
405
406 public List write(List aMangledEntries,
407 Transaction aTxn,
408 List aLeaseTimes)
409 throws IOException, TransactionException {
410
411 ArrayList myTickets = new ArrayList();
412
413 TxnState myJiniTxn = TxnManager.get().resolve(aTxn);
414
415 WriteEscortImpl myEscort = new WriteEscortImpl(myJiniTxn);
416
417 int myLast = aMangledEntries.size() - 1;
418
419 for (int i = 0; i < aMangledEntries.size(); i++) {
420 MangledEntry myEntry = (MangledEntry) aMangledEntries.get(i);
421 long myLongLease = ((Long) aLeaseTimes.get(i)).longValue();
422
423 EntryRepository myRepos =
424 EntryRepositoryFactory.get().get(myEntry.getType());
425
426 if (myRepos.noSchemaDefined()) {
427 DiskTxn myTxn = DiskTxn.newTxn();
428
429 myRepos.setFields(myEntry.getFields());
430
431 /*
432 Update parent repositories - each parent needs to know about
433 this subtype
434 */
435 String[] myParents = myEntry.tearOffParents();
436
437 for (int j = 0; j < myParents.length; j++) {
438 EntryRepository myParentRepos =
439 EntryRepositoryFactory.get().get(myParents[j]);
440 myParentRepos.addSubtype(myEntry.getType());
441 }
442
443 myTxn.commit();
444 }
445
446 /*
447 Invoke write on the repository and then bundle up the OID into
448 an appropriate Lease form?
449 */
450 OID myOID;
451
452 long myLeaseTime =
453 Time.getAbsoluteTime(LeaseBounds.boundWrite(myLongLease));
454
455 myRepos.write(myEntry, myLeaseTime, myEscort);
456
457 OpInfo myResult = myEscort.getInfo();
458
459 myOID = myResult.getOID();
460
461 /*
462 Post an event for notifies and blockers
463 */
464 QueueEvent myEvent =
465 new QueueEvent(QueueEvent.ENTRY_WRITE,
466 myJiniTxn,
467 new QueueEvent.Context(myEntry, myOID));
468
469 /*
470 Only need to post synchronously on the last write
471 */
472 if ((isSynchronousNotify) && (i == myLast))
473 EventQueue.get().add(myEvent, true);
474 else
475 EventQueue.get().add(myEvent);
476
477 SpaceUID mySUID = new SpaceEntryUID(myEntry.getType(), myOID);
478 myTickets.add(new WriteTicketImpl(mySUID, myLeaseTime));
479 }
480
481 if (myJiniTxn.isNull()) {
482 TxnManager.get().prepareAndCommit(myJiniTxn);
483 }
484
485 return myTickets;
486 }
487
488 public List take(MangledEntry[] aTemplates,
489 Transaction aTxn,
490 long aWaitTime,
491 long aLimit)
492 throws TransactionException, IOException {
493
494 TxnState myJiniTxn = TxnManager.get().resolve(aTxn);
495
496 BulkMatchTask myVisitor =
497 new BulkTakeVisitor(aTemplates, myJiniTxn, aLimit,
498 new SearchFactory());
499
500 EntryRepository myRepos;
501
502 // For each template
503 for (int i = 0; i < aTemplates.length; i++) {
504 myRepos =
505 EntryRepositoryFactory.get().find(aTemplates[i].getType());
506
507 // If we have a repository rooted at the type
508 if (myRepos != null) {
509 myRepos.find(aTemplates[i], myVisitor.getVisitor());
510
511 // Did we satisfy the visitor already?
512 if (myVisitor.wantsMore()) {
513
514 // If not, try subtypes of the current type
515 String[] mySubtypes = myRepos.getSubtypes();
516
517 for (int j = 0; j < mySubtypes.length; j++) {
518 myRepos =
519 EntryRepositoryFactory.get().find(mySubtypes[j]);
520
521 if (myRepos != null) {
522 myRepos.find(aTemplates[i], myVisitor.getVisitor());
523
524 if (!myVisitor.wantsMore())
525 break;
526 }
527 }
528 }
529 }
530
531 if (!myVisitor.wantsMore())
532 break;
533 }
534
535 try {
536 if (!myVisitor.wouldBlock())
537 return myVisitor.getEntries(0);
538 else {
539 return myVisitor.getEntries(Time.getWaitTime(aWaitTime));
540 }
541 } catch (InterruptedException anIE) {
542 throw new TransactionException("Search interrupted");
543 } finally {
544 if (myJiniTxn.isNull())
545 TxnManager.get().prepareAndCommit(myJiniTxn);
546 }
547 }
548
549 public LeaseControl getLeaseControl() {
550 return theLeaseController;
551 }
552
553 public TxnControl getTxnControl() {
554 return theTxnController;
555 }
556
557 public void stop() throws Exception {
558 ActiveObjectRegistry.stopAll();
559
560 Disk.sync();
561
562 Disk.stop();
563
564 theLogger.log(Level.INFO, "Dumping stats");
565 Stat[] myStats = StatsBoard.get().getStats();
566 for (int i = 0; i < myStats.length; i++) {
567 theLogger.log(Level.INFO, myStats[i].getId() + ", " + myStats[i]);
568 }
569
570 theLogger.log(Level.INFO, "Blitz core halted after: " +
571 (System.currentTimeMillis() - theStartTime) +
572 " ms");
573 }
574
575 /**
576 Clear out all Entry's including schema information.
577 This is necessarily very destructive as it aborts operations and
578 open transactions and deletes a lot of underlying database state.
579
580 @todo Need to replace the use of SearchTasks.destroy() - should be
581 possible via the transaction abort all route - need to check
582 */
583 public void empty() throws IOException {
584 // Kill outstanding search tasks because we plan to drop schema
585 // and thus their templates are rendered invalid
586 // SearchTasks.get().destroy();
587
588 // Abort all transactions to release locks
589 TxnManager.get().abortAll();
590
591 EntryRepositoryFactory.get().deleteAllEntrys();
592
593 // Checkpoint
594 TxnManager.get().requestSyncCheckpoint();
595
596 EntryRepositoryFactory.get().deleteAllRepos();
597 }
598
599 /**
600 Triggers a manual reap in a new thread
601 */
602 public void reap() {
603 Thread myReaper =
604 new Thread(new Runnable() {
605 public void run() {
606 EntryRepositoryFactory.reap();
607 }
608 });
609
610 myReaper.start();
611 }
612 }