comparison src/org/dancres/blitz/txnlock/TxnLock.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.txnlock;
2
3 import java.util.LinkedList;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.logging.*;
7
8 import org.dancres.blitz.entry.OpInfo;
9
10 import org.dancres.blitz.task.Task;
11 import org.dancres.blitz.task.Tasks;
12
13 import org.dancres.blitz.txn.TxnId;
14 import org.dancres.blitz.txn.TxnState;
15
16 import org.dancres.blitz.Logging;
17
18 /**
19 <p> Every transaction (null or otherwise) must secure a lock for an
20 EntrySleeve before it can be deemed to have succeeded. When the transaction
21 is committed these locks are then released. </p>
22
23 <p> Things are slightly more complicated than this because we must handle
24 lock conflict and integrate that with the dispatching of blocked takes or
25 reads such that we can wake up those blocked matches when the blocking
26 transaction is committed. </p>
27 */
28 public class TxnLock {
29 private static final Logger theLogger =
30 Logging.newLogger("org.dancres.blitz.txnlock.TxnLock");
31
32 public static final int WRITE = -1;
33 public static final int READ = -2;
34 public static final int DELETE = -3;
35
36 public static final int SUCCESS = 1;
37 public static final int CONFLICT = 2;
38 public static final int FAIL = 3;
39
40 private List theConflicts;
41
42 private ArrayList theLockStates = new ArrayList();
43
44 /*
45 DEBUG BITS
46 */
47 private static final boolean DEBUG = false;
48 private static Object theIdLock = new Object();
49 private static long theNextId = 0;
50 private long theId = -1;
51
52 private static class LockState {
53 private int theType;
54 private TxnId theOwner;
55
56 LockState(int aType, TxnId anOwner) {
57 theType = aType;
58 theOwner = anOwner;
59 }
60
61 int getType() {
62 return theType;
63 }
64
65 TxnId getOwner() {
66 return theOwner;
67 }
68
69 public String toString() {
70 return theOwner + ":" + theType;
71 }
72 }
73
74 TxnLock() {
75 if (DEBUG) {
76 synchronized(theIdLock) {
77 theId = ++theNextId;
78 }
79 }
80 }
81
82 public boolean isActive() {
83 return (theLockStates.size() != 0);
84 }
85
86 public int test(TxnState anAcquirer, int aDesiredOp) {
87 TxnId myTxnId = anAcquirer.getId();
88
89 if (aDesiredOp == WRITE) {
90 // Always succeeds
91 return SUCCESS;
92 }
93
94 switch(aDesiredOp) {
95 case READ : {
96 boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
97 TxnId aDeleter = hasDeleter();
98 TxnId aWriter = hasWriterOtherThan(myTxnId);
99
100 if (deleterIsMe) {
101 return FAIL;
102 } else if ((aDeleter != null) || (aWriter != null)) {
103 return CONFLICT;
104 } else {
105 return SUCCESS;
106 }
107 }
108 case DELETE : {
109 boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
110 TxnId aDeleter = hasDeleter();
111 TxnId aWriter = hasWriterOtherThan(myTxnId);
112 TxnId aReader = hasReaderOtherThan(myTxnId);
113
114 if (deleterIsMe)
115 return FAIL;
116 else if ((aDeleter != null) ||
117 (aReader != null) ||
118 (aWriter != null)) {
119 return CONFLICT;
120 } else {
121 return SUCCESS;
122 }
123 }
124 default : {
125 throw new RuntimeException("Unrecognised acquire op");
126 }
127 }
128 }
129
130 /**
131 One calls this method to assert an operation lock on a particular
132 EntryUID. On success <code>true</code> is returned and the acquire
133 is complete. In the case of failure, the caller is not blocked.
134 Rather, we register a callback against the transaction which will
135 be invoked after it relinquishes it's lock. Note that there might
136 well be more than one blocking transaction but we only bother
137 registering against one of them. This might result in a few false
138 wakeups but they are unlikely to be performance inhibiting. This
139 method should be called in a synchronized block. <P>
140
141 Note that writes always succeed so there's no need to pass a callback
142 for such acquire requests.
143
144 @param anAcquirer TxnId of the transaction which wishes to assert the
145 lock.
146 @param aDesiredOp should be one of DELETE, READ,
147 WRITE
148 @param aParty called should a blocking transaction be encountered and
149 a second time when the blocking transaction has relinquished the lock
150 @param aHandback passed to aParty to allow multiplexing of conflicts
151 @param isRecovery when <code>true</code> causes a lock to be asserted
152 without performing checks.
153
154 @return One of, SUCCESS, CONFLICT or FAIL.
155 */
156 public int acquire(TxnState anAcquirer, int aDesiredOp, BaulkedParty aParty,
157 Object aHandback, boolean isRecovery) {
158
159 TxnId myTxnId = anAcquirer.getId();
160
161 if (aDesiredOp == WRITE) {
162 // Always succeeds
163 theLockStates.add(new LockState(WRITE, myTxnId));
164 return SUCCESS;
165 }
166
167 if (isRecovery) {
168 switch (aDesiredOp) {
169 case READ : {
170 theLockStates.add(new LockState(READ, myTxnId));
171 break;
172 }
173 case DELETE: {
174 theLockStates.add(new LockState(DELETE, myTxnId));
175 break;
176 }
177 }
178
179 return SUCCESS;
180 }
181
182 switch(aDesiredOp) {
183 case READ : {
184 boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
185 TxnId aDeleter = hasDeleter();
186 TxnId aWriter = hasWriterOtherThan(myTxnId);
187
188 if (deleterIsMe) {
189 return FAIL;
190 } else if ((aDeleter != null) || (aWriter != null)) {
191 addConflict(aParty, aHandback,
192 (aDeleter == null) ? aWriter : aDeleter);
193 return CONFLICT;
194 } else {
195 theLockStates.add(new LockState(READ, myTxnId));
196 return SUCCESS;
197 }
198 }
199 case DELETE : {
200 boolean deleterIsMe = hasDeleterThatsMe(myTxnId);
201 TxnId aDeleter = hasDeleter();
202 TxnId aWriter = hasWriterOtherThan(myTxnId);
203 TxnId aReader = hasReaderOtherThan(myTxnId);
204
205 if (deleterIsMe)
206 return FAIL;
207 else if ((aDeleter != null) ||
208 (aReader != null) ||
209 (aWriter != null)) {
210 if (aDeleter != null)
211 addConflict(aParty, aHandback, aDeleter);
212 else
213 addConflict(aParty, aHandback,
214 (aReader == null) ? aWriter : aReader);
215 return CONFLICT;
216 } else {
217 theLockStates.add(new LockState(DELETE,
218 myTxnId));
219 return SUCCESS;
220 }
221 }
222 default : {
223 throw new RuntimeException("Unrecognised acquire op");
224 }
225 }
226 }
227
228 private void addConflict(BaulkedParty aParty, Object aHandback,
229 TxnId aConflicter) {
230 if (aParty == null)
231 return;
232
233 if (theConflicts == null)
234 theConflicts = new ArrayList();
235 // theConflicts = new LinkedList();
236
237 theConflicts.add(new Callback(aConflicter, aParty, aHandback));
238 }
239
240 private static class Callback implements Task {
241 private TxnId theBlocker;
242 private BaulkedParty theParty;
243 private Object theHandback;
244
245 Callback(TxnId aBlocker, BaulkedParty aParty, Object aHandback) {
246 theBlocker = aBlocker;
247 theParty = aParty;
248 theHandback = aHandback;
249 theParty.blocked(aHandback);
250 }
251
252 TxnId getBlocker() {
253 return theBlocker;
254 }
255
256 BaulkedParty getBaulked() {
257 return theParty;
258 }
259
260 public void run() {
261 theParty.unblocked(theHandback);
262 }
263 }
264
265 public void release(TxnState anAcquirer, int anOp) {
266 TxnId myId = anAcquirer.getId();
267
268 int myIndex = 0;
269 boolean haveReleasedLock = false;
270 int myOutstanding = 0;
271
272 /*
273 Examine all lock states and:
274
275 (1) Release the relevant lock state
276 (2) Take note of any other lock states associated with this
277 transaction.
278 */
279 ArrayList myDispatches = new ArrayList();
280
281 synchronized(this) {
282 while (myIndex < theLockStates.size()) {
283 LockState myState = (LockState) theLockStates.get(myIndex);
284
285 // If the lock state is associated with this transaction...
286 if (myState.getOwner().equals(myId)) {
287 /*
288 If we've already released a lock of this type for this
289 transaction just note that we've found another lock held
290 by the transaction. Otherwise, release the lock and note
291 we did that.
292 */
293 if ((myState.getType() == anOp) && (! haveReleasedLock)) {
294 theLockStates.remove(myIndex);
295 haveReleasedLock = true;
296 continue;
297 } else {
298 ++myOutstanding;
299 }
300 }
301
302 ++myIndex;
303 }
304
305 if (myOutstanding != 0) {
306 // System.err.println("Still have outstanding: " + myOutstanding);
307 // We haven't released all locks yet - no point waking up blockers
308 return;
309 } else {
310 // System.err.println("Waking up conflicters");
311 }
312
313 // Now process any outstanding blockers
314 myIndex = 0;
315 while ((theConflicts != null) && (myIndex < theConflicts.size())) {
316 Callback myCallback = (Callback) theConflicts.get(myIndex);
317
318 if (myCallback.getBlocker().equals(myId)) {
319 theConflicts.remove(myCallback);
320
321 // System.err.println("Dispatching callback");
322 myDispatches.add(myCallback);
323 } else {
324 ++myIndex;
325 }
326 }
327 }
328
329 if (myDispatches.size() > 0) {
330 try {
331 Tasks.queue(new DispatchTask(myDispatches));
332 } catch (InterruptedException anIE) {
333 theLogger.log(Level.SEVERE,
334 "Failed to queue Txn callback", anIE);
335 }
336 }
337 }
338
339 static class DispatchTask implements Task {
340 private ArrayList theDispatches;
341
342 DispatchTask(ArrayList aDispatches) {
343 theDispatches = aDispatches;
344 }
345
346 public void run() {
347 for (int i = 0; i < theDispatches.size(); i++) {
348 ((Task) theDispatches.get(i)).run();
349 }
350 }
351 }
352
353 public TxnId getWriter() {
354 for (int i = 0; i < theLockStates.size(); i++) {
355 LockState myState = (LockState) theLockStates.get(i);
356
357 if (myState.getType() == WRITE) {
358 return myState.getOwner();
359 }
360 }
361
362 return null;
363 }
364
365 public synchronized boolean hasWriter(TxnId anId) {
366 for (int i = 0; i < theLockStates.size(); i++) {
367 LockState myState = (LockState) theLockStates.get(i);
368
369 if ((myState.getType() == WRITE) &&
370 (myState.getOwner().equals(anId)))
371 return true;
372 }
373
374 return false;
375 }
376
377 public synchronized boolean hasOnly(TxnId anId, int anOp) {
378 if (theLockStates.size() != 1)
379 return false;
380
381 LockState myState = (LockState) theLockStates.get(0);
382
383 return ((myState.getOwner().equals(anId)) &&
384 (myState.getType() == anOp));
385 }
386
387 private TxnId hasDeleter() {
388 for (int i = 0; i < theLockStates.size(); i++) {
389 LockState myState = (LockState) theLockStates.get(i);
390
391 if (myState.getType() == DELETE)
392 return myState.getOwner();
393 }
394
395 return null;
396 }
397
398 private TxnId hasReaderOtherThan(TxnId anId) {
399 for (int i = 0; i < theLockStates.size(); i++) {
400 LockState myState = (LockState) theLockStates.get(i);
401
402 if ((myState.getType() == READ) &&
403 (! myState.getOwner().equals(anId)))
404 return myState.getOwner();
405 }
406
407 return null;
408 }
409
410 private TxnId hasWriterOtherThan(TxnId anId) {
411 for (int i = 0; i < theLockStates.size(); i++) {
412 LockState myState = (LockState) theLockStates.get(i);
413
414 if ((myState.getType() == WRITE) &&
415 (! myState.getOwner().equals(anId)))
416 return myState.getOwner();
417 }
418
419 return null;
420 }
421
422 private boolean hasDeleterThatsMe(TxnId anId) {
423 for (int i = 0; i < theLockStates.size(); i++) {
424 LockState myState = (LockState) theLockStates.get(i);
425
426 if ((myState.getType() == DELETE) &&
427 (myState.getOwner().equals(anId)))
428 return true;
429 }
430
431 return false;
432 }
433
434 public String toString() {
435 return "TLock: " + theId;
436 }
437 }