Mercurial > hg > blitz_condensed
diff src/com/go/trove/util/tq/TransactionQueue.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/com/go/trove/util/tq/TransactionQueue.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,699 @@ +/* ==================================================================== + * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group + * ==================================================================== + * The Tea Software License, Version 1.1 + * + * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * 3. The end-user documentation included with the redistribution, + * if any, must include the following acknowledgment: + * "This product includes software developed by the + * Walt Disney Internet Group (http://opensource.go.com/)." + * Alternately, this acknowledgment may appear in the software itself, + * if and wherever such third-party acknowledgments normally appear. + * + * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must + * not be used to endorse or promote products derived from this + * software without prior written permission. For written + * permission, please contact opensource@dig.com. + * + * 5. Products derived from this software may not be called "Tea", + * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet", + * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior + * written permission of the Walt Disney Internet Group. + * + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * ==================================================================== + * + * For more information about Tea, please see http://opensource.go.com/. + */ + +package com.go.trove.util.tq; + +import java.util.*; +import com.go.trove.util.*; + +/****************************************************************************** + * TransactionQueue processes {@link Transaction Transactions} concurrently + * using threads obtained from a {@link ThreadPool}. When a transaction is + * enqueued, it goes into a waiting queue, and it is serviced as soon as a + * thread is available. + * + * @author Brian S O'Neill + * @version + * <!--$$Revision: 1.1 $-->, <!--$$JustDate:--> 01/02/15 <!-- $--> + */ +public class TransactionQueue { + private ThreadPool mThreadPool; + private String mName; + private int mMaxSize; + private int mMaxThreads; + private long mIdleTimeout; + private long mTransactionTimeout; + + private LinkedList mQueue = new LinkedList(); + private int mThreadCount; + private int mServicingCount; + private int mThreadId; + private boolean mSuspended; + + private Worker mWorker = new Worker(); + + private Collection mListeners = new LinkedList(); + private Collection mExceptionListeners = new LinkedList(); + + // Used to gather time lapse statistics. + private long mTimeLapseStart; + private int mPeakQueueSize; + private int mPeakThreadCount; + private int mPeakServicingCount; + private int mTotalEnqueueAttempts; + private int mTotalEnqueued; + private int mTotalServiced; + private int mTotalExpired; + private int mTotalServiceExceptions; + private int mTotalUncaughtExceptions; + private long mTotalQueueDuration; + private long mTotalServiceDuration; + + public TransactionQueue(ThreadPool tp, int maxSize, int maxThreads) { + this(tp, "TransactionQueue", maxSize, maxThreads); + } + + public TransactionQueue(ThreadPool tp, String name, + int maxSize, int maxThreads) { + mThreadPool = tp; + mName = name; + + setMaximumSize(maxSize); + setMaximumThreads(maxThreads); + + setIdleTimeout(tp.getIdleTimeout()); + setTransactionTimeout(-1); + + resetStatistics(); + } + + /** + * Sets the timeout (in milliseconds) for the TransactionQueue to wait + * inactive before going into an idle state. When the TransactionQueue is + * idle, there are no internal worker threads. A negative value specifies + * that the TransactionQueue should never automatically go into idle mode. + * + * @see #idle() + */ + public synchronized void setIdleTimeout(long timeout) { + mIdleTimeout = timeout; + } + + /** + * Returns the timeout (in milliseconds) that the TransactionQueue will + * wait inactive before going into an idle state. The default value is + * the same as the ThreadPool's idle timeout. + * + * @see #idle() + * @see ThreadPool#getIdleTimeout + */ + public synchronized long getIdleTimeout() { + return mIdleTimeout; + } + + /** + * Sets the timeout (in milliseconds) to wait before an enqueued + * transaction expires. If a worker receives an expired transaction, it + * is cancelled. A negative timeout specifies that enqueued transactions + * never expire. + */ + public synchronized void setTransactionTimeout(long timeout) { + mTransactionTimeout = timeout; + } + + /** + * Returns the timeout (in milliseconds) to wait before an enqueued + * transaction expires. The default value is -1, indicating that enqueued + * transactions never expire. + */ + public synchronized long getTransactionTimeout() { + return mTransactionTimeout; + } + + /** + * Returns the name of this TransactionQueue. + */ + public String getName() { + return mName; + } + + /** + * Returns the maximum allowed number of queued transactions. + */ + public synchronized int getMaximumSize() { + return mMaxSize; + } + + /** + * Setting the max size to zero disables enqueueing. + */ + public synchronized void setMaximumSize(int max) { + if (max < 0) { + throw new IllegalArgumentException + ("TransactionQueue max size must be positive: " + max); + } + + mMaxSize = max; + } + + /** + * Returns the maximum allowed number of worker threads. + */ + public synchronized int getMaximumThreads() { + return mMaxThreads; + } + + public synchronized void setMaximumThreads(int max) { + if (max < 1) { + throw new IllegalArgumentException + ("TransactionQueue must have at least one thread: " + max); + } + + mMaxThreads = max; + } + + /** + * Enqueues a transaction that will be serviced when a worker is + * available. If the queue is full or cannot accept new transactions, the + * transaction is not enqueued, and false is returned. + * + * @return true if enqueued, false if queue is full or cannot accept new + * transactions. + */ + public synchronized boolean enqueue(Transaction transaction) { + mTotalEnqueueAttempts++; + + if (transaction == null || mThreadPool.isClosed()) { + return false; + } + + int queueSize; + if ((queueSize = mQueue.size()) >= mMaxSize) { + if (mListeners.size() > 0) { + TransactionQueueEvent event = + new TransactionQueueEvent(this, transaction); + + Iterator it = mListeners.iterator(); + while (it.hasNext()) { + ((TransactionQueueListener)it.next()) + .transactionQueueFull(event); + } + } + return false; + } + + if (!mSuspended) { + if (!ensureWaitingThread()) { + return false; + } + } + + mTotalEnqueued++; + + TransactionQueueEvent event = + new TransactionQueueEvent(this, transaction); + + mQueue.addLast(event); + + if (++queueSize > mPeakQueueSize) { + mPeakQueueSize = queueSize; + } + + notify(); + + if (mListeners.size() > 0) { + Iterator it = mListeners.iterator(); + while (it.hasNext()) { + ((TransactionQueueListener)it.next()) + .transactionEnqueued(event); + } + } + + return true; + } + + /** + * Suspends processing of transactions in the queue until resume is called. + * If suspend is called on a TransactionQueue that is already suspended, + * the call has no effect. + */ + public synchronized void suspend() { + if (!mSuspended) { + mQueue.addFirst(null); + notify(); + mSuspended = true; + } + } + + /** + * Resumes processing of transactions in the queue if suspend was called. + * If resume is called on a TransactionQueue that is already running, the + * call has no effect, but true is still returned. + * + * @return false if couldn't resume because no threads available from pool. + */ + public synchronized boolean resume() { + if (mSuspended) { + mSuspended = false; + } + return ensureWaitingThread(); + } + + /** + * Make this TransactionQueue go into idle mode and allow it to be + * reclaimed by the garbage collector if it is no longer used. Any pending + * transactions will be serviced, and any servicing transactions will + * finish. If any transactions are added to the TransactionQueue while + * idle, it will reactivate itself. New TransactionQueues start out in an + * idle state. + * + * @see #setIdleTimeout + * @see #getIdleTimeout + */ + public synchronized void idle() { + mQueue.addLast(null); + notify(); + } + + public synchronized void addTransactionQueueListener + (TransactionQueueListener listener) { + + mListeners.add(listener); + } + + public synchronized void removeTransactionQueueListener + (TransactionQueueListener listener) { + + mListeners.remove(listener); + } + + public synchronized void addUncaughtExceptionListener + (UncaughtExceptionListener listener) { + + mExceptionListeners.add(listener); + } + + public synchronized void removeUncaughtExceptionListener + (UncaughtExceptionListener listener) { + + mExceptionListeners.remove(listener); + } + + /** + * Returns the number of currently queued transactions. + */ + public synchronized int getQueueSize() { + return mQueue.size(); + } + + /** + * Returns the current amount of worker threads. + */ + public synchronized int getThreadCount() { + return mThreadCount; + } + + /** + * Returns a snapshot of the statistics on this TransactionQueue. + */ + public synchronized TransactionQueueData getStatistics() { + return new TransactionQueueData(this, + mTimeLapseStart, + System.currentTimeMillis(), + mQueue.size(), + mThreadCount, + mServicingCount, + mPeakQueueSize, + mPeakThreadCount, + mPeakServicingCount, + mTotalEnqueueAttempts, + mTotalEnqueued, + mTotalServiced, + mTotalExpired, + mTotalServiceExceptions, + mTotalUncaughtExceptions, + mTotalQueueDuration, + mTotalServiceDuration); + } + + /** + * Resets all time lapse statistics. + */ + public synchronized void resetStatistics() { + mPeakQueueSize = 0; + mPeakThreadCount = 0; + mPeakServicingCount = 0; + mTotalEnqueueAttempts = 0; + mTotalEnqueued = 0; + mTotalServiced = 0; + mTotalExpired = 0; + mTotalServiceExceptions = 0; + mTotalUncaughtExceptions = 0; + mTotalQueueDuration = 0; + mTotalServiceDuration = 0; + + mTimeLapseStart = System.currentTimeMillis(); + } + + /** + * Understands and applies the following integer properties. + * + * <ul> + * <li>max.size - setMaximumSize + * <li>max.threads - setMaximumThreads + * <li>timeout.idle - setIdleTimeout + * <li>timeout.transaction - setTransactionTimeout + * <li>tune.size - Automatically tunes queue size when "true" and + * transaction timeout set. + * <li>tune.threads - Automatically tunes maximum thread count. + * </ul> + */ + public synchronized void applyProperties(PropertyMap properties) { + if (properties.containsKey("max.size")) { + setMaximumSize(properties.getInt("max.size")); + } + + if (properties.containsKey("max.threads")) { + setMaximumThreads(properties.getInt("max.threads")); + } + + if (properties.containsKey("timeout.idle")) { + setIdleTimeout(properties.getNumber("timeout.idle").longValue()); + } + + if (properties.containsKey("timeout.transaction")) { + setTransactionTimeout + (properties.getNumber("timeout.transaction").longValue()); + } + + if ("true".equalsIgnoreCase(properties.getString("tune.size"))) { + addTransactionQueueListener(new TransactionQueueSizeTuner()); + } + + if ("true".equalsIgnoreCase(properties.getString("tune.threads"))) { + addTransactionQueueListener(new TransactionQueueThreadTuner()); + } + } + + synchronized void startThread(boolean canwait) + throws InterruptedException { + + if (mThreadCount < mMaxThreads) { + String threadName = getName() + ' ' + (mThreadId++); + if (canwait) { + mThreadPool.start(mWorker, threadName); + } + else { + mThreadPool.start(mWorker, 0, threadName); + } + + if (++mThreadCount > mPeakThreadCount) { + mPeakThreadCount = mThreadCount; + } + } + } + + /** + * Returns null when the TransactionQueue should go idle. + */ + synchronized TransactionQueueEvent nextTransactionEvent() + throws InterruptedException { + + if (mQueue.isEmpty()) { + if (mIdleTimeout != 0) { + if (mIdleTimeout < 0) { + wait(); + } + else { + wait(mIdleTimeout); + } + } + } + + if (mQueue.isEmpty()) { + return null; + } + + return (TransactionQueueEvent)mQueue.removeFirst(); + } + + synchronized TransactionQueueEvent transactionDequeued + (TransactionQueueEvent event) { + + if (++mServicingCount > mPeakServicingCount) { + mPeakServicingCount = mServicingCount; + } + + TransactionQueueEvent deqEvent = new TransactionQueueEvent(event); + + mTotalQueueDuration += + (deqEvent.getTimestampMillis() - event.getTimestampMillis()); + + if (mListeners.size() > 0) { + Iterator it = mListeners.iterator(); + while (it.hasNext()) { + ((TransactionQueueListener)it.next()) + .transactionDequeued(deqEvent); + } + } + + return deqEvent; + } + + synchronized void transactionServiced(TransactionQueueEvent event) { + TransactionQueueEvent svcEvent = new TransactionQueueEvent(event); + + mTotalServiceDuration += + (svcEvent.getTimestampMillis() - event.getTimestampMillis()); + + if (mListeners.size() > 0) { + Iterator it = mListeners.iterator(); + while (it.hasNext()) { + ((TransactionQueueListener)it.next()) + .transactionServiced(svcEvent); + } + } + + // Adjust counters at end in case a listener threw an exception and let + // the call to transactionException adjust the counters instead. + mServicingCount--; + mTotalServiced++; + } + + synchronized void transactionExpired(TransactionQueueEvent event) { + mServicingCount--; + mTotalExpired++; + + if (mListeners.size() > 0) { + event = new TransactionQueueEvent(event); + + Iterator it = mListeners.iterator(); + while (it.hasNext()) { + ((TransactionQueueListener)it.next()) + .transactionExpired(event); + } + } + } + + synchronized void transactionException(TransactionQueueEvent event, + Throwable e) { + mServicingCount--; + mTotalServiceExceptions++; + + if (mListeners.size() > 0) { + event = new TransactionQueueEvent(event, e); + + Iterator it = mListeners.iterator(); + while (it.hasNext()) { + ((TransactionQueueListener)it.next()) + .transactionException(event); + } + } + } + + synchronized void uncaughtException(Throwable e) { + mTotalUncaughtExceptions++; + + if (mExceptionListeners.size() > 0) { + UncaughtExceptionEvent event = + new UncaughtExceptionEvent(this, e); + + Iterator it = mExceptionListeners.iterator(); + while (it.hasNext()) { + ((UncaughtExceptionListener)it.next()) + .uncaughtException(event); + } + } + else { + Thread current = Thread.currentThread(); + current.getThreadGroup().uncaughtException(current, e); + } + } + + synchronized boolean exitThread(boolean force) { + if (!force && (mThreadCount - mServicingCount) <= 1 && + mQueue.size() > 0 && !mSuspended) { + + // Can't exit thread because transactions are waiting to + // be serviced, and no thread is waiting on the queue. + return false; + } + else { + mThreadCount--; + return true; + } + } + + private synchronized boolean ensureWaitingThread() { + if (mThreadCount <= mServicingCount) { + try { + // Only wait if no threads. Otherwise the lock on this object + // will prevent threads from entering the exitThread method. + startThread(mThreadCount == 0); + } + catch (NoThreadException e) { + if (!e.isThreadPoolClosed()) { + if (mThreadCount == 0) { + uncaughtException(e); + return false; + } + } + } + catch (InterruptedException e) { + return false; + } + catch (Throwable e) { + uncaughtException(e); + return false; + } + } + return true; + } + + private class Worker implements Runnable { + public void run() { + boolean forceExit = false; + TransactionQueueEvent event; + + while (true) { + try { + // Phase 1: wait for a transaction + try { + if ((event = nextTransactionEvent()) == null) { + // Go into idle mode. + continue; + } + } + catch (InterruptedException e) { + forceExit = true; + continue; + } + + long enqueueTimestamp = event.getTimestampMillis(); + + // Phase 2: spawn off a replacement thread + try { + startThread(false); + } + catch (NoThreadException e) { + if (e.isThreadPoolClosed()) { + forceExit = true; + // Don't "continue" because the transaction must + // still be serviced first. + } + } + catch (InterruptedException e) { + forceExit = true; + // Don't "continue" because the transaction must + // still be serviced first. + } + catch (Throwable e) { + uncaughtException(e); + } + finally { + // Only indicate that transaction has been dequeued + // after a replacement thread has been created. + // Queue time is more accurate this way because time + // spent waiting for a thread is time spent not being + // serviced. + try { + event = transactionDequeued(event); + } + catch (Throwable e) { + uncaughtException(e); + } + } + + long serviceTimestamp = event.getTimestampMillis(); + + // Phase 3: service the transaction + long timeout = getTransactionTimeout(); + if (timeout >= 0 && + (serviceTimestamp - enqueueTimestamp) >= timeout) { + try { + event.getTransaction().cancel(); + } + finally { + transactionExpired(event); + } + } + else { + try { + event.getTransaction().service(); + transactionServiced(event); + } + catch (Throwable e) { + uncaughtException(e); + + try { + event.getTransaction().cancel(); + } + catch (Throwable e2) { + uncaughtException(e2); + } + + transactionException(event, e); + } + } + } + catch (Throwable e) { + uncaughtException(e); + } + finally { + if (exitThread(forceExit)) { + break; + } + } + } + } + } +}