Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/txn/TxnManager.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children | 4580bb12db30 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.txn; | |
2 | |
3 import java.io.IOException; | |
4 import java.io.Serializable; | |
5 import java.rmi.RemoteException; | |
6 import java.util.logging.Level; | |
7 import java.util.logging.Logger; | |
8 | |
9 import net.jini.config.ConfigurationException; | |
10 import net.jini.core.transaction.Transaction; | |
11 import net.jini.core.transaction.TransactionException; | |
12 import net.jini.core.transaction.UnknownTransactionException; | |
13 import net.jini.core.transaction.server.ServerTransaction; | |
14 import net.jini.core.transaction.server.TransactionConstants; | |
15 import net.jini.core.transaction.server.TransactionManager; | |
16 | |
17 import org.dancres.blitz.Logging; | |
18 import org.dancres.blitz.config.ConfigurationFactory; | |
19 import org.dancres.blitz.disk.Disk; | |
20 import org.dancres.blitz.stats.StatsBoard; | |
21 import org.dancres.blitz.task.Tasks; | |
22 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; | |
23 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; | |
24 import org.prevayler.Command; | |
25 import org.prevayler.SnapshotContributor; | |
26 import org.prevayler.implementation.SnapshotPrevayler; | |
27 import org.prevayler.implementation.Snapshotter; | |
28 | |
29 /** | |
30 Responsible for tracking/managing transactions. This responsiblity is split | |
31 across two classes. TxnManager handles control aspects whilst | |
32 TxnManagerState tracks the transactional information. <P> | |
33 | |
34 We make our lives a little easier by making null transactions look like a | |
35 normal transaction which keeps things consistent and clean bar an implied | |
36 prepare/commit cycle in the operation code of SpaceImpl. It also assists | |
37 us in that the transaction code can be largely tested without an external | |
38 transaction manager because a lot of it is exercised during the use of null | |
39 operations. <P> | |
40 | |
41 The transaction subsystems logging and snapshot/checkpointing requirements | |
42 are handled by holding all state in TxnManagerState and making it a | |
43 <code>PrevalentSystem</code>. Snapshots are treated as the equivalent of | |
44 checkpoints and are triggered in a separate thread. <P> | |
45 | |
46 Note that snapshots cannot be performed alongside the processing of commands | |
47 thus, we take a write on a readerswriter lock during snapshot and release it | |
48 after. All other operations are performed under a readlock. Note that the | |
49 write lock must be asserted BEFORE invoking snapshot to avoid deadlock. | |
50 The snapshot is only written to disk after Disk.sync completes.<P> | |
51 | |
52 The actual workings are a little different from the norm with TxnManager | |
53 generating commands which act upon TxnManagerState. Certain methods don't | |
54 generate commands at all because they don't represent a true state change. | |
55 Typically these methods are related to introducing new initial state into | |
56 TxnManagerState such as a transaction which doesn't need to be made | |
57 durable (saved to log etc.) until it reaches the prepared state. <P> | |
58 | |
59 Prepare records a durable record of the transaction's operations whilst | |
60 commit and abort result in those operations being applied. <P> | |
61 | |
62 @see org.prevayler.PrevalentSystem | |
63 @see org.prevayler.Command | |
64 @see org.dancres.blitz.txn.TxnManagerState | |
65 | |
66 @todo Micro-optimization - we don't need to write an abort record if | |
67 the txn isn't prepared - just clear it out! | |
68 */ | |
69 public class TxnManager { | |
70 | |
71 static Logger theLogger = | |
72 Logging.newLogger("org.dancres.blitz.TxnManager", Level.INFO); | |
73 | |
74 private static boolean LOG_CKPTS; | |
75 | |
76 static { | |
77 try { | |
78 LOG_CKPTS = | |
79 ((Boolean) | |
80 ConfigurationFactory.getEntry("logCkpts", | |
81 Boolean.class, | |
82 new Boolean(false))).booleanValue(); | |
83 } catch (ConfigurationException aCE) { | |
84 theLogger.log(Level.SEVERE, "Couldn't load config", aCE); | |
85 } | |
86 } | |
87 | |
88 private static TxnManager theManager; | |
89 | |
90 private TxnManagerState theManagerState; | |
91 | |
92 private ReadWriteLock theLock = new WriterPreferenceReadWriteLock(); | |
93 | |
94 private SnapshotPrevayler thePrevayler; | |
95 | |
96 private CheckpointTrigger theCheckpointTrigger; | |
97 | |
98 private long theCheckpointCount = 0; | |
99 | |
100 private TxnGateway theGateway; | |
101 | |
102 private boolean isRecovery = true; | |
103 | |
104 public static synchronized void init(TxnGateway aGateway) | |
105 throws Exception { | |
106 | |
107 if (theManager != null) { | |
108 return; | |
109 } else { | |
110 theManager = new TxnManager(aGateway); | |
111 theManager.recover(); | |
112 } | |
113 } | |
114 | |
115 public static synchronized TxnManager get() { | |
116 return theManager; | |
117 } | |
118 | |
119 private TxnManager(TxnGateway aGateway) { | |
120 theGateway = aGateway; | |
121 } | |
122 | |
123 private void recover() throws Exception { | |
124 StoragePersonality myPersonality = | |
125 StoragePersonalityFactory.getPersonality(); | |
126 | |
127 theLogger.log(Level.INFO, "Doing log recovery..."); | |
128 | |
129 thePrevayler = myPersonality.getPrevayler(new TxnManagerState()); | |
130 | |
131 theLogger.log(Level.INFO, "Log Recovery complete..."); | |
132 | |
133 theManagerState = (TxnManagerState) thePrevayler.system(); | |
134 | |
135 theCheckpointTrigger = | |
136 myPersonality.getCheckpointTrigger(new CheckpointerImpl(this)); | |
137 | |
138 StatsBoard.get().add(new TxnStatGenerator()); | |
139 | |
140 issueCheckpoint(true); | |
141 | |
142 isRecovery = false; | |
143 | |
144 // Startup the transaction pinger | |
145 new TxnPinger(theManagerState); | |
146 } | |
147 | |
148 /** | |
149 @return <code>true</code> if the TransactionManager is performing | |
150 recovery. When we are performing recovery the log files cannot be | |
151 written to by user code (that is code outside of the transaction | |
152 package). | |
153 */ | |
154 public boolean isRecovery() { | |
155 return isRecovery; | |
156 } | |
157 | |
158 public void add(SnapshotContributor aContributor) { | |
159 theManagerState.add(aContributor); | |
160 } | |
161 | |
162 public void remove(SnapshotContributor aContributor) { | |
163 theManagerState.remove(aContributor); | |
164 } | |
165 | |
166 public TxnGateway getGateway() { | |
167 return theGateway; | |
168 } | |
169 | |
170 /** | |
171 @return the number of transactions currently being processed. Includes | |
172 ACTIVE, PREPARED, COMMITTED and ABORTED (where the last two states may be | |
173 present depending on state with respect to log records etc). | |
174 */ | |
175 int getActiveTxnCount() { | |
176 try { | |
177 int myActiveTxnCount; | |
178 | |
179 theLock.readLock().acquire(); | |
180 | |
181 myActiveTxnCount = theManagerState.getNumActiveTxns(); | |
182 | |
183 theLock.readLock().release(); | |
184 | |
185 return myActiveTxnCount; | |
186 | |
187 } catch (InterruptedException anIE) { | |
188 theLogger.log(Level.SEVERE, "Couldn't get active txn count", | |
189 anIE); | |
190 | |
191 return -1; | |
192 } | |
193 } | |
194 | |
195 public TxnState getTxnFor(TxnId anId) | |
196 throws UnknownTransactionException, RemoteException { | |
197 return theManagerState.getTxnFor(anId, theGateway, true); | |
198 } | |
199 | |
200 /** | |
201 Resolve a JINI transaction using this method before calling any of | |
202 <code>prepare</code>, <code>commit</code>, <code>abort</code> or | |
203 <code>prepareAndCommit</code>. | |
204 */ | |
205 public TxnState getTxnFor(Transaction aTransaction, boolean mustExist) | |
206 throws UnknownTransactionException, RemoteException { | |
207 | |
208 TxnId myId = convertToId(aTransaction); | |
209 return theManagerState.getTxnFor(myId, theGateway, mustExist); | |
210 } | |
211 | |
212 /** | |
213 Invoked as part of the path for incoming transaction control via | |
214 the remote TransactionManager | |
215 */ | |
216 public TxnState getTxnFor(TransactionManager aMgr, long anId) | |
217 throws UnknownTransactionException, RemoteException { | |
218 | |
219 TxnId myId = convertToId(aMgr, anId); | |
220 return theManagerState.getTxnFor(myId, theGateway, true); | |
221 } | |
222 | |
223 /** | |
224 This method is slightly more flexible then getTxnFor in that it will | |
225 accept a <code>null</code> and return a new null transaction as well | |
226 as the usual mapping of JINI txn to internal txn state. | |
227 */ | |
228 public TxnState resolve(Transaction aTransaction) | |
229 throws UnknownTransactionException, RemoteException { | |
230 | |
231 if (aTransaction == null) | |
232 return newNullTxn(); | |
233 else | |
234 return getTxnFor(aTransaction, false); | |
235 } | |
236 | |
237 /** | |
238 In cases where no explicit transaction has been passed in by a caller, | |
239 create a null transaction which is an internal, fully transactional | |
240 replacement which can be used for the duration of the operation | |
241 in question. | |
242 */ | |
243 public TxnState newNullTxn() throws RemoteException { | |
244 return theManagerState.newNullTxn(); | |
245 } | |
246 | |
247 /** | |
248 In cases where no state will be changed (no Entry's taken or written), | |
249 create an instance of this transaction which, when commited or aborted | |
250 will be undone but not logged. | |
251 */ | |
252 public TxnState newIdentityTxn() throws RemoteException { | |
253 return theManagerState.newIdentityTxn(); | |
254 } | |
255 | |
256 /** | |
257 Doesn't throw a DbException because it just writes to disk. Restore | |
258 is coped with inside TxnManagerState when we're doing recovery and | |
259 an Exception could be thrown then. | |
260 */ | |
261 public int prepare(TxnState aState) throws UnknownTransactionException { | |
262 | |
263 try { | |
264 theLock.readLock().acquire(); | |
265 | |
266 aState.vote(); | |
267 | |
268 boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps())); | |
269 | |
270 Integer myResult = (Integer) | |
271 execute(new PrepCommand(aState), dontLog); | |
272 | |
273 theLock.readLock().release(); | |
274 | |
275 theCheckpointTrigger.loggedCommand(); | |
276 | |
277 return myResult.intValue(); | |
278 | |
279 } catch (InterruptedException anIE) { | |
280 theLogger.log(Level.SEVERE, "Failed to log prepare", anIE); | |
281 throw new UnknownTransactionException(); | |
282 } catch (Exception anE) { | |
283 theLock.readLock().release(); | |
284 theLogger.log(Level.SEVERE, "Failed to log prepare", anE); | |
285 throw new UnknownTransactionException(); | |
286 } | |
287 } | |
288 | |
289 public void commit(TxnState aState) throws UnknownTransactionException { | |
290 | |
291 try { | |
292 theLock.readLock().acquire(); | |
293 | |
294 boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps())); | |
295 | |
296 execute(new CommitCommand(aState.getId()), dontLog); | |
297 | |
298 theLock.readLock().release(); | |
299 | |
300 aState.doFinalize(); | |
301 | |
302 theCheckpointTrigger.loggedCommand(); | |
303 | |
304 } catch (InterruptedException anIE) { | |
305 theLogger.log(Level.SEVERE, "Failed to log commit", anIE); | |
306 throw new UnknownTransactionException(); | |
307 } catch (Exception anE) { | |
308 theLock.readLock().release(); | |
309 theLogger.log(Level.SEVERE, "Failed to log commit", anE); | |
310 throw new UnknownTransactionException(); | |
311 } | |
312 } | |
313 | |
314 public void abort(TxnState aState) throws UnknownTransactionException { | |
315 | |
316 try { | |
317 theLock.readLock().acquire(); | |
318 | |
319 int myResultingState = aState.vote(); | |
320 | |
321 /* | |
322 We're in at least voting state and possibly prepared state. | |
323 If at this point the txn is an identity transaction or | |
324 it has no operations we needn't log it. We also needn't log | |
325 it in the case where we've not yet written a prepare record | |
326 to disk which would be indicated by vote() returning | |
327 VOTING as opposed to PREPARED. | |
328 */ | |
329 boolean dontLog = | |
330 ((aState.isIdentity()) || (aState.hasNoOps()) || | |
331 (myResultingState == TransactionConstants.VOTING)); | |
332 | |
333 execute(new AbortCommand(aState.getId()), dontLog); | |
334 | |
335 theLock.readLock().release(); | |
336 | |
337 aState.doFinalize(); | |
338 | |
339 theCheckpointTrigger.loggedCommand(); | |
340 | |
341 } catch (InterruptedException anIE) { | |
342 theLogger.log(Level.SEVERE, "Failed to log abort", anIE); | |
343 throw new UnknownTransactionException(); | |
344 } catch (Exception anE) { | |
345 theLock.readLock().release(); | |
346 theLogger.log(Level.SEVERE, "Failed to log abort", anE); | |
347 throw new UnknownTransactionException(); | |
348 } | |
349 } | |
350 | |
351 public int prepareAndCommit(TxnState aState) | |
352 throws UnknownTransactionException { | |
353 | |
354 try { | |
355 theLock.readLock().acquire(); | |
356 | |
357 aState.vote(); | |
358 | |
359 boolean dontLog = ((aState.isIdentity()) || (aState.hasNoOps())); | |
360 | |
361 Integer myResult = (Integer) | |
362 execute(new PrepCommitCommand(aState), dontLog); | |
363 | |
364 theLock.readLock().release(); | |
365 | |
366 aState.doFinalize(); | |
367 | |
368 theCheckpointTrigger.loggedCommand(); | |
369 | |
370 return myResult.intValue(); | |
371 | |
372 } catch (InterruptedException anIE) { | |
373 theLogger.log(Level.SEVERE, "Failed to log prepCommit", anIE); | |
374 throw new UnknownTransactionException(); | |
375 } catch (Exception anE) { | |
376 theLock.readLock().release(); | |
377 theLogger.log(Level.SEVERE, "Failed to log prepCommit", anE); | |
378 throw new UnknownTransactionException(); | |
379 } | |
380 } | |
381 | |
382 /** | |
383 Log a specific single action in a transaction of its own. This is | |
384 typically used by elements of Blitz that need to record some state | |
385 transition that would need to be re-applied at recovery. | |
386 */ | |
387 public void log(TxnOp anOp) throws TransactionException { | |
388 tryLog(anOp, Long.MAX_VALUE); | |
389 } | |
390 | |
391 /** | |
392 Attempt to log an action. The attempt is bounded by the specified | |
393 timeout such that if the action cannot be logged within the time, it | |
394 will be abandoned. | |
395 | |
396 @return <code>true</code> if the action was written, <code>false</code> | |
397 otherwise. | |
398 */ | |
399 public boolean tryLog(TxnOp anOp, long aTimeout) | |
400 throws TransactionException { | |
401 | |
402 try { | |
403 TxnState myEnclosing = newNullTxn(); | |
404 | |
405 myEnclosing.add(anOp); | |
406 | |
407 if (theLock.readLock().attempt(aTimeout)) { | |
408 myEnclosing.vote(); | |
409 | |
410 // Given the contract of timeout, we can make this loss'y' | |
411 thePrevayler.executeCommand(new PrepCommitCommand(myEnclosing), | |
412 false); | |
413 theLock.readLock().release(); | |
414 theCheckpointTrigger.loggedCommand(); | |
415 return true; | |
416 } else | |
417 return false; | |
418 } catch (InterruptedException anIE) { | |
419 theLogger.log(Level.SEVERE, "Failed to log Action", anIE); | |
420 throw new TransactionException(); | |
421 } catch (Exception anE) { | |
422 theLock.readLock().release(); | |
423 theLogger.log(Level.SEVERE, "Failed to log Action", anE); | |
424 throw new TransactionException(); | |
425 } | |
426 } | |
427 | |
428 public void abortAll() throws IOException { | |
429 | |
430 try { | |
431 theLock.readLock().acquire(); | |
432 | |
433 // Has to be logged | |
434 // | |
435 thePrevayler.executeCommand(new AbortAllCommand()); | |
436 | |
437 theLock.readLock().release(); | |
438 | |
439 theCheckpointTrigger.loggedCommand(); | |
440 | |
441 } catch (InterruptedException anIE) { | |
442 theLogger.log(Level.SEVERE, "Failed to log abortAll", anIE); | |
443 throw new IOException(); | |
444 } catch (Exception anE) { | |
445 theLock.readLock().release(); | |
446 theLogger.log(Level.SEVERE, "Failed to log abortAll", anE); | |
447 throw new IOException(); | |
448 } | |
449 } | |
450 | |
451 private TxnId convertToId(TransactionManager aMgr, long anId) | |
452 throws RemoteException { | |
453 | |
454 return new TxnId(aMgr, anId); | |
455 } | |
456 | |
457 private TxnId convertToId(Transaction aTxn) throws RemoteException { | |
458 ServerTransaction myTxn = (ServerTransaction) aTxn; | |
459 | |
460 return new TxnId(myTxn.mgr, myTxn.id); | |
461 } | |
462 | |
463 /** | |
464 * @todo There's a race condition here where state might change after we | |
465 * check for in-memory execute or not leading to incorrect behaviour in | |
466 * face of a crash. This needs fixing whilst at the same time we should | |
467 * address the issue of an abort on an ACTIVE transaction which therefore | |
468 * needn't be logged. What we need to do is modify each command to report | |
469 * if it needs to be logged. We then modify the prevayler to execute | |
470 * the command and then determine if the outcome needs logging and log | |
471 * if necessary. Only the SnapshotPrevaylerImpl really cares about this | |
472 * so only it needs tweaking. This state should probably be deduced | |
473 * by the TxnState itself or TxnManagerState - it certainly needs to be | |
474 * done under the lock of the TxnState. The decision points for whether | |
475 * a transaction is idempotent are at initial prepare (where we discover | |
476 * if there are no listeners or the txn is the identity txn) and at an | |
477 * abort when the transaction it still active (and thus we haven't written | |
478 * a log record). | |
479 */ | |
480 private Serializable execute(Command aCommand, boolean dontLog) | |
481 throws Exception { | |
482 if (dontLog) { | |
483 // System.err.println("Bypass"); | |
484 return aCommand.execute(theManagerState); | |
485 } else { | |
486 // System.err.println("Full log"); | |
487 return thePrevayler.executeCommand(aCommand); | |
488 } | |
489 } | |
490 | |
491 /** | |
492 Run a hot backup. Basic contract is that we will catch all but the | |
493 currently active transactions (whose effects will be confined to the | |
494 cache anyway. | |
495 | |
496 @param aDestDir the mount point/directory to copy the files to. | |
497 */ | |
498 public void hotBackup(String aDestDir) throws IOException { | |
499 | |
500 /* | |
501 First do an unlocked sync to get most updates from the cache | |
502 to disk. | |
503 | |
504 Then take the txn lock and perform the second sync part of | |
505 which then does the file copy for backup purposes | |
506 */ | |
507 try { | |
508 Disk.sync(); | |
509 } catch (Exception anE) { | |
510 IOException anIOE = new IOException("Failed to start backup (initial sync)"); | |
511 anIOE.initCause(anE); | |
512 | |
513 throw anIOE; | |
514 } | |
515 | |
516 try { | |
517 theLock.writeLock().acquire(); | |
518 | |
519 try { | |
520 Disk.backup(aDestDir); | |
521 } finally { | |
522 theLock.writeLock().release(); | |
523 } | |
524 | |
525 } catch (InterruptedException anIE) { | |
526 IOException anIOE = new IOException("Failed to start backup"); | |
527 anIOE.initCause(anIE); | |
528 | |
529 throw anIOE; | |
530 } | |
531 } | |
532 | |
533 /** | |
534 Used by external entities to request a snapshot which could be used | |
535 by another Blitz instance - i.e. think of this as a copy type operation. | |
536 | |
537 @todo The checking of active transactions is not bullet proof because | |
538 we could get a new transaction before we start the checkpoint. Consider | |
539 a barrier or some other fix (requires a decision on how to handle | |
540 waiting for settling of transactions etc). We're keeping it simple for | |
541 now. | |
542 */ | |
543 public void requestSnapshot() throws TransactionException, IOException { | |
544 try { | |
545 int myActiveTxnCount; | |
546 | |
547 theLock.readLock().acquire(); | |
548 | |
549 myActiveTxnCount = theManagerState.getNumActiveTxns(); | |
550 | |
551 theLock.readLock().release(); | |
552 | |
553 if (myActiveTxnCount != 0) | |
554 throw new TransactionException( | |
555 "Cannot snapshot with active transactions it's bad for your data"); | |
556 } catch (InterruptedException anIE) { | |
557 throw new TransactionException("Couldn't check for outstanding transactions"); | |
558 } | |
559 | |
560 CheckpointTask myTask = new CheckpointTask(); | |
561 | |
562 try { | |
563 Tasks.queue("checkpoints", myTask); | |
564 } catch (InterruptedException anIE) { | |
565 theLogger.log(Level.SEVERE, "Failed to queue checkpoint task", | |
566 anIE); | |
567 throw new IOException("Failed to queue checkpoint task"); | |
568 } | |
569 | |
570 myTask.waitForCompletion(); | |
571 } | |
572 | |
573 /** | |
574 Requests a checkpoint and blocks 'til completion. This method is | |
575 used internally by the Blitz core and unlike requestSnapshot makes | |
576 no efforts to render the filesystem into a state which could be copied | |
577 to another Blitz instance. | |
578 */ | |
579 public void requestSyncCheckpoint() throws IOException { | |
580 issueCheckpoint(true); | |
581 } | |
582 | |
583 /** | |
584 This method is used internally by the Blitz core and unlike | |
585 requestSnapshot makes no efforts to render the filesystem into a state | |
586 which could be copied to another Blitz instance. | |
587 | |
588 <ol> | |
589 <li> Assert a write lock to prevent further commands from being | |
590 logged. </li> | |
591 <li> Snapshot the state of the PrevalentSystem (TxnManagerState) </li> | |
592 <li> Snapshot results in changeover to a new log file. | |
593 <li> Release the write lock to allow further commands to enter the new | |
594 log file </li> | |
595 <li> Invoke Disk.sync to flush dirty data to disk which will callback | |
596 on completion </li> | |
597 <li> Callback triggers writing of the snapshot which invalidates all | |
598 log files previous to the one started above </li> | |
599 </ol> | |
600 | |
601 <p>In the event of failure before sync'ing is complete - i.e. the | |
602 snapshot has not been saved, log files from the previous snapshot | |
603 onwards will be used to re-construct state. If there's no previous | |
604 snapshot, all log files will be replayed to reconstruct state. </p> | |
605 | |
606 @see org.dancres.blitz.disk.Disk | |
607 */ | |
608 void requestAsyncCheckpoint() throws IOException { | |
609 issueCheckpoint(false); | |
610 } | |
611 | |
612 /** | |
613 Internal implementation of the checkpoint operation used by snapshot | |
614 and checkpoint methods above. | |
615 | |
616 @param isBlocking specifies whether to block the caller until the | |
617 checkpoint completes | |
618 */ | |
619 private void issueCheckpoint(boolean isBlocking) throws IOException { | |
620 | |
621 // Only checkpoint if the trigger says it's okay | |
622 if (theCheckpointTrigger.checkpointsDisabled()) | |
623 return; | |
624 | |
625 try { | |
626 long myCkptId; | |
627 | |
628 synchronized(this) { | |
629 myCkptId = theCheckpointCount++; | |
630 } | |
631 | |
632 if (LOG_CKPTS) | |
633 theLogger.log(Level.INFO, "Checkpoint::start: " + myCkptId); | |
634 | |
635 theLock.writeLock().acquire(); | |
636 | |
637 // Issue tentative checkpoint - change over logs | |
638 // and carry over prepared state from old log in snapshotter | |
639 // which will be commited/aborted in the new log | |
640 Snapshotter mySnapper = thePrevayler.takeSnapshot(); | |
641 | |
642 theLock.writeLock().release(); | |
643 | |
644 // Now sync disks and save snapshot at completion | |
645 if (isBlocking) { | |
646 BlockingSnapshotTask myTask = | |
647 new BlockingSnapshotTask(myCkptId, mySnapper); | |
648 | |
649 Disk.sync(myTask); | |
650 myTask.waitForCompletion(); | |
651 } else { | |
652 Disk.sync(new SnapshotTask(myCkptId, mySnapper)); | |
653 } | |
654 | |
655 if (LOG_CKPTS) | |
656 theLogger.log(Level.INFO, "Checkpoint::end: " + myCkptId); | |
657 | |
658 } catch (InterruptedException anIE) { | |
659 theLogger.log(Level.SEVERE, "Failed to get lock for ckpt", anIE); | |
660 throw new IOException("Failed to lock for ckpt"); | |
661 } catch (Exception anE) { | |
662 theLogger.log(Level.SEVERE, "Failed to sync", anE); | |
663 throw new IOException("Failed to sync"); | |
664 } | |
665 } | |
666 | |
667 /** | |
668 An instance of this object is passed to Disk.sync. It's run method | |
669 will be called once Disk has completed the sync task. When called, | |
670 it saves the snapshot to disk which obsoletes logs previous to the | |
671 snapshot (where previous is defined as a log with a sequence number | |
672 less than that of the snapshot). | |
673 */ | |
674 class SnapshotTask implements Runnable { | |
675 private Snapshotter theSnapper; | |
676 private long theCkptId; | |
677 | |
678 SnapshotTask(long aCkptId, Snapshotter aSnapper) { | |
679 theCkptId = aCkptId; | |
680 theSnapper = aSnapper; | |
681 } | |
682 | |
683 public void run() { | |
684 try { | |
685 if (LOG_CKPTS) | |
686 theLogger.log(Level.INFO, | |
687 "Disks sync'd - save snapshot: " + | |
688 theCkptId); | |
689 theSnapper.save(); | |
690 | |
691 if (LOG_CKPTS) | |
692 theLogger.log(Level.INFO, "Snapshot saved: " + theCkptId); | |
693 } catch (IOException anIOE) { | |
694 theLogger.log(Level.SEVERE, "Failed to save snapshot on sync", | |
695 anIOE); | |
696 } | |
697 } | |
698 } | |
699 | |
700 class BlockingSnapshotTask implements Runnable { | |
701 private Snapshotter theSnapper; | |
702 private long theCkptId; | |
703 private boolean isComplete; | |
704 | |
705 BlockingSnapshotTask(long aCkptId, Snapshotter aSnapper) { | |
706 theCkptId = aCkptId; | |
707 theSnapper = aSnapper; | |
708 } | |
709 | |
710 public void run() { | |
711 try { | |
712 if (LOG_CKPTS) | |
713 theLogger.log(Level.INFO, | |
714 "Disks sync'd - save snapshot: " + | |
715 theCkptId); | |
716 theSnapper.save(); | |
717 | |
718 synchronized(this) { | |
719 isComplete = true; | |
720 notify(); | |
721 } | |
722 | |
723 if (LOG_CKPTS) | |
724 theLogger.log(Level.INFO, "Snapshot saved: " + theCkptId); | |
725 } catch (IOException anIOE) { | |
726 theLogger.log(Level.SEVERE, "Failed to save snapshot on sync", | |
727 anIOE); | |
728 } | |
729 } | |
730 | |
731 void waitForCompletion() { | |
732 synchronized(this) { | |
733 while(! isComplete) { | |
734 try { | |
735 wait(); | |
736 } catch (InterruptedException anIE) { | |
737 } | |
738 } | |
739 } | |
740 } | |
741 } | |
742 } |