comparison src/org/dancres/blitz/txn/TxnManager.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children 4580bb12db30
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.txn;
2
3 import java.io.IOException;
4 import java.io.Serializable;
5 import java.rmi.RemoteException;
6 import java.util.logging.Level;
7 import java.util.logging.Logger;
8
9 import net.jini.config.ConfigurationException;
10 import net.jini.core.transaction.Transaction;
11 import net.jini.core.transaction.TransactionException;
12 import net.jini.core.transaction.UnknownTransactionException;
13 import net.jini.core.transaction.server.ServerTransaction;
14 import net.jini.core.transaction.server.TransactionConstants;
15 import net.jini.core.transaction.server.TransactionManager;
16
17 import org.dancres.blitz.Logging;
18 import org.dancres.blitz.config.ConfigurationFactory;
19 import org.dancres.blitz.disk.Disk;
20 import org.dancres.blitz.stats.StatsBoard;
21 import org.dancres.blitz.task.Tasks;
22 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
23 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
24 import org.prevayler.Command;
25 import org.prevayler.SnapshotContributor;
26 import org.prevayler.implementation.SnapshotPrevayler;
27 import org.prevayler.implementation.Snapshotter;
28
29 /**
30 Responsible for tracking/managing transactions. This responsibility is split
31 across two classes. TxnManager handles control aspects whilst
32 TxnManagerState tracks the transactional information. <P>
33
34 We make our lives a little easier by making null transactions look like a
35 normal transaction which keeps things consistent and clean bar an implied
36 prepare/commit cycle in the operation code of SpaceImpl. It also assists
37 us in that the transaction code can be largely tested without an external
38 transaction manager because a lot of it is exercised during the use of null
39 operations. <P>
40
41 The transaction subsystem's logging and snapshot/checkpointing requirements
42 are handled by holding all state in TxnManagerState and making it a
43 <code>PrevalentSystem</code>. Snapshots are treated as the equivalent of
44 checkpoints and are triggered in a separate thread. <P>
45
46 Note that snapshots cannot be performed alongside the processing of commands
47 thus, we take a write on a readerswriter lock during snapshot and release it
48 after. All other operations are performed under a readlock. Note that the
49 write lock must be asserted BEFORE invoking snapshot to avoid deadlock.
50 The snapshot is only written to disk after Disk.sync completes.<P>
51
52 The actual workings are a little different from the norm with TxnManager
53 generating commands which act upon TxnManagerState. Certain methods don't
54 generate commands at all because they don't represent a true state change.
55 Typically these methods are related to introducing new initial state into
56 TxnManagerState such as a transaction which doesn't need to be made
57 durable (saved to log etc.) until it reaches the prepared state. <P>
58
59 Prepare records a durable record of the transaction's operations whilst
60 commit and abort result in those operations being applied. <P>
61
62 @see org.prevayler.PrevalentSystem
63 @see org.prevayler.Command
64 @see org.dancres.blitz.txn.TxnManagerState
65
66 @todo Micro-optimization - we don't need to write an abort record if
67 the txn isn't prepared - just clear it out!
68 */
69 public class TxnManager {
70
71 static Logger theLogger =
72 Logging.newLogger("org.dancres.blitz.TxnManager", Level.INFO);
73
74 private static boolean LOG_CKPTS;
75
76 static {
77 try {
78 LOG_CKPTS =
79 ((Boolean)
80 ConfigurationFactory.getEntry("logCkpts",
81 Boolean.class,
82 new Boolean(false))).booleanValue();
83 } catch (ConfigurationException aCE) {
84 theLogger.log(Level.SEVERE, "Couldn't load config", aCE);
85 }
86 }
87
88 private static TxnManager theManager;
89
90 private TxnManagerState theManagerState;
91
92 private ReadWriteLock theLock = new WriterPreferenceReadWriteLock();
93
94 private SnapshotPrevayler thePrevayler;
95
96 private CheckpointTrigger theCheckpointTrigger;
97
98 private long theCheckpointCount = 0;
99
100 private TxnGateway theGateway;
101
102 private boolean isRecovery = true;
103
104 public static synchronized void init(TxnGateway aGateway)
105 throws Exception {
106
107 if (theManager != null) {
108 return;
109 } else {
110 theManager = new TxnManager(aGateway);
111 theManager.recover();
112 }
113 }
114
115 public static synchronized TxnManager get() {
116 return theManager;
117 }
118
/**
   Only reachable via init(). Just records the gateway; the remaining state
   is set up by recover().
 */
private TxnManager(TxnGateway aGateway) {
    this.theGateway = aGateway;
}
122
123 private void recover() throws Exception {
124 StoragePersonality myPersonality =
125 StoragePersonalityFactory.getPersonality();
126
127 theLogger.log(Level.INFO, "Doing log recovery...");
128
129 thePrevayler = myPersonality.getPrevayler(new TxnManagerState());
130
131 theLogger.log(Level.INFO, "Log Recovery complete...");
132
133 theManagerState = (TxnManagerState) thePrevayler.system();
134
135 theCheckpointTrigger =
136 myPersonality.getCheckpointTrigger(new CheckpointerImpl(this));
137
138 StatsBoard.get().add(new TxnStatGenerator());
139
140 issueCheckpoint(true);
141
142 isRecovery = false;
143
144 // Startup the transaction pinger
145 new TxnPinger(theManagerState);
146 }
147
148 /**
149 @return <code>true</code> if the TransactionManager is performing
150 recovery. When we are performing recovery the log files cannot be
151 written to by user code (that is code outside of the transaction
152 package).
153 */
154 public boolean isRecovery() {
155 return isRecovery;
156 }
157
158 public void add(SnapshotContributor aContributor) {
159 theManagerState.add(aContributor);
160 }
161
162 public void remove(SnapshotContributor aContributor) {
163 theManagerState.remove(aContributor);
164 }
165
166 public TxnGateway getGateway() {
167 return theGateway;
168 }
169
170 /**
171 @return the number of transactions currently being processed. Includes
172 ACTIVE, PREPARED, COMMITTED and ABORTED (where the last two states may be
173 present depending on state with respect to log records etc).
174 */
175 int getActiveTxnCount() {
176 try {
177 int myActiveTxnCount;
178
179 theLock.readLock().acquire();
180
181 myActiveTxnCount = theManagerState.getNumActiveTxns();
182
183 theLock.readLock().release();
184
185 return myActiveTxnCount;
186
187 } catch (InterruptedException anIE) {
188 theLogger.log(Level.SEVERE, "Couldn't get active txn count",
189 anIE);
190
191 return -1;
192 }
193 }
194
195 public TxnState getTxnFor(TxnId anId)
196 throws UnknownTransactionException, RemoteException {
197 return theManagerState.getTxnFor(anId, theGateway, true);
198 }
199
200 /**
201 Resolve a JINI transaction using this method before calling any of
202 <code>prepare</code>, <code>commit</code>, <code>abort</code> or
203 <code>prepareAndCommit</code>.
204 */
205 public TxnState getTxnFor(Transaction aTransaction, boolean mustExist)
206 throws UnknownTransactionException, RemoteException {
207
208 TxnId myId = convertToId(aTransaction);
209 return theManagerState.getTxnFor(myId, theGateway, mustExist);
210 }
211
212 /**
213 Invoked as part of the path for incoming transaction control via
214 the remote TransactionManager
215 */
216 public TxnState getTxnFor(TransactionManager aMgr, long anId)
217 throws UnknownTransactionException, RemoteException {
218
219 TxnId myId = convertToId(aMgr, anId);
220 return theManagerState.getTxnFor(myId, theGateway, true);
221 }
222
223 /**
224 This method is slightly more flexible then getTxnFor in that it will
225 accept a <code>null</code> and return a new null transaction as well
226 as the usual mapping of JINI txn to internal txn state.
227 */
228 public TxnState resolve(Transaction aTransaction)
229 throws UnknownTransactionException, RemoteException {
230
231 if (aTransaction == null)
232 return newNullTxn();
233 else
234 return getTxnFor(aTransaction, false);
235 }
236
237 /**
238 In cases where no explicit transaction has been passed in by a caller,
239 create a null transaction which is an internal, fully transactional
240 replacement which can be used for the duration of the operation
241 in question.
242 */
243 public TxnState newNullTxn() throws RemoteException {
244 return theManagerState.newNullTxn();
245 }
246
247 /**
248 In cases where no state will be changed (no Entry's taken or written),
249 create an instance of this transaction which, when commited or aborted
250 will be undone but not logged.
251 */
252 public TxnState newIdentityTxn() throws RemoteException {
253 return theManagerState.newIdentityTxn();
254 }
255
256 /**
257 Doesn't throw a DbException because it just writes to disk. Restore
258 is coped with inside TxnManagerState when we're doing recovery and
259 an Exception could be thrown then.
260 */
261 public int prepare(TxnState aState) throws UnknownTransactionException {
262
263 try {
264 theLock.readLock().acquire();
265
266 aState.vote();
267
268 boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps()));
269
270 Integer myResult = (Integer)
271 execute(new PrepCommand(aState), dontLog);
272
273 theLock.readLock().release();
274
275 theCheckpointTrigger.loggedCommand();
276
277 return myResult.intValue();
278
279 } catch (InterruptedException anIE) {
280 theLogger.log(Level.SEVERE, "Failed to log prepare", anIE);
281 throw new UnknownTransactionException();
282 } catch (Exception anE) {
283 theLock.readLock().release();
284 theLogger.log(Level.SEVERE, "Failed to log prepare", anE);
285 throw new UnknownTransactionException();
286 }
287 }
288
289 public void commit(TxnState aState) throws UnknownTransactionException {
290
291 try {
292 theLock.readLock().acquire();
293
294 boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps()));
295
296 execute(new CommitCommand(aState.getId()), dontLog);
297
298 theLock.readLock().release();
299
300 aState.doFinalize();
301
302 theCheckpointTrigger.loggedCommand();
303
304 } catch (InterruptedException anIE) {
305 theLogger.log(Level.SEVERE, "Failed to log commit", anIE);
306 throw new UnknownTransactionException();
307 } catch (Exception anE) {
308 theLock.readLock().release();
309 theLogger.log(Level.SEVERE, "Failed to log commit", anE);
310 throw new UnknownTransactionException();
311 }
312 }
313
314 public void abort(TxnState aState) throws UnknownTransactionException {
315
316 try {
317 theLock.readLock().acquire();
318
319 int myResultingState = aState.vote();
320
321 /*
322 We're in at least voting state and possibly prepared state.
323 If at this point the txn is an identity transaction or
324 it has no operations we needn't log it. We also needn't log
325 it in the case where we've not yet written a prepare record
326 to disk which would be indicated by vote() returning
327 VOTING as opposed to PREPARED.
328 */
329 boolean dontLog =
330 ((aState.isIdentity()) || (aState.hasNoOps()) ||
331 (myResultingState == TransactionConstants.VOTING));
332
333 execute(new AbortCommand(aState.getId()), dontLog);
334
335 theLock.readLock().release();
336
337 aState.doFinalize();
338
339 theCheckpointTrigger.loggedCommand();
340
341 } catch (InterruptedException anIE) {
342 theLogger.log(Level.SEVERE, "Failed to log abort", anIE);
343 throw new UnknownTransactionException();
344 } catch (Exception anE) {
345 theLock.readLock().release();
346 theLogger.log(Level.SEVERE, "Failed to log abort", anE);
347 throw new UnknownTransactionException();
348 }
349 }
350
351 public int prepareAndCommit(TxnState aState)
352 throws UnknownTransactionException {
353
354 try {
355 theLock.readLock().acquire();
356
357 aState.vote();
358
359 boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps()));
360
361 Integer myResult = (Integer)
362 execute(new PrepCommitCommand(aState), dontLog);
363
364 theLock.readLock().release();
365
366 aState.doFinalize();
367
368 theCheckpointTrigger.loggedCommand();
369
370 return myResult.intValue();
371
372 } catch (InterruptedException anIE) {
373 theLogger.log(Level.SEVERE, "Failed to log prepCommit", anIE);
374 throw new UnknownTransactionException();
375 } catch (Exception anE) {
376 theLock.readLock().release();
377 theLogger.log(Level.SEVERE, "Failed to log prepCommit", anE);
378 throw new UnknownTransactionException();
379 }
380 }
381
382 /**
383 Log a specific single action in a transaction of its own. This is
384 typically used by elements of Blitz that need to record some state
385 transition that would need to be re-applied at recovery.
386 */
387 public void log(TxnOp anOp) throws TransactionException {
388 tryLog(anOp, Long.MAX_VALUE);
389 }
390
391 /**
392 Attempt to log an action. The attempt is bounded by the specified
393 timeout such that if the action cannot be logged within the time, it
394 will be abandoned.
395
396 @return <code>true</code> if the action was written, <code>false</code>
397 otherwise.
398 */
399 public boolean tryLog(TxnOp anOp, long aTimeout)
400 throws TransactionException {
401
402 try {
403 TxnState myEnclosing = newNullTxn();
404
405 myEnclosing.add(anOp);
406
407 if (theLock.readLock().attempt(aTimeout)) {
408 myEnclosing.vote();
409
410 // Given the contract of timeout, we can make this loss'y'
411 thePrevayler.executeCommand(new PrepCommitCommand(myEnclosing),
412 false);
413 theLock.readLock().release();
414 theCheckpointTrigger.loggedCommand();
415 return true;
416 } else
417 return false;
418 } catch (InterruptedException anIE) {
419 theLogger.log(Level.SEVERE, "Failed to log Action", anIE);
420 throw new TransactionException();
421 } catch (Exception anE) {
422 theLock.readLock().release();
423 theLogger.log(Level.SEVERE, "Failed to log Action", anE);
424 throw new TransactionException();
425 }
426 }
427
428 public void abortAll() throws IOException {
429
430 try {
431 theLock.readLock().acquire();
432
433 // Has to be logged
434 //
435 thePrevayler.executeCommand(new AbortAllCommand());
436
437 theLock.readLock().release();
438
439 theCheckpointTrigger.loggedCommand();
440
441 } catch (InterruptedException anIE) {
442 theLogger.log(Level.SEVERE, "Failed to log abortAll", anIE);
443 throw new IOException();
444 } catch (Exception anE) {
445 theLock.readLock().release();
446 theLogger.log(Level.SEVERE, "Failed to log abortAll", anE);
447 throw new IOException();
448 }
449 }
450
451 private TxnId convertToId(TransactionManager aMgr, long anId)
452 throws RemoteException {
453
454 return new TxnId(aMgr, anId);
455 }
456
457 private TxnId convertToId(Transaction aTxn) throws RemoteException {
458 ServerTransaction myTxn = (ServerTransaction) aTxn;
459
460 return new TxnId(myTxn.mgr, myTxn.id);
461 }
462
463 /**
464 * @todo There's a race condition here where state might change after we
465 * check for in-memory execute or not leading to incorrect behaviour in
466 * face of a crash. This needs fixing whilst at the same time we should
467 * address the issue of an abort on an ACTIVE transaction which therefore
468 * needn't be logged. What we need to do is modify each command to report
469 * if it needs to be logged. We then modify the prevayler to execute
470 * the command and then determine if the outcome needs logging and log
471 * if necessary. Only the SnapshotPrevaylerImpl really cares about this
472 * so only it needs tweaking. This state should probably be deduced
473 * by the TxnState itself or TxnManagerState - it certainly needs to be
474 * done under the lock of the TxnState. The decision points for whether
475 * a transaction is idempotent are at initial prepare (where we discover
476 * if there are no listeners or the txn is the identity txn) and at an
477 * abort when the transaction it still active (and thus we haven't written
478 * a log record).
479 */
480 private Serializable execute(Command aCommand, boolean dontLog)
481 throws Exception {
482 if (dontLog) {
483 // System.err.println("Bypass");
484 return aCommand.execute(theManagerState);
485 } else {
486 // System.err.println("Full log");
487 return thePrevayler.executeCommand(aCommand);
488 }
489 }
490
491 /**
492 Run a hot backup. Basic contract is that we will catch all but the
493 currently active transactions (whose effects will be confined to the
494 cache anyway.
495
496 @param aDestDir the mount point/directory to copy the files to.
497 */
498 public void hotBackup(String aDestDir) throws IOException {
499
500 /*
501 First do an unlocked sync to get most updates from the cache
502 to disk.
503
504 Then take the txn lock and perform the second sync part of
505 which then does the file copy for backup purposes
506 */
507 try {
508 Disk.sync();
509 } catch (Exception anE) {
510 IOException anIOE = new IOException("Failed to start backup (initial sync)");
511 anIOE.initCause(anE);
512
513 throw anIOE;
514 }
515
516 try {
517 theLock.writeLock().acquire();
518
519 try {
520 Disk.backup(aDestDir);
521 } finally {
522 theLock.writeLock().release();
523 }
524
525 } catch (InterruptedException anIE) {
526 IOException anIOE = new IOException("Failed to start backup");
527 anIOE.initCause(anIE);
528
529 throw anIOE;
530 }
531 }
532
533 /**
534 Used by external entities to request a snapshot which could be used
535 by another Blitz instance - i.e. think of this as a copy type operation.
536
537 @todo The checking of active transactions is not bullet proof because
538 we could get a new transaction before we start the checkpoint. Consider
539 a barrier or some other fix (requires a decision on how to handle
540 waiting for settling of transactions etc). We're keeping it simple for
541 now.
542 */
543 public void requestSnapshot() throws TransactionException, IOException {
544 try {
545 int myActiveTxnCount;
546
547 theLock.readLock().acquire();
548
549 myActiveTxnCount = theManagerState.getNumActiveTxns();
550
551 theLock.readLock().release();
552
553 if (myActiveTxnCount != 0)
554 throw new TransactionException(
555 "Cannot snapshot with active transactions it's bad for your data");
556 } catch (InterruptedException anIE) {
557 throw new TransactionException("Couldn't check for outstanding transactions");
558 }
559
560 CheckpointTask myTask = new CheckpointTask();
561
562 try {
563 Tasks.queue("checkpoints", myTask);
564 } catch (InterruptedException anIE) {
565 theLogger.log(Level.SEVERE, "Failed to queue checkpoint task",
566 anIE);
567 throw new IOException("Failed to queue checkpoint task");
568 }
569
570 myTask.waitForCompletion();
571 }
572
573 /**
574 Requests a checkpoint and blocks 'til completion. This method is
575 used internally by the Blitz core and unlike requestSnapshot makes
576 no efforts to render the filesystem into a state which could be copied
577 to another Blitz instance.
578 */
579 public void requestSyncCheckpoint() throws IOException {
580 issueCheckpoint(true);
581 }
582
583 /**
584 This method is used internally by the Blitz core and unlike
585 requestSnapshot makes no efforts to render the filesystem into a state
586 which could be copied to another Blitz instance.
587
588 <ol>
589 <li> Assert a write lock to prevent further commands from being
590 logged. </li>
591 <li> Snapshot the state of the PrevalentSystem (TxnManagerState) </li>
592 <li> Snapshot results in changeover to a new log file.
593 <li> Release the write lock to allow further commands to enter the new
594 log file </li>
595 <li> Invoke Disk.sync to flush dirty data to disk which will callback
596 on completion </li>
597 <li> Callback triggers writing of the snapshot which invalidates all
598 log files previous to the one started above </li>
599 </ol>
600
601 <p>In the event of failure before sync'ing is complete - i.e. the
602 snapshot has not been saved, log files from the previous snapshot
603 onwards will be used to re-construct state. If there's no previous
604 snapshot, all log files will be replayed to reconstruct state. </p>
605
606 @see org.dancres.blitz.disk.Disk
607 */
608 void requestAsyncCheckpoint() throws IOException {
609 issueCheckpoint(false);
610 }
611
612 /**
613 Internal implementation of the checkpoint operation used by snapshot
614 and checkpoint methods above.
615
616 @param isBlocking specifies whether to block the caller until the
617 checkpoint completes
618 */
619 private void issueCheckpoint(boolean isBlocking) throws IOException {
620
621 // Only checkpoint if the trigger says it's okay
622 if (theCheckpointTrigger.checkpointsDisabled())
623 return;
624
625 try {
626 long myCkptId;
627
628 synchronized(this) {
629 myCkptId = theCheckpointCount++;
630 }
631
632 if (LOG_CKPTS)
633 theLogger.log(Level.INFO, "Checkpoint::start: " + myCkptId);
634
635 theLock.writeLock().acquire();
636
637 // Issue tentative checkpoint - change over logs
638 // and carry over prepared state from old log in snapshotter
639 // which will be commited/aborted in the new log
640 Snapshotter mySnapper = thePrevayler.takeSnapshot();
641
642 theLock.writeLock().release();
643
644 // Now sync disks and save snapshot at completion
645 if (isBlocking) {
646 BlockingSnapshotTask myTask =
647 new BlockingSnapshotTask(myCkptId, mySnapper);
648
649 Disk.sync(myTask);
650 myTask.waitForCompletion();
651 } else {
652 Disk.sync(new SnapshotTask(myCkptId, mySnapper));
653 }
654
655 if (LOG_CKPTS)
656 theLogger.log(Level.INFO, "Checkpoint::end: " + myCkptId);
657
658 } catch (InterruptedException anIE) {
659 theLogger.log(Level.SEVERE, "Failed to get lock for ckpt", anIE);
660 throw new IOException("Failed to lock for ckpt");
661 } catch (Exception anE) {
662 theLogger.log(Level.SEVERE, "Failed to sync", anE);
663 throw new IOException("Failed to sync");
664 }
665 }
666
667 /**
668 An instance of this object is passed to Disk.sync. It's run method
669 will be called once Disk has completed the sync task. When called,
670 it saves the snapshot to disk which obsoletes logs previous to the
671 snapshot (where previous is defined as a log with a sequence number
672 less than that of the snapshot).
673 */
674 class SnapshotTask implements Runnable {
675 private Snapshotter theSnapper;
676 private long theCkptId;
677
678 SnapshotTask(long aCkptId, Snapshotter aSnapper) {
679 theCkptId = aCkptId;
680 theSnapper = aSnapper;
681 }
682
683 public void run() {
684 try {
685 if (LOG_CKPTS)
686 theLogger.log(Level.INFO,
687 "Disks sync'd - save snapshot: " +
688 theCkptId);
689 theSnapper.save();
690
691 if (LOG_CKPTS)
692 theLogger.log(Level.INFO, "Snapshot saved: " + theCkptId);
693 } catch (IOException anIOE) {
694 theLogger.log(Level.SEVERE, "Failed to save snapshot on sync",
695 anIOE);
696 }
697 }
698 }
699
700 class BlockingSnapshotTask implements Runnable {
701 private Snapshotter theSnapper;
702 private long theCkptId;
703 private boolean isComplete;
704
705 BlockingSnapshotTask(long aCkptId, Snapshotter aSnapper) {
706 theCkptId = aCkptId;
707 theSnapper = aSnapper;
708 }
709
710 public void run() {
711 try {
712 if (LOG_CKPTS)
713 theLogger.log(Level.INFO,
714 "Disks sync'd - save snapshot: " +
715 theCkptId);
716 theSnapper.save();
717
718 synchronized(this) {
719 isComplete = true;
720 notify();
721 }
722
723 if (LOG_CKPTS)
724 theLogger.log(Level.INFO, "Snapshot saved: " + theCkptId);
725 } catch (IOException anIOE) {
726 theLogger.log(Level.SEVERE, "Failed to save snapshot on sync",
727 anIOE);
728 }
729 }
730
731 void waitForCompletion() {
732 synchronized(this) {
733 while(! isComplete) {
734 try {
735 wait();
736 } catch (InterruptedException anIE) {
737 }
738 }
739 }
740 }
741 }
742 }