comparison src/com/go/trove/util/tq/TransactionQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /* ====================================================================
2 * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3 * ====================================================================
4 * The Tea Software License, Version 1.1
5 *
6 * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * 3. The end-user documentation included with the redistribution,
21 * if any, must include the following acknowledgment:
22 * "This product includes software developed by the
23 * Walt Disney Internet Group (http://opensource.go.com/)."
24 * Alternately, this acknowledgment may appear in the software itself,
25 * if and wherever such third-party acknowledgments normally appear.
26 *
27 * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28 * not be used to endorse or promote products derived from this
29 * software without prior written permission. For written
30 * permission, please contact opensource@dig.com.
31 *
32 * 5. Products derived from this software may not be called "Tea",
33 * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34 * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35 * written permission of the Walt Disney Internet Group.
36 *
37 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40 * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48 * ====================================================================
49 *
50 * For more information about Tea, please see http://opensource.go.com/.
51 */
52
53 package com.go.trove.util.tq;
54
55 import java.util.*;
56 import com.go.trove.util.*;
57
58 /******************************************************************************
59 * TransactionQueue processes {@link Transaction Transactions} concurrently
60 * using threads obtained from a {@link ThreadPool}. When a transaction is
61 * enqueued, it goes into a waiting queue, and it is serviced as soon as a
62 * thread is available.
63 *
64 * @author Brian S O'Neill
65 * @version
66 * <!--$$Revision: 1.1 $-->, <!--$$JustDate:--> 01/02/15 <!-- $-->
67 */
68 public class TransactionQueue {
69 private ThreadPool mThreadPool;
70 private String mName;
71 private int mMaxSize;
72 private int mMaxThreads;
73 private long mIdleTimeout;
74 private long mTransactionTimeout;
75
76 private LinkedList mQueue = new LinkedList();
77 private int mThreadCount;
78 private int mServicingCount;
79 private int mThreadId;
80 private boolean mSuspended;
81
82 private Worker mWorker = new Worker();
83
84 private Collection mListeners = new LinkedList();
85 private Collection mExceptionListeners = new LinkedList();
86
87 // Used to gather time lapse statistics.
88 private long mTimeLapseStart;
89 private int mPeakQueueSize;
90 private int mPeakThreadCount;
91 private int mPeakServicingCount;
92 private int mTotalEnqueueAttempts;
93 private int mTotalEnqueued;
94 private int mTotalServiced;
95 private int mTotalExpired;
96 private int mTotalServiceExceptions;
97 private int mTotalUncaughtExceptions;
98 private long mTotalQueueDuration;
99 private long mTotalServiceDuration;
100
101 public TransactionQueue(ThreadPool tp, int maxSize, int maxThreads) {
102 this(tp, "TransactionQueue", maxSize, maxThreads);
103 }
104
105 public TransactionQueue(ThreadPool tp, String name,
106 int maxSize, int maxThreads) {
107 mThreadPool = tp;
108 mName = name;
109
110 setMaximumSize(maxSize);
111 setMaximumThreads(maxThreads);
112
113 setIdleTimeout(tp.getIdleTimeout());
114 setTransactionTimeout(-1);
115
116 resetStatistics();
117 }
118
119 /**
120 * Sets the timeout (in milliseconds) for the TransactionQueue to wait
121 * inactive before going into an idle state. When the TransactionQueue is
122 * idle, there are no internal worker threads. A negative value specifies
123 * that the TransactionQueue should never automatically go into idle mode.
124 *
125 * @see #idle()
126 */
127 public synchronized void setIdleTimeout(long timeout) {
128 mIdleTimeout = timeout;
129 }
130
131 /**
132 * Returns the timeout (in milliseconds) that the TransactionQueue will
133 * wait inactive before going into an idle state. The default value is
134 * the same as the ThreadPool's idle timeout.
135 *
136 * @see #idle()
137 * @see ThreadPool#getIdleTimeout
138 */
139 public synchronized long getIdleTimeout() {
140 return mIdleTimeout;
141 }
142
143 /**
144 * Sets the timeout (in milliseconds) to wait before an enqueued
145 * transaction expires. If a worker receives an expired transaction, it
146 * is cancelled. A negative timeout specifies that enqueued transactions
147 * never expire.
148 */
149 public synchronized void setTransactionTimeout(long timeout) {
150 mTransactionTimeout = timeout;
151 }
152
153 /**
154 * Returns the timeout (in milliseconds) to wait before an enqueued
155 * transaction expires. The default value is -1, indicating that enqueued
156 * transactions never expire.
157 */
158 public synchronized long getTransactionTimeout() {
159 return mTransactionTimeout;
160 }
161
162 /**
163 * Returns the name of this TransactionQueue.
164 */
165 public String getName() {
166 return mName;
167 }
168
169 /**
170 * Returns the maximum allowed number of queued transactions.
171 */
172 public synchronized int getMaximumSize() {
173 return mMaxSize;
174 }
175
176 /**
177 * Setting the max size to zero disables enqueueing.
178 */
179 public synchronized void setMaximumSize(int max) {
180 if (max < 0) {
181 throw new IllegalArgumentException
182 ("TransactionQueue max size must be positive: " + max);
183 }
184
185 mMaxSize = max;
186 }
187
188 /**
189 * Returns the maximum allowed number of worker threads.
190 */
191 public synchronized int getMaximumThreads() {
192 return mMaxThreads;
193 }
194
195 public synchronized void setMaximumThreads(int max) {
196 if (max < 1) {
197 throw new IllegalArgumentException
198 ("TransactionQueue must have at least one thread: " + max);
199 }
200
201 mMaxThreads = max;
202 }
203
204 /**
205 * Enqueues a transaction that will be serviced when a worker is
206 * available. If the queue is full or cannot accept new transactions, the
207 * transaction is not enqueued, and false is returned.
208 *
209 * @return true if enqueued, false if queue is full or cannot accept new
210 * transactions.
211 */
212 public synchronized boolean enqueue(Transaction transaction) {
213 mTotalEnqueueAttempts++;
214
215 if (transaction == null || mThreadPool.isClosed()) {
216 return false;
217 }
218
219 int queueSize;
220 if ((queueSize = mQueue.size()) >= mMaxSize) {
221 if (mListeners.size() > 0) {
222 TransactionQueueEvent event =
223 new TransactionQueueEvent(this, transaction);
224
225 Iterator it = mListeners.iterator();
226 while (it.hasNext()) {
227 ((TransactionQueueListener)it.next())
228 .transactionQueueFull(event);
229 }
230 }
231 return false;
232 }
233
234 if (!mSuspended) {
235 if (!ensureWaitingThread()) {
236 return false;
237 }
238 }
239
240 mTotalEnqueued++;
241
242 TransactionQueueEvent event =
243 new TransactionQueueEvent(this, transaction);
244
245 mQueue.addLast(event);
246
247 if (++queueSize > mPeakQueueSize) {
248 mPeakQueueSize = queueSize;
249 }
250
251 notify();
252
253 if (mListeners.size() > 0) {
254 Iterator it = mListeners.iterator();
255 while (it.hasNext()) {
256 ((TransactionQueueListener)it.next())
257 .transactionEnqueued(event);
258 }
259 }
260
261 return true;
262 }
263
264 /**
265 * Suspends processing of transactions in the queue until resume is called.
266 * If suspend is called on a TransactionQueue that is already suspended,
267 * the call has no effect.
268 */
269 public synchronized void suspend() {
270 if (!mSuspended) {
271 mQueue.addFirst(null);
272 notify();
273 mSuspended = true;
274 }
275 }
276
277 /**
278 * Resumes processing of transactions in the queue if suspend was called.
279 * If resume is called on a TransactionQueue that is already running, the
280 * call has no effect, but true is still returned.
281 *
282 * @return false if couldn't resume because no threads available from pool.
283 */
284 public synchronized boolean resume() {
285 if (mSuspended) {
286 mSuspended = false;
287 }
288 return ensureWaitingThread();
289 }
290
291 /**
292 * Make this TransactionQueue go into idle mode and allow it to be
293 * reclaimed by the garbage collector if it is no longer used. Any pending
294 * transactions will be serviced, and any servicing transactions will
295 * finish. If any transactions are added to the TransactionQueue while
296 * idle, it will reactivate itself. New TransactionQueues start out in an
297 * idle state.
298 *
299 * @see #setIdleTimeout
300 * @see #getIdleTimeout
301 */
302 public synchronized void idle() {
303 mQueue.addLast(null);
304 notify();
305 }
306
307 public synchronized void addTransactionQueueListener
308 (TransactionQueueListener listener) {
309
310 mListeners.add(listener);
311 }
312
313 public synchronized void removeTransactionQueueListener
314 (TransactionQueueListener listener) {
315
316 mListeners.remove(listener);
317 }
318
319 public synchronized void addUncaughtExceptionListener
320 (UncaughtExceptionListener listener) {
321
322 mExceptionListeners.add(listener);
323 }
324
325 public synchronized void removeUncaughtExceptionListener
326 (UncaughtExceptionListener listener) {
327
328 mExceptionListeners.remove(listener);
329 }
330
331 /**
332 * Returns the number of currently queued transactions.
333 */
334 public synchronized int getQueueSize() {
335 return mQueue.size();
336 }
337
338 /**
339 * Returns the current amount of worker threads.
340 */
341 public synchronized int getThreadCount() {
342 return mThreadCount;
343 }
344
345 /**
346 * Returns a snapshot of the statistics on this TransactionQueue.
347 */
348 public synchronized TransactionQueueData getStatistics() {
349 return new TransactionQueueData(this,
350 mTimeLapseStart,
351 System.currentTimeMillis(),
352 mQueue.size(),
353 mThreadCount,
354 mServicingCount,
355 mPeakQueueSize,
356 mPeakThreadCount,
357 mPeakServicingCount,
358 mTotalEnqueueAttempts,
359 mTotalEnqueued,
360 mTotalServiced,
361 mTotalExpired,
362 mTotalServiceExceptions,
363 mTotalUncaughtExceptions,
364 mTotalQueueDuration,
365 mTotalServiceDuration);
366 }
367
368 /**
369 * Resets all time lapse statistics.
370 */
371 public synchronized void resetStatistics() {
372 mPeakQueueSize = 0;
373 mPeakThreadCount = 0;
374 mPeakServicingCount = 0;
375 mTotalEnqueueAttempts = 0;
376 mTotalEnqueued = 0;
377 mTotalServiced = 0;
378 mTotalExpired = 0;
379 mTotalServiceExceptions = 0;
380 mTotalUncaughtExceptions = 0;
381 mTotalQueueDuration = 0;
382 mTotalServiceDuration = 0;
383
384 mTimeLapseStart = System.currentTimeMillis();
385 }
386
387 /**
388 * Understands and applies the following integer properties.
389 *
390 * <ul>
391 * <li>max.size - setMaximumSize
392 * <li>max.threads - setMaximumThreads
393 * <li>timeout.idle - setIdleTimeout
394 * <li>timeout.transaction - setTransactionTimeout
395 * <li>tune.size - Automatically tunes queue size when "true" and
396 * transaction timeout set.
397 * <li>tune.threads - Automatically tunes maximum thread count.
398 * </ul>
399 */
400 public synchronized void applyProperties(PropertyMap properties) {
401 if (properties.containsKey("max.size")) {
402 setMaximumSize(properties.getInt("max.size"));
403 }
404
405 if (properties.containsKey("max.threads")) {
406 setMaximumThreads(properties.getInt("max.threads"));
407 }
408
409 if (properties.containsKey("timeout.idle")) {
410 setIdleTimeout(properties.getNumber("timeout.idle").longValue());
411 }
412
413 if (properties.containsKey("timeout.transaction")) {
414 setTransactionTimeout
415 (properties.getNumber("timeout.transaction").longValue());
416 }
417
418 if ("true".equalsIgnoreCase(properties.getString("tune.size"))) {
419 addTransactionQueueListener(new TransactionQueueSizeTuner());
420 }
421
422 if ("true".equalsIgnoreCase(properties.getString("tune.threads"))) {
423 addTransactionQueueListener(new TransactionQueueThreadTuner());
424 }
425 }
426
427 synchronized void startThread(boolean canwait)
428 throws InterruptedException {
429
430 if (mThreadCount < mMaxThreads) {
431 String threadName = getName() + ' ' + (mThreadId++);
432 if (canwait) {
433 mThreadPool.start(mWorker, threadName);
434 }
435 else {
436 mThreadPool.start(mWorker, 0, threadName);
437 }
438
439 if (++mThreadCount > mPeakThreadCount) {
440 mPeakThreadCount = mThreadCount;
441 }
442 }
443 }
444
445 /**
446 * Returns null when the TransactionQueue should go idle.
447 */
448 synchronized TransactionQueueEvent nextTransactionEvent()
449 throws InterruptedException {
450
451 if (mQueue.isEmpty()) {
452 if (mIdleTimeout != 0) {
453 if (mIdleTimeout < 0) {
454 wait();
455 }
456 else {
457 wait(mIdleTimeout);
458 }
459 }
460 }
461
462 if (mQueue.isEmpty()) {
463 return null;
464 }
465
466 return (TransactionQueueEvent)mQueue.removeFirst();
467 }
468
469 synchronized TransactionQueueEvent transactionDequeued
470 (TransactionQueueEvent event) {
471
472 if (++mServicingCount > mPeakServicingCount) {
473 mPeakServicingCount = mServicingCount;
474 }
475
476 TransactionQueueEvent deqEvent = new TransactionQueueEvent(event);
477
478 mTotalQueueDuration +=
479 (deqEvent.getTimestampMillis() - event.getTimestampMillis());
480
481 if (mListeners.size() > 0) {
482 Iterator it = mListeners.iterator();
483 while (it.hasNext()) {
484 ((TransactionQueueListener)it.next())
485 .transactionDequeued(deqEvent);
486 }
487 }
488
489 return deqEvent;
490 }
491
492 synchronized void transactionServiced(TransactionQueueEvent event) {
493 TransactionQueueEvent svcEvent = new TransactionQueueEvent(event);
494
495 mTotalServiceDuration +=
496 (svcEvent.getTimestampMillis() - event.getTimestampMillis());
497
498 if (mListeners.size() > 0) {
499 Iterator it = mListeners.iterator();
500 while (it.hasNext()) {
501 ((TransactionQueueListener)it.next())
502 .transactionServiced(svcEvent);
503 }
504 }
505
506 // Adjust counters at end in case a listener threw an exception and let
507 // the call to transactionException adjust the counters instead.
508 mServicingCount--;
509 mTotalServiced++;
510 }
511
512 synchronized void transactionExpired(TransactionQueueEvent event) {
513 mServicingCount--;
514 mTotalExpired++;
515
516 if (mListeners.size() > 0) {
517 event = new TransactionQueueEvent(event);
518
519 Iterator it = mListeners.iterator();
520 while (it.hasNext()) {
521 ((TransactionQueueListener)it.next())
522 .transactionExpired(event);
523 }
524 }
525 }
526
527 synchronized void transactionException(TransactionQueueEvent event,
528 Throwable e) {
529 mServicingCount--;
530 mTotalServiceExceptions++;
531
532 if (mListeners.size() > 0) {
533 event = new TransactionQueueEvent(event, e);
534
535 Iterator it = mListeners.iterator();
536 while (it.hasNext()) {
537 ((TransactionQueueListener)it.next())
538 .transactionException(event);
539 }
540 }
541 }
542
543 synchronized void uncaughtException(Throwable e) {
544 mTotalUncaughtExceptions++;
545
546 if (mExceptionListeners.size() > 0) {
547 UncaughtExceptionEvent event =
548 new UncaughtExceptionEvent(this, e);
549
550 Iterator it = mExceptionListeners.iterator();
551 while (it.hasNext()) {
552 ((UncaughtExceptionListener)it.next())
553 .uncaughtException(event);
554 }
555 }
556 else {
557 Thread current = Thread.currentThread();
558 current.getThreadGroup().uncaughtException(current, e);
559 }
560 }
561
562 synchronized boolean exitThread(boolean force) {
563 if (!force && (mThreadCount - mServicingCount) <= 1 &&
564 mQueue.size() > 0 && !mSuspended) {
565
566 // Can't exit thread because transactions are waiting to
567 // be serviced, and no thread is waiting on the queue.
568 return false;
569 }
570 else {
571 mThreadCount--;
572 return true;
573 }
574 }
575
576 private synchronized boolean ensureWaitingThread() {
577 if (mThreadCount <= mServicingCount) {
578 try {
579 // Only wait if no threads. Otherwise the lock on this object
580 // will prevent threads from entering the exitThread method.
581 startThread(mThreadCount == 0);
582 }
583 catch (NoThreadException e) {
584 if (!e.isThreadPoolClosed()) {
585 if (mThreadCount == 0) {
586 uncaughtException(e);
587 return false;
588 }
589 }
590 }
591 catch (InterruptedException e) {
592 return false;
593 }
594 catch (Throwable e) {
595 uncaughtException(e);
596 return false;
597 }
598 }
599 return true;
600 }
601
602 private class Worker implements Runnable {
603 public void run() {
604 boolean forceExit = false;
605 TransactionQueueEvent event;
606
607 while (true) {
608 try {
609 // Phase 1: wait for a transaction
610 try {
611 if ((event = nextTransactionEvent()) == null) {
612 // Go into idle mode.
613 continue;
614 }
615 }
616 catch (InterruptedException e) {
617 forceExit = true;
618 continue;
619 }
620
621 long enqueueTimestamp = event.getTimestampMillis();
622
623 // Phase 2: spawn off a replacement thread
624 try {
625 startThread(false);
626 }
627 catch (NoThreadException e) {
628 if (e.isThreadPoolClosed()) {
629 forceExit = true;
630 // Don't "continue" because the transaction must
631 // still be serviced first.
632 }
633 }
634 catch (InterruptedException e) {
635 forceExit = true;
636 // Don't "continue" because the transaction must
637 // still be serviced first.
638 }
639 catch (Throwable e) {
640 uncaughtException(e);
641 }
642 finally {
643 // Only indicate that transaction has been dequeued
644 // after a replacement thread has been created.
645 // Queue time is more accurate this way because time
646 // spent waiting for a thread is time spent not being
647 // serviced.
648 try {
649 event = transactionDequeued(event);
650 }
651 catch (Throwable e) {
652 uncaughtException(e);
653 }
654 }
655
656 long serviceTimestamp = event.getTimestampMillis();
657
658 // Phase 3: service the transaction
659 long timeout = getTransactionTimeout();
660 if (timeout >= 0 &&
661 (serviceTimestamp - enqueueTimestamp) >= timeout) {
662 try {
663 event.getTransaction().cancel();
664 }
665 finally {
666 transactionExpired(event);
667 }
668 }
669 else {
670 try {
671 event.getTransaction().service();
672 transactionServiced(event);
673 }
674 catch (Throwable e) {
675 uncaughtException(e);
676
677 try {
678 event.getTransaction().cancel();
679 }
680 catch (Throwable e2) {
681 uncaughtException(e2);
682 }
683
684 transactionException(event, e);
685 }
686 }
687 }
688 catch (Throwable e) {
689 uncaughtException(e);
690 }
691 finally {
692 if (exitThread(forceExit)) {
693 break;
694 }
695 }
696 }
697 }
698 }
699 }