comparison src/org/prevayler/implementation/BufferingPrevaylerImpl.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.prevayler.implementation;
2
3 import java.util.LinkedList;
4 import java.util.Date;
5 import java.io.*;
6
7 import org.prevayler.*;
8
9 import org.dancres.blitz.ActiveObject;
10 import org.dancres.blitz.ActiveObjectRegistry;
11
12 /**
13 <p>Prevayler which guarentees that all logged changes will be made
14 persistent within a certain fixed amount of time - e.g. after 5 seconds.
15 When a snapshot is requested, any logged changes will be flushed to disk and
16 then a snapshot will be prepared. This provides correct recovery should
17 we crash before we save the snapshot (which could be a while) but after
18 we've flushed the log.</p>
19
20 <p>This Prevayler is designed to work in co-operation with Blitz's
21 OpCountingCheckpointTrigger and the ActiveObjectRegistry.</p>
22
23 @see org.dancres.blitz.txn.OpCountingCheckpointTrigger
24 @see org.dancres.blitz.ActiveObjectRegistry
25 */
26 public class BufferingPrevaylerImpl implements SnapshotPrevayler {
27
28 private final PrevalentSystem system;
29 private final SystemClock clock;
30 private final CommandOutputStream output;
31
32 private LinkedList theBuffer = new LinkedList();
33
34 /** Returns a new Prevayler for the given PrevalentSystem.
35 * "PrevalenceBase" shall be the directory where the snapshot and log files shall be created and read.
36 * @param newSystem The newly started, "empty" PrevalentSystem that will be used as a starting point for every system startup, until the first snapshot is taken.
37 * @param shouldReset Whether to issue resets in the underlying OOS
38 */
39 public BufferingPrevaylerImpl(PrevalentSystem newSystem,
40 boolean shouldReset, boolean shouldClean,
41 int aBufferSize)
42 throws IOException, ClassNotFoundException {
43
44 this(newSystem, "PrevalenceBase", shouldReset, shouldClean,
45 aBufferSize);
46 }
47
48
49 /** Returns a new Prevayler for the given PrevalentSystem.
50 * @param newSystem The newly started, "empty" PrevalentSystem that will be used as a starting point for every system startup, until the first snapshot is taken.
51 * @param directory The full path of the directory where the snapshot and log files shall be created and read.
52 * @param shouldReset Whether to issue resets in the underlying OOS
53 */
54 public BufferingPrevaylerImpl(PrevalentSystem newSystem, String directory,
55 boolean shouldReset, boolean shouldClean,
56 int aBufferSize)
57 throws IOException, ClassNotFoundException {
58
59 newSystem.clock(new SystemClock());
60 CommandInputStream input = new CommandInputStream(directory);
61
62 PrevalentSystem savedSystem = input.readLastSnapshot();
63 system = (savedSystem == null)
64 ? newSystem
65 : savedSystem;
66
67 recoverCommands(input);
68
69 output = input.commandOutputStream(shouldReset, shouldClean,
70 aBufferSize);
71 clock = (SystemClock)system.clock();
72 clock.resume();
73 }
74
75
76 /** Returns the underlying PrevalentSystem.
77 */
78 public PrevalentSystem system() {
79 return system;
80 }
81
82 /** Logs the received command for crash or shutdown recovery and executes it on the underlying PrevalentSystem.
83 * @see system()
84 * @return The serializable object that was returned by the execution of command.
85 * @throws IOException if there is trouble writing the command to the log.
86 * @throws Exception if command.execute() throws an exception.
87 */
88 public synchronized Serializable executeCommand(Command command) throws Exception {
89 clock.pause(); //To be deterministic, the system must know exactly at what time the command is being executed.
90 try {
91 theBuffer.add(new ClockRecoveryCommand(command, clock.time()));
92
93 return command.execute(system);
94
95 } finally {
96 clock.resume();
97 }
98 }
99
100 public synchronized Serializable executeCommand(Command command,
101 boolean doSync)
102 throws Exception {
103
104 clock.pause();
105 try {
106 theBuffer.add(new ClockRecoveryCommand(command, clock.time()));
107
108 return command.execute(system);
109
110 } finally {
111 clock.resume();
112 }
113 }
114
115 /**
116 * This method prepares a snapshot of the system and returns it in a
117 * Snapshotter instance which can be used to save the snapshot to disk
118 * once dirty state has been sync'd to disk. If your application has no
119 * additional state, you can simply invoke on the Snapshotter immediately.
120 * @return Snapshotter to be used to save an appropriate snapshot post
121 * sync'ing of dirty state to disk.
122 * @see system()
123 * @throws IOException if there is trouble preparing the snapshot file.
124 */
125 public synchronized Snapshotter takeSnapshot() throws IOException {
126 clock.pause();
127 try {
128 flush();
129
130 return output.writeSnapshot(system);
131 } finally {
132 clock.resume();
133 }
134 }
135
136
137 private synchronized void flush() throws IOException {
138 while (! theBuffer.isEmpty()) {
139 Command myCommand = (Command) theBuffer.remove(0);
140
141 output.writeCommand(myCommand, theBuffer.isEmpty());
142 }
143 }
144
145 private void recoverCommands(CommandInputStream input) throws IOException, ClassNotFoundException {
146 Command command;
147 while(true) {
148 try {
149 command = input.readCommand();
150 } catch (EOFException eof) {
151 break;
152 }
153
154 try {
155 command.execute(system);
156 } catch (Exception e) {
157 //Don't do anything at all. Commands may throw exceptions normally.
158 System.err.println("Command threw exception");
159 e.printStackTrace(System.err);
160 }
161 }
162 }
163 }
164