Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/txn/TxnManagerState.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.txn; | |
2 | |
3 import java.io.IOException; | |
4 import java.io.ObjectInputStream; | |
5 import java.io.ObjectOutputStream; | |
6 import java.io.Serializable; | |
7 import java.rmi.RemoteException; | |
8 import java.util.ArrayList; | |
9 import java.util.HashMap; | |
10 import java.util.Iterator; | |
11 import java.util.List; | |
12 import java.util.logging.Level; | |
13 | |
14 import net.jini.core.transaction.TransactionException; | |
15 import net.jini.core.transaction.UnknownTransactionException; | |
16 import net.jini.core.transaction.server.TransactionConstants; | |
17 | |
18 import org.prevayler.AlarmClock; | |
19 import org.prevayler.PrevalentSystem; | |
20 import org.prevayler.SnapshotContributor; | |
21 | |
22 /** | |
23 Responsible for tracking/managing transactions. This responsibility is split | |
24 across two classes. TxnManager handles control aspects whilst | |
25 TxnManagerState tracks the transactional information. <P> | |
26 | |
27 @see org.dancres.blitz.txn.TxnManager | |
28 */ | |
29 class TxnManagerState implements PrevalentSystem { | |
30 static final long serialVersionUID = -5650181362477845180L; | |
31 | |
32 private static boolean UPGRADE = false; | |
33 | |
34 /* | |
35 These member variables are serialized as part of writeObject and | |
36 de-serialized as part of readObject (which also initializes appropriate | |
37 state). i.e. We don't just serialize this object directly. | |
38 */ | |
39 private AlarmClock theClock; | |
40 | |
41 private HashMap theTxns = new HashMap(); | |
42 | |
43 private Serializable[] theSnapshotContributions = new Serializable[0]; | |
44 | |
45 private ArrayList theSnapshotContributors = new ArrayList(); | |
46 | |
47 static void enableUpgrade() { | |
48 UPGRADE = true; | |
49 } | |
50 | |
51 public void clock(AlarmClock aClock) { | |
52 theClock = aClock; | |
53 } | |
54 | |
55 public AlarmClock clock() { | |
56 return theClock; | |
57 } | |
58 | |
59 public List getActiveTxnIds() { | |
60 ArrayList myTxnIds = new ArrayList(); | |
61 | |
62 synchronized(this) { | |
63 Iterator myTxns = theTxns.keySet().iterator(); | |
64 | |
65 while (myTxns.hasNext()) { | |
66 TxnId myId = (TxnId) myTxns.next(); | |
67 myTxnIds.add(myId); | |
68 } | |
69 } | |
70 | |
71 return myTxnIds; | |
72 } | |
73 | |
74 public void add(SnapshotContributor aContributor) { | |
75 synchronized(theSnapshotContributors) { | |
76 if (!theSnapshotContributors.contains(aContributor)) | |
77 theSnapshotContributors.add(aContributor); | |
78 } | |
79 } | |
80 | |
81 public void remove(SnapshotContributor aContributor) { | |
82 synchronized(theSnapshotContributors) { | |
83 theSnapshotContributors.remove(aContributor); | |
84 } | |
85 } | |
86 | |
87 public Serializable[] getSnapshotContributions() { | |
88 return theSnapshotContributions; | |
89 } | |
90 | |
91 private void writeObject(ObjectOutputStream anOut) throws IOException { | |
92 anOut.writeObject(LogVersion.VERSION); | |
93 | |
94 /* | |
95 We only save PREPARED transactions, ignoring ACTIVES because | |
96 they are transient and their state changes won't be applied | |
97 until we've issued prepare and then commit or abort. The ACTIVES | |
98 will either die due to failure or, post the sync, add operations | |
99 to the log. Note that, whilst a transaction is active, it generates | |
100 no log records at all hence the reason we don't need to save them. | |
101 Commited or aborted updates in cache which need flushing to disk | |
102 should have already been sync'd before we get this far. | |
103 */ | |
104 ArrayList myPrepared = new ArrayList(); | |
105 | |
106 synchronized(this) { | |
107 // Write out clock | |
108 // | |
109 anOut.writeObject(theClock); | |
110 | |
111 Iterator myTxns = theTxns.keySet().iterator(); | |
112 | |
113 while (myTxns.hasNext()) { | |
114 TxnId myId = (TxnId) myTxns.next(); | |
115 | |
116 TxnState myState = getState(myId); | |
117 | |
118 try { | |
119 int myStatus = myState.getStatus(); | |
120 | |
121 if (myStatus == TransactionConstants.PREPARED) { | |
122 myPrepared.add(myState); | |
123 } | |
124 | |
125 } catch (TransactionException aTE) { | |
126 // Whoops, got nailed checking status, logged in the call | |
127 // nothing to do. | |
128 } | |
129 } | |
130 | |
131 anOut.writeInt(myPrepared.size()); | |
132 | |
133 for (int i = 0; i < myPrepared.size(); i++) { | |
134 anOut.writeObject(myPrepared.get(i)); | |
135 } | |
136 } | |
137 | |
138 /* | |
139 Write out any user-code snapshot contributions | |
140 */ | |
141 ArrayList myContributions = new ArrayList(); | |
142 | |
143 synchronized(theSnapshotContributors) { | |
144 for (int i = 0; i < theSnapshotContributors.size(); i++) { | |
145 myContributions.add(((SnapshotContributor) theSnapshotContributors.get(i)).getContribution()); | |
146 } | |
147 } | |
148 | |
149 Serializable[] myUserData = new Serializable[myContributions.size()]; | |
150 myUserData = (Serializable[]) myContributions.toArray(myUserData); | |
151 | |
152 anOut.writeObject(myUserData); | |
153 } | |
154 | |
155 private void readObject(ObjectInputStream anIn) | |
156 throws IOException, ClassNotFoundException { | |
157 | |
158 boolean isUpgrade = false; | |
159 | |
160 theTxns = new HashMap(); | |
161 theSnapshotContributors = new ArrayList(); | |
162 | |
163 Object myFirst = anIn.readObject(); | |
164 | |
165 /* | |
166 If there's no LogVersion, chances are we're looking at a pre 1.13 | |
167 log format - upgrade is simple as there's no LogVersion and there | |
168 will be no user checkpoint data so we just ignore those fields. | |
169 */ | |
170 if (! (myFirst instanceof LogVersion)) { | |
171 TxnManager.theLogger.log(Level.SEVERE, "Upgrading old transaction log"); | |
172 isUpgrade = true; | |
173 theClock = (AlarmClock) myFirst; | |
174 } else { | |
175 LogVersion myVersion = (LogVersion) myFirst; | |
176 | |
177 if (!myVersion.equals(LogVersion.VERSION)) | |
178 throw new IOException("Yikes - log versions don't match - upgrade?" + myVersion); | |
179 | |
180 theClock = (AlarmClock) anIn.readObject(); | |
181 } | |
182 | |
183 int myNumRecords = anIn.readInt(); | |
184 | |
185 synchronized(this) { | |
186 | |
187 for (int i = 0; i < myNumRecords; i++) { | |
188 TxnState myState = (TxnState) anIn.readObject(); | |
189 | |
190 try { | |
191 myState.prepare(true); | |
192 } catch (UnknownTransactionException aUTE) { | |
193 IOException anIOE = new IOException("Failed to recover prepare"); | |
194 anIOE.initCause(aUTE); | |
195 throw anIOE; | |
196 } | |
197 | |
198 theTxns.put(myState.getId(), myState); | |
199 } | |
200 } | |
201 | |
202 if (isUpgrade) | |
203 theSnapshotContributions = new Serializable[0]; | |
204 else | |
205 theSnapshotContributions = (Serializable[]) anIn.readObject(); | |
206 } | |
207 | |
208 private TxnState getState(TxnId anId) { | |
209 synchronized(this) { | |
210 return (TxnState) theTxns.get(anId); | |
211 } | |
212 } | |
213 | |
214 /** | |
215 Resolve a transaction using this method before calling any of | |
216 <code>prepare</code>, <code>commit</code>, <code>abort</code> or | |
217 <code>prepareAndCommit</code>. | |
218 | |
219 @todo Add Janitor/Checker thread to clear out dead transaction | |
220 state - see comments in method | |
221 */ | |
222 TxnState getTxnFor(TxnId anId, TxnGateway aGateway, boolean mustExist) | |
223 throws UnknownTransactionException { | |
224 | |
225 TxnState myState = null; | |
226 | |
227 if (mustExist) { | |
228 myState = getState(anId); | |
229 } else { | |
230 synchronized(this) { | |
231 myState = (TxnState) theTxns.get(anId); | |
232 } | |
233 | |
234 /* | |
235 If state doesn't exist, we need to join and update the state | |
236 */ | |
237 if (myState == null) { | |
238 try { | |
239 aGateway.join(anId); | |
240 } catch (Exception anException) { | |
241 TxnManager.theLogger.log(Level.SEVERE, | |
242 "Failed to join txn" + | |
243 anId, anException); | |
244 | |
245 throw new UnknownTransactionException(); | |
246 } | |
247 | |
248 /* | |
249 There's a race condition here where the transaction | |
250 could be prepared before we get our state updated. | |
251 If that happens, the prepare method will bounce | |
252 the prepare call blowing the transaction out. | |
253 | |
254 This will leave us with a bit of dead txn state which | |
255 we ought to cleanup by invoking getState etc. This | |
256 could be done by a janitor thread in the future. | |
257 | |
258 In reality: | |
259 | |
260 (a) This is unlikely | |
261 (b) Should this happen, we're going to be the least of | |
262 the problems because someone somewhere thinks | |
263 the transaction is active (this thread is acting on | |
264 their behalf) and someone else is closing it out. | |
265 When the client associated with this thread invokes | |
266 commit there'll be a big nasty mess. | |
267 */ | |
268 synchronized(this) { | |
269 | |
270 /* | |
271 Up till now, we race to create/join the transaction | |
272 (see above). Now we must put it right.... | |
273 */ | |
274 myState = (TxnState) theTxns.get(anId); | |
275 if (myState == null) { | |
276 myState = new TxnState(anId); | |
277 theTxns.put(anId, myState); | |
278 } | |
279 } | |
280 } | |
281 } | |
282 | |
283 if (myState == null) | |
284 throw new UnknownTransactionException(); | |
285 else | |
286 return myState; | |
287 } | |
288 | |
289 /** | |
290 In cases where no explicit transaction has been passed in by a caller, | |
291 create a null transaction which is an internal, fully transactional | |
292 replacement which can be used for the duration of the operation | |
293 in question. | |
294 */ | |
295 TxnState newNullTxn() throws RemoteException { | |
296 TxnId myId = TxnId.newNullTxn(); | |
297 TxnState myState = new TxnState(myId); | |
298 | |
299 synchronized(this) { | |
300 theTxns.put(myId, myState); | |
301 } | |
302 | |
303 return myState; | |
304 } | |
305 | |
306 /** | |
307 In cases where no state will be changed (no Entry's taken or written), | |
308 create an instance of this transaction which, when commited or aborted | |
309 will be undone but not logged. | |
310 */ | |
311 TxnState newIdentityTxn() throws RemoteException { | |
312 TxnId myId = TxnId.newNullTxn(); | |
313 TxnState myState = new TxnState(myId, true); | |
314 | |
315 synchronized(this) { | |
316 theTxns.put(myId, myState); | |
317 } | |
318 | |
319 return myState; | |
320 } | |
321 | |
322 /** | |
323 Do not call this method directly - it should only be invoked from | |
324 a Prevayler command. | |
325 */ | |
326 int prepare(TxnState aState) | |
327 throws UnknownTransactionException, IOException { | |
328 | |
329 /* | |
330 Do we know about this transaction? | |
331 | |
332 If we don't we've failed and are now doing recovery so we must | |
333 re-insert the state. | |
334 */ | |
335 boolean needsRestore = (getState(aState.getId()) == null); | |
336 | |
337 if (needsRestore) { | |
338 synchronized(this) { | |
339 theTxns.put(aState.getId(), aState); | |
340 } | |
341 } | |
342 | |
343 return aState.prepare(needsRestore); | |
344 } | |
345 | |
346 /** | |
347 Do not call this method directly - it should only be invoked from | |
348 a Prevayler command. | |
349 */ | |
350 void commit(TxnId anId) | |
351 throws UnknownTransactionException, IOException { | |
352 | |
353 TxnState myState = getTxnFor(anId, null, true); | |
354 | |
355 myState.commit(); | |
356 | |
357 removeTxn(anId); | |
358 } | |
359 | |
360 /** | |
361 Do not call this method directly - it should only be invoked from | |
362 a Prevayler command. | |
363 */ | |
364 void abort(TxnId anId) | |
365 throws UnknownTransactionException, IOException { | |
366 | |
367 TxnState myState = getTxnFor(anId, null, true); | |
368 | |
369 myState.abort(); | |
370 removeTxn(anId); | |
371 } | |
372 | |
373 void abortAll() throws IOException { | |
374 synchronized(this) { | |
375 Iterator myTxns = theTxns.keySet().iterator(); | |
376 | |
377 while (myTxns.hasNext()) { | |
378 TxnId myId = (TxnId) myTxns.next(); | |
379 | |
380 TxnState myState = getState(myId); | |
381 | |
382 try { | |
383 int myStatus = myState.getStatus(); | |
384 | |
385 if ((myStatus == TransactionConstants.PREPARED) || | |
386 (myStatus == TransactionConstants.ACTIVE)) { | |
387 | |
388 /* | |
389 * AbortAll is a naive operation in that it has no | |
390 * awareness of a specific transaction thus it cannot | |
391 * explicitly vote one of them off so we must do it | |
392 * ourselves | |
393 */ | |
394 myState.vote(); | |
395 myState.abort(); | |
396 myTxns.remove(); | |
397 } | |
398 | |
399 } catch (TransactionException aTE) { | |
400 // Whoops, got nailed checking status, logged in the call | |
401 // nothing to do. | |
402 } | |
403 } | |
404 } | |
405 } | |
406 | |
407 private void removeTxn(TxnId anId) { | |
408 TxnState myState; | |
409 | |
410 synchronized(this) { | |
411 myState = (TxnState) theTxns.remove(anId); | |
412 } | |
413 } | |
414 | |
415 int getNumActiveTxns() { | |
416 synchronized(this) { | |
417 return theTxns.size(); | |
418 } | |
419 } | |
420 } | |
421 | |
422 |