comparison src/org/dancres/blitz/txn/TxnManagerState.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.txn;
2
3 import java.io.IOException;
4 import java.io.ObjectInputStream;
5 import java.io.ObjectOutputStream;
6 import java.io.Serializable;
7 import java.rmi.RemoteException;
8 import java.util.ArrayList;
9 import java.util.HashMap;
10 import java.util.Iterator;
11 import java.util.List;
12 import java.util.logging.Level;
13
14 import net.jini.core.transaction.TransactionException;
15 import net.jini.core.transaction.UnknownTransactionException;
16 import net.jini.core.transaction.server.TransactionConstants;
17
18 import org.prevayler.AlarmClock;
19 import org.prevayler.PrevalentSystem;
20 import org.prevayler.SnapshotContributor;
21
22 /**
23 Responsible for tracking/managing transactions. This responsiblity is split
24 across two classes. TxnManager handles control aspects whilst
25 TxnManagerState tracks the transactional information. <P>
26
27 @see org.dancres.blitz.txn.TxnManager
28 */
29 class TxnManagerState implements PrevalentSystem {
30 static final long serialVersionUID = -5650181362477845180L;
31
32 private static boolean UPGRADE = false;
33
34 /*
35 These member variables are serialized as part of writeObject and
36 de-serialized as part of readObject (which also initializes appropriate
37 state). i.e. We don't just serialize this object directly.
38 */
39 private AlarmClock theClock;
40
41 private HashMap theTxns = new HashMap();
42
43 private Serializable[] theSnapshotContributions = new Serializable[0];
44
45 private ArrayList theSnapshotContributors = new ArrayList();
46
47 static void enableUpgrade() {
48 UPGRADE = true;
49 }
50
51 public void clock(AlarmClock aClock) {
52 theClock = aClock;
53 }
54
55 public AlarmClock clock() {
56 return theClock;
57 }
58
59 public List getActiveTxnIds() {
60 ArrayList myTxnIds = new ArrayList();
61
62 synchronized(this) {
63 Iterator myTxns = theTxns.keySet().iterator();
64
65 while (myTxns.hasNext()) {
66 TxnId myId = (TxnId) myTxns.next();
67 myTxnIds.add(myId);
68 }
69 }
70
71 return myTxnIds;
72 }
73
74 public void add(SnapshotContributor aContributor) {
75 synchronized(theSnapshotContributors) {
76 if (!theSnapshotContributors.contains(aContributor))
77 theSnapshotContributors.add(aContributor);
78 }
79 }
80
81 public void remove(SnapshotContributor aContributor) {
82 synchronized(theSnapshotContributors) {
83 theSnapshotContributors.remove(aContributor);
84 }
85 }
86
87 public Serializable[] getSnapshotContributions() {
88 return theSnapshotContributions;
89 }
90
91 private void writeObject(ObjectOutputStream anOut) throws IOException {
92 anOut.writeObject(LogVersion.VERSION);
93
94 /*
95 We only save PREPARED transactions, ignoring ACTIVES because
96 they are transient and their state changes won't be applied
97 until we've issued prepare and then commit or abort. The ACTIVES
98 will either die due to failure or, post the sync, add operations
99 to the log. Note that, whilst a transaction is active, it generates
100 no log records at all hence the reason we don't need to save them.
101 Commited or aborted updates in cache which need flushing to disk
102 should have already been sync'd before we get this far.
103 */
104 ArrayList myPrepared = new ArrayList();
105
106 synchronized(this) {
107 // Write out clock
108 //
109 anOut.writeObject(theClock);
110
111 Iterator myTxns = theTxns.keySet().iterator();
112
113 while (myTxns.hasNext()) {
114 TxnId myId = (TxnId) myTxns.next();
115
116 TxnState myState = getState(myId);
117
118 try {
119 int myStatus = myState.getStatus();
120
121 if (myStatus == TransactionConstants.PREPARED) {
122 myPrepared.add(myState);
123 }
124
125 } catch (TransactionException aTE) {
126 // Whoops, got nailed checking status, logged in the call
127 // nothing to do.
128 }
129 }
130
131 anOut.writeInt(myPrepared.size());
132
133 for (int i = 0; i < myPrepared.size(); i++) {
134 anOut.writeObject(myPrepared.get(i));
135 }
136 }
137
138 /*
139 Write out any user-code snapshot contributions
140 */
141 ArrayList myContributions = new ArrayList();
142
143 synchronized(theSnapshotContributors) {
144 for (int i = 0; i < theSnapshotContributors.size(); i++) {
145 myContributions.add(((SnapshotContributor) theSnapshotContributors.get(i)).getContribution());
146 }
147 }
148
149 Serializable[] myUserData = new Serializable[myContributions.size()];
150 myUserData = (Serializable[]) myContributions.toArray(myUserData);
151
152 anOut.writeObject(myUserData);
153 }
154
155 private void readObject(ObjectInputStream anIn)
156 throws IOException, ClassNotFoundException {
157
158 boolean isUpgrade = false;
159
160 theTxns = new HashMap();
161 theSnapshotContributors = new ArrayList();
162
163 Object myFirst = anIn.readObject();
164
165 /*
166 If there's no LogVersion, chances are we're looking at a pre 1.13
167 log format - upgrade is simple as there's no LogVersion and there
168 will be no user checkpoint data so we just ignore those fields.
169 */
170 if (! (myFirst instanceof LogVersion)) {
171 TxnManager.theLogger.log(Level.SEVERE, "Upgrading old transaction log");
172 isUpgrade = true;
173 theClock = (AlarmClock) myFirst;
174 } else {
175 LogVersion myVersion = (LogVersion) myFirst;
176
177 if (!myVersion.equals(LogVersion.VERSION))
178 throw new IOException("Yikes - log versions don't match - upgrade?" + myVersion);
179
180 theClock = (AlarmClock) anIn.readObject();
181 }
182
183 int myNumRecords = anIn.readInt();
184
185 synchronized(this) {
186
187 for (int i = 0; i < myNumRecords; i++) {
188 TxnState myState = (TxnState) anIn.readObject();
189
190 try {
191 myState.prepare(true);
192 } catch (UnknownTransactionException aUTE) {
193 IOException anIOE = new IOException("Failed to recover prepare");
194 anIOE.initCause(aUTE);
195 throw anIOE;
196 }
197
198 theTxns.put(myState.getId(), myState);
199 }
200 }
201
202 if (isUpgrade)
203 theSnapshotContributions = new Serializable[0];
204 else
205 theSnapshotContributions = (Serializable[]) anIn.readObject();
206 }
207
208 private TxnState getState(TxnId anId) {
209 synchronized(this) {
210 return (TxnState) theTxns.get(anId);
211 }
212 }
213
214 /**
215 Resolve a transaction using this method before calling any of
216 <code>prepare</code>, <code>commit</code>, <code>abort</code> or
217 <code>prepareAndCommit</code>.
218
219 @todo Add Janitor/Checker thread to clear out dead transaction
220 state - see comments in method
221 */
222 TxnState getTxnFor(TxnId anId, TxnGateway aGateway, boolean mustExist)
223 throws UnknownTransactionException {
224
225 TxnState myState = null;
226
227 if (mustExist) {
228 myState = getState(anId);
229 } else {
230 synchronized(this) {
231 myState = (TxnState) theTxns.get(anId);
232 }
233
234 /*
235 If state doesn't exist, we need to join and update the state
236 */
237 if (myState == null) {
238 try {
239 aGateway.join(anId);
240 } catch (Exception anException) {
241 TxnManager.theLogger.log(Level.SEVERE,
242 "Failed to join txn" +
243 anId, anException);
244
245 throw new UnknownTransactionException();
246 }
247
248 /*
249 There's a race condition here where the transaction
250 could be prepared before we get our state updated.
251 If that happens, the prepare method will bounce
252 the prepare call blowing the transaction out.
253
254 This will leave us with a bit of dead txn state which
255 we ought to cleanup by invoking getState etc. This
256 could be done by a janitor thread in the future.
257
258 In reality:
259
260 (a) This is unlikely
261 (b) Should this happen, we're going to be the least of
262 the problems because someone somewhere thinks
263 the transaction is active (this thread is acting on
264 their behalf) and someone else is closing it out.
265 When the client associated with this thread invokes
266 commit there'll be a big nasty mess.
267 */
268 synchronized(this) {
269
270 /*
271 Up till now, we race to create/join the transaction
272 (see above). Now we must put it right....
273 */
274 myState = (TxnState) theTxns.get(anId);
275 if (myState == null) {
276 myState = new TxnState(anId);
277 theTxns.put(anId, myState);
278 }
279 }
280 }
281 }
282
283 if (myState == null)
284 throw new UnknownTransactionException();
285 else
286 return myState;
287 }
288
289 /**
290 In cases where no explicit transaction has been passed in by a caller,
291 create a null transaction which is an internal, fully transactional
292 replacement which can be used for the duration of the operation
293 in question.
294 */
295 TxnState newNullTxn() throws RemoteException {
296 TxnId myId = TxnId.newNullTxn();
297 TxnState myState = new TxnState(myId);
298
299 synchronized(this) {
300 theTxns.put(myId, myState);
301 }
302
303 return myState;
304 }
305
306 /**
307 In cases where no state will be changed (no Entry's taken or written),
308 create an instance of this transaction which, when commited or aborted
309 will be undone but not logged.
310 */
311 TxnState newIdentityTxn() throws RemoteException {
312 TxnId myId = TxnId.newNullTxn();
313 TxnState myState = new TxnState(myId, true);
314
315 synchronized(this) {
316 theTxns.put(myId, myState);
317 }
318
319 return myState;
320 }
321
322 /**
323 Do not call this method directly - it should only be invoked from
324 a Prevayler command.
325 */
326 int prepare(TxnState aState)
327 throws UnknownTransactionException, IOException {
328
329 /*
330 Do we know about this transaction?
331
332 If we don't we've failed and are now doing recovery so we must
333 re-insert the state.
334 */
335 boolean needsRestore = (getState(aState.getId()) == null);
336
337 if (needsRestore) {
338 synchronized(this) {
339 theTxns.put(aState.getId(), aState);
340 }
341 }
342
343 return aState.prepare(needsRestore);
344 }
345
346 /**
347 Do not call this method directly - it should only be invoked from
348 a Prevayler command.
349 */
350 void commit(TxnId anId)
351 throws UnknownTransactionException, IOException {
352
353 TxnState myState = getTxnFor(anId, null, true);
354
355 myState.commit();
356
357 removeTxn(anId);
358 }
359
360 /**
361 Do not call this method directly - it should only be invoked from
362 a Prevayler command.
363 */
364 void abort(TxnId anId)
365 throws UnknownTransactionException, IOException {
366
367 TxnState myState = getTxnFor(anId, null, true);
368
369 myState.abort();
370 removeTxn(anId);
371 }
372
373 void abortAll() throws IOException {
374 synchronized(this) {
375 Iterator myTxns = theTxns.keySet().iterator();
376
377 while (myTxns.hasNext()) {
378 TxnId myId = (TxnId) myTxns.next();
379
380 TxnState myState = getState(myId);
381
382 try {
383 int myStatus = myState.getStatus();
384
385 if ((myStatus == TransactionConstants.PREPARED) ||
386 (myStatus == TransactionConstants.ACTIVE)) {
387
388 /*
389 * AbortAll is a naive operation in that it has no
390 * awareness of a specific transaction thus it cannot
391 * explicitly vote one of them off so we must do it
392 * ourselves
393 */
394 myState.vote();
395 myState.abort();
396 myTxns.remove();
397 }
398
399 } catch (TransactionException aTE) {
400 // Whoops, got nailed checking status, logged in the call
401 // nothing to do.
402 }
403 }
404 }
405 }
406
407 private void removeTxn(TxnId anId) {
408 TxnState myState;
409
410 synchronized(this) {
411 myState = (TxnState) theTxns.remove(anId);
412 }
413 }
414
415 int getNumActiveTxns() {
416 synchronized(this) {
417 return theTxns.size();
418 }
419 }
420 }
421
422