Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/txnlock/TxnLock.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.txnlock; | |
2 | |
3 import java.util.LinkedList; | |
4 import java.util.ArrayList; | |
5 import java.util.List; | |
6 import java.util.logging.*; | |
7 | |
8 import org.dancres.blitz.entry.OpInfo; | |
9 | |
10 import org.dancres.blitz.task.Task; | |
11 import org.dancres.blitz.task.Tasks; | |
12 | |
13 import org.dancres.blitz.txn.TxnId; | |
14 import org.dancres.blitz.txn.TxnState; | |
15 | |
16 import org.dancres.blitz.Logging; | |
17 | |
18 /** | |
19 <p> Every transaction (null or otherwise) must secure a lock for an | |
20 EntrySleeve before it can deemed to have succeeded. When the transaction is | |
21 commited these locks are then released. </p> | |
22 | |
23 <p> Things are slightly more complicated than this because we must handle | |
24 lock conflict and integrate that with the dispatching of blocked takes or | |
25 reads such that we can wake up those blocked matches when the blocking | |
26 transaction is commited. </p> | |
27 */ | |
28 public class TxnLock { | |
29 private static final Logger theLogger = | |
30 Logging.newLogger("org.dancres.blitz.txnlock.TxnLock"); | |
31 | |
32 public static final int WRITE = -1; | |
33 public static final int READ = -2; | |
34 public static final int DELETE = -3; | |
35 | |
36 public static final int SUCCESS = 1; | |
37 public static final int CONFLICT = 2; | |
38 public static final int FAIL = 3; | |
39 | |
40 private List theConflicts; | |
41 | |
42 private ArrayList theLockStates = new ArrayList(); | |
43 | |
44 /* | |
45 DEBUG BITS | |
46 */ | |
47 private static final boolean DEBUG = false; | |
48 private static Object theIdLock = new Object(); | |
49 private static long theNextId = 0; | |
50 private long theId = -1; | |
51 | |
52 private static class LockState { | |
53 private int theType; | |
54 private TxnId theOwner; | |
55 | |
56 LockState(int aType, TxnId anOwner) { | |
57 theType = aType; | |
58 theOwner = anOwner; | |
59 } | |
60 | |
61 int getType() { | |
62 return theType; | |
63 } | |
64 | |
65 TxnId getOwner() { | |
66 return theOwner; | |
67 } | |
68 | |
69 public String toString() { | |
70 return theOwner + ":" + theType; | |
71 } | |
72 } | |
73 | |
74 TxnLock() { | |
75 if (DEBUG) { | |
76 synchronized(theIdLock) { | |
77 theId = ++theNextId; | |
78 } | |
79 } | |
80 } | |
81 | |
82 public boolean isActive() { | |
83 return (theLockStates.size() != 0); | |
84 } | |
85 | |
86 public int test(TxnState anAcquirer, int aDesiredOp) { | |
87 TxnId myTxnId = anAcquirer.getId(); | |
88 | |
89 if (aDesiredOp == WRITE) { | |
90 // Always succeeds | |
91 return SUCCESS; | |
92 } | |
93 | |
94 switch(aDesiredOp) { | |
95 case READ : { | |
96 boolean deleterIsMe = hasDeleterThatsMe(myTxnId); | |
97 TxnId aDeleter = hasDeleter(); | |
98 TxnId aWriter = hasWriterOtherThan(myTxnId); | |
99 | |
100 if (deleterIsMe) { | |
101 return FAIL; | |
102 } else if ((aDeleter != null) || (aWriter != null)) { | |
103 return CONFLICT; | |
104 } else { | |
105 return SUCCESS; | |
106 } | |
107 } | |
108 case DELETE : { | |
109 boolean deleterIsMe = hasDeleterThatsMe(myTxnId); | |
110 TxnId aDeleter = hasDeleter(); | |
111 TxnId aWriter = hasWriterOtherThan(myTxnId); | |
112 TxnId aReader = hasReaderOtherThan(myTxnId); | |
113 | |
114 if (deleterIsMe) | |
115 return FAIL; | |
116 else if ((aDeleter != null) || | |
117 (aReader != null) || | |
118 (aWriter != null)) { | |
119 return CONFLICT; | |
120 } else { | |
121 return SUCCESS; | |
122 } | |
123 } | |
124 default : { | |
125 throw new RuntimeException("Unrecognised acquire op"); | |
126 } | |
127 } | |
128 } | |
129 | |
130 /** | |
131 One calls this method to assert an operation lock on a particular | |
132 EntryUID. On success <code>true</code> is returned and the acquire | |
133 is complete. In the case of failure, the caller is not blocked. | |
134 Rather, we register a callback against the transaction which will | |
135 be invoked after it relinquishes it's lock. Note that there might | |
136 well be more than one blocking transaction but we only bother | |
137 registering against one of them. This might result in a few false | |
138 wakeups but they are unlikely to be performance inhibiting. This | |
139 method should be called in a synchronized block. <P> | |
140 | |
141 Note that writes always succeed so there's no need to pass a callback | |
142 for such acquire requests. | |
143 | |
144 @param anAcquirer TxnId of the transaction which wishes to assert the | |
145 lock. | |
146 @param aDesiredOp should be one of DELETE, READ, | |
147 WRITE | |
148 @param aParty called should a blocking transaction be encountered and | |
149 a second time when the blocking transaction has relinquished the lock | |
150 @param aHandback passed to aParty to allow multiplexing of conflicts | |
151 @param isRecovery when <code>true</code> causes a lock to be asserted | |
152 without performing checks. | |
153 | |
154 @return One of, SUCCESS, CONFLICT or FAIL. | |
155 */ | |
156 public int acquire(TxnState anAcquirer, int aDesiredOp, BaulkedParty aParty, | |
157 Object aHandback, boolean isRecovery) { | |
158 | |
159 TxnId myTxnId = anAcquirer.getId(); | |
160 | |
161 if (aDesiredOp == WRITE) { | |
162 // Always succeeds | |
163 theLockStates.add(new LockState(WRITE, myTxnId)); | |
164 return SUCCESS; | |
165 } | |
166 | |
167 if (isRecovery) { | |
168 switch (aDesiredOp) { | |
169 case READ : { | |
170 theLockStates.add(new LockState(READ, myTxnId)); | |
171 break; | |
172 } | |
173 case DELETE: { | |
174 theLockStates.add(new LockState(DELETE, myTxnId)); | |
175 break; | |
176 } | |
177 } | |
178 | |
179 return SUCCESS; | |
180 } | |
181 | |
182 switch(aDesiredOp) { | |
183 case READ : { | |
184 boolean deleterIsMe = hasDeleterThatsMe(myTxnId); | |
185 TxnId aDeleter = hasDeleter(); | |
186 TxnId aWriter = hasWriterOtherThan(myTxnId); | |
187 | |
188 if (deleterIsMe) { | |
189 return FAIL; | |
190 } else if ((aDeleter != null) || (aWriter != null)) { | |
191 addConflict(aParty, aHandback, | |
192 (aDeleter == null) ? aWriter : aDeleter); | |
193 return CONFLICT; | |
194 } else { | |
195 theLockStates.add(new LockState(READ, myTxnId)); | |
196 return SUCCESS; | |
197 } | |
198 } | |
199 case DELETE : { | |
200 boolean deleterIsMe = hasDeleterThatsMe(myTxnId); | |
201 TxnId aDeleter = hasDeleter(); | |
202 TxnId aWriter = hasWriterOtherThan(myTxnId); | |
203 TxnId aReader = hasReaderOtherThan(myTxnId); | |
204 | |
205 if (deleterIsMe) | |
206 return FAIL; | |
207 else if ((aDeleter != null) || | |
208 (aReader != null) || | |
209 (aWriter != null)) { | |
210 if (aDeleter != null) | |
211 addConflict(aParty, aHandback, aDeleter); | |
212 else | |
213 addConflict(aParty, aHandback, | |
214 (aReader == null) ? aWriter : aReader); | |
215 return CONFLICT; | |
216 } else { | |
217 theLockStates.add(new LockState(DELETE, | |
218 myTxnId)); | |
219 return SUCCESS; | |
220 } | |
221 } | |
222 default : { | |
223 throw new RuntimeException("Unrecognised acquire op"); | |
224 } | |
225 } | |
226 } | |
227 | |
228 private void addConflict(BaulkedParty aParty, Object aHandback, | |
229 TxnId aConflicter) { | |
230 if (aParty == null) | |
231 return; | |
232 | |
233 if (theConflicts == null) | |
234 theConflicts = new ArrayList(); | |
235 // theConflicts = new LinkedList(); | |
236 | |
237 theConflicts.add(new Callback(aConflicter, aParty, aHandback)); | |
238 } | |
239 | |
240 private static class Callback implements Task { | |
241 private TxnId theBlocker; | |
242 private BaulkedParty theParty; | |
243 private Object theHandback; | |
244 | |
245 Callback(TxnId aBlocker, BaulkedParty aParty, Object aHandback) { | |
246 theBlocker = aBlocker; | |
247 theParty = aParty; | |
248 theHandback = aHandback; | |
249 theParty.blocked(aHandback); | |
250 } | |
251 | |
252 TxnId getBlocker() { | |
253 return theBlocker; | |
254 } | |
255 | |
256 BaulkedParty getBaulked() { | |
257 return theParty; | |
258 } | |
259 | |
260 public void run() { | |
261 theParty.unblocked(theHandback); | |
262 } | |
263 } | |
264 | |
265 public void release(TxnState anAcquirer, int anOp) { | |
266 TxnId myId = anAcquirer.getId(); | |
267 | |
268 int myIndex = 0; | |
269 boolean haveReleasedLock = false; | |
270 int myOutstanding = 0; | |
271 | |
272 /* | |
273 Examine all lock states and: | |
274 | |
275 (1) Release the relevant lock state | |
276 (2) Take note of any other lock states associated with this | |
277 transaction. | |
278 */ | |
279 ArrayList myDispatches = new ArrayList(); | |
280 | |
281 synchronized(this) { | |
282 while (myIndex < theLockStates.size()) { | |
283 LockState myState = (LockState) theLockStates.get(myIndex); | |
284 | |
285 // If the lock state is associated with this transaction... | |
286 if (myState.getOwner().equals(myId)) { | |
287 /* | |
288 If we've already released a lock of this type for this | |
289 transaction just note that we've found another lock held | |
290 by the transaction. Otherwise, release the lock and note | |
291 we did that. | |
292 */ | |
293 if ((myState.getType() == anOp) && (! haveReleasedLock)) { | |
294 theLockStates.remove(myIndex); | |
295 haveReleasedLock = true; | |
296 continue; | |
297 } else { | |
298 ++myOutstanding; | |
299 } | |
300 } | |
301 | |
302 ++myIndex; | |
303 } | |
304 | |
305 if (myOutstanding != 0) { | |
306 // System.err.println("Still have outstanding: " + myOutstanding); | |
307 // We haven't released all locks yet - no point waking up blockers | |
308 return; | |
309 } else { | |
310 // System.err.println("Waking up conflicters"); | |
311 } | |
312 | |
313 // Now process any outstanding blockers | |
314 myIndex = 0; | |
315 while ((theConflicts != null) && (myIndex < theConflicts.size())) { | |
316 Callback myCallback = (Callback) theConflicts.get(myIndex); | |
317 | |
318 if (myCallback.getBlocker().equals(myId)) { | |
319 theConflicts.remove(myCallback); | |
320 | |
321 // System.err.println("Dispatching callback"); | |
322 myDispatches.add(myCallback); | |
323 } else { | |
324 ++myIndex; | |
325 } | |
326 } | |
327 } | |
328 | |
329 if (myDispatches.size() > 0) { | |
330 try { | |
331 Tasks.queue(new DispatchTask(myDispatches)); | |
332 } catch (InterruptedException anIE) { | |
333 theLogger.log(Level.SEVERE, | |
334 "Failed to queue Txn callback", anIE); | |
335 } | |
336 } | |
337 } | |
338 | |
339 static class DispatchTask implements Task { | |
340 private ArrayList theDispatches; | |
341 | |
342 DispatchTask(ArrayList aDispatches) { | |
343 theDispatches = aDispatches; | |
344 } | |
345 | |
346 public void run() { | |
347 for (int i = 0; i < theDispatches.size(); i++) { | |
348 ((Task) theDispatches.get(i)).run(); | |
349 } | |
350 } | |
351 } | |
352 | |
353 public TxnId getWriter() { | |
354 for (int i = 0; i < theLockStates.size(); i++) { | |
355 LockState myState = (LockState) theLockStates.get(i); | |
356 | |
357 if (myState.getType() == WRITE) { | |
358 return myState.getOwner(); | |
359 } | |
360 } | |
361 | |
362 return null; | |
363 } | |
364 | |
365 public synchronized boolean hasWriter(TxnId anId) { | |
366 for (int i = 0; i < theLockStates.size(); i++) { | |
367 LockState myState = (LockState) theLockStates.get(i); | |
368 | |
369 if ((myState.getType() == WRITE) && | |
370 (myState.getOwner().equals(anId))) | |
371 return true; | |
372 } | |
373 | |
374 return false; | |
375 } | |
376 | |
377 public synchronized boolean hasOnly(TxnId anId, int anOp) { | |
378 if (theLockStates.size() != 1) | |
379 return false; | |
380 | |
381 LockState myState = (LockState) theLockStates.get(0); | |
382 | |
383 return ((myState.getOwner().equals(anId)) && | |
384 (myState.getType() == anOp)); | |
385 } | |
386 | |
387 private TxnId hasDeleter() { | |
388 for (int i = 0; i < theLockStates.size(); i++) { | |
389 LockState myState = (LockState) theLockStates.get(i); | |
390 | |
391 if (myState.getType() == DELETE) | |
392 return myState.getOwner(); | |
393 } | |
394 | |
395 return null; | |
396 } | |
397 | |
398 private TxnId hasReaderOtherThan(TxnId anId) { | |
399 for (int i = 0; i < theLockStates.size(); i++) { | |
400 LockState myState = (LockState) theLockStates.get(i); | |
401 | |
402 if ((myState.getType() == READ) && | |
403 (! myState.getOwner().equals(anId))) | |
404 return myState.getOwner(); | |
405 } | |
406 | |
407 return null; | |
408 } | |
409 | |
410 private TxnId hasWriterOtherThan(TxnId anId) { | |
411 for (int i = 0; i < theLockStates.size(); i++) { | |
412 LockState myState = (LockState) theLockStates.get(i); | |
413 | |
414 if ((myState.getType() == WRITE) && | |
415 (! myState.getOwner().equals(anId))) | |
416 return myState.getOwner(); | |
417 } | |
418 | |
419 return null; | |
420 } | |
421 | |
422 private boolean hasDeleterThatsMe(TxnId anId) { | |
423 for (int i = 0; i < theLockStates.size(); i++) { | |
424 LockState myState = (LockState) theLockStates.get(i); | |
425 | |
426 if ((myState.getType() == DELETE) && | |
427 (myState.getOwner().equals(anId))) | |
428 return true; | |
429 } | |
430 | |
431 return false; | |
432 } | |
433 | |
434 public String toString() { | |
435 return "TLock: " + theId; | |
436 } | |
437 } |