diff src/com/go/trove/util/tq/TransactionQueue.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/com/go/trove/util/tq/TransactionQueue.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,699 @@
+/* ====================================================================
+ * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
+ * ====================================================================
+ * The Tea Software License, Version 1.1
+ *
+ * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in
+ *    the documentation and/or other materials provided with the
+ *    distribution.
+ *
+ * 3. The end-user documentation included with the redistribution,
+ *    if any, must include the following acknowledgment:
+ *       "This product includes software developed by the
+ *        Walt Disney Internet Group (http://opensource.go.com/)."
+ *    Alternately, this acknowledgment may appear in the software itself,
+ *    if and wherever such third-party acknowledgments normally appear.
+ *
+ * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
+ *    not be used to endorse or promote products derived from this
+ *    software without prior written permission. For written
+ *    permission, please contact opensource@dig.com.
+ *
+ * 5. Products derived from this software may not be called "Tea",
+ *    "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
+ *    "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
+ *    written permission of the Walt Disney Internet Group.
+ *
+ * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED.  IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * ====================================================================
+ *
+ * For more information about Tea, please see http://opensource.go.com/.
+ */
+
+package com.go.trove.util.tq;
+
+import java.util.*;
+import com.go.trove.util.*;
+
+/******************************************************************************
+ * TransactionQueue processes {@link Transaction Transactions} concurrently
+ * using threads obtained from a {@link ThreadPool}. When a transaction is
+ * enqueued, it goes into a waiting queue, and it is serviced as soon as a
+ * thread is available.
+ *
+ * @author Brian S O'Neill
+ * @version
+ * <!--$$Revision: 1.1 $-->, <!--$$JustDate:--> 01/02/15 <!-- $-->
+ */
+public class TransactionQueue {
+    private ThreadPool mThreadPool;
+    private String mName;
+    private int mMaxSize;
+    private int mMaxThreads;
+    private long mIdleTimeout;
+    private long mTransactionTimeout;
+
+    private LinkedList mQueue = new LinkedList();
+    private int mThreadCount;
+    private int mServicingCount;
+    private int mThreadId;
+    private boolean mSuspended;
+
+    private Worker mWorker = new Worker();
+
+    private Collection mListeners = new LinkedList();
+    private Collection mExceptionListeners = new LinkedList();
+
+    // Used to gather time lapse statistics.
+    private long mTimeLapseStart;
+    private int mPeakQueueSize;
+    private int mPeakThreadCount;
+    private int mPeakServicingCount;
+    private int mTotalEnqueueAttempts;
+    private int mTotalEnqueued;
+    private int mTotalServiced;
+    private int mTotalExpired;
+    private int mTotalServiceExceptions;
+    private int mTotalUncaughtExceptions;
+    private long mTotalQueueDuration;
+    private long mTotalServiceDuration;
+
+    public TransactionQueue(ThreadPool tp, int maxSize, int maxThreads) {
+        this(tp, "TransactionQueue", maxSize, maxThreads);
+    }
+
+    public TransactionQueue(ThreadPool tp, String name,
+                            int maxSize, int maxThreads) {
+        mThreadPool = tp;
+        mName = name;
+
+        setMaximumSize(maxSize);
+        setMaximumThreads(maxThreads);
+
+        setIdleTimeout(tp.getIdleTimeout());
+        setTransactionTimeout(-1);
+
+        resetStatistics();
+    }
+
+    /**
+     * Sets the timeout (in milliseconds) for the TransactionQueue to wait
+     * inactive before going into an idle state. When the TransactionQueue is
+     * idle, there are no internal worker threads. A negative value specifies
+     * that the TransactionQueue should never automatically go into idle mode.
+     *
+     * @see #idle()
+     */
+    public synchronized void setIdleTimeout(long timeout) {
+        mIdleTimeout = timeout;
+    }
+
+    /**
+     * Returns the timeout (in milliseconds) that the TransactionQueue will
+     * wait inactive before going into an idle state. The default value is
+     * the same as the ThreadPool's idle timeout.
+     *
+     * @see #idle()
+     * @see ThreadPool#getIdleTimeout
+     */
+    public synchronized long getIdleTimeout() {
+        return mIdleTimeout;
+    }
+
+    /**
+     * Sets the timeout (in milliseconds) to wait before an enqueued
+     * transaction expires. If a worker receives an expired transaction, it
+     * is cancelled. A negative timeout specifies that enqueued transactions
+     * never expire.
+     */
+    public synchronized void setTransactionTimeout(long timeout) {
+        mTransactionTimeout = timeout;
+    }
+
+    /**
+     * Returns the timeout (in milliseconds) to wait before an enqueued
+     * transaction expires. The default value is -1, indicating that enqueued
+     * transactions never expire.
+     */
+    public synchronized long getTransactionTimeout() {
+        return mTransactionTimeout;
+    }
+
+    /**
+     * Returns the name of this TransactionQueue.
+     */
+    public String getName() {
+        return mName;
+    }
+
+    /**
+     * Returns the maximum allowed number of queued transactions.
+     */
+    public synchronized int getMaximumSize() {
+        return mMaxSize;
+    }
+
+    /**
+     * Setting the max size to zero disables enqueueing.
+     */
+    public synchronized void setMaximumSize(int max) {
+        if (max < 0) {
+            throw new IllegalArgumentException
+                ("TransactionQueue max size must be positive: " + max);
+        }
+
+        mMaxSize = max;
+    }
+
+    /**
+     * Returns the maximum allowed number of worker threads.
+     */
+    public synchronized int getMaximumThreads() {
+        return mMaxThreads;
+    }
+
+    public synchronized void setMaximumThreads(int max) {
+        if (max < 1) {
+            throw new IllegalArgumentException
+                ("TransactionQueue must have at least one thread: " + max);
+        }
+
+        mMaxThreads = max;
+    }
+
+    /**
+     * Enqueues a transaction that will be serviced when a worker is
+     * available. If the queue is full or cannot accept new transactions, the
+     * transaction is not enqueued, and false is returned.
+     *
+     * @return true if enqueued, false if queue is full or cannot accept new
+     * transactions.
+     */
+    public synchronized boolean enqueue(Transaction transaction) {
+        mTotalEnqueueAttempts++;
+
+        if (transaction == null || mThreadPool.isClosed()) {
+            return false;
+        }
+
+        int queueSize;
+        if ((queueSize = mQueue.size()) >= mMaxSize) {
+            if (mListeners.size() > 0) {
+                TransactionQueueEvent event =
+                    new TransactionQueueEvent(this, transaction);
+
+                Iterator it = mListeners.iterator();
+                while (it.hasNext()) {
+                    ((TransactionQueueListener)it.next())
+                        .transactionQueueFull(event);
+                }
+            }
+            return false;
+        }
+
+        if (!mSuspended) {
+            if (!ensureWaitingThread()) {
+                return false;
+            }
+        }
+
+        mTotalEnqueued++;
+
+        TransactionQueueEvent event =
+            new TransactionQueueEvent(this, transaction);
+
+        mQueue.addLast(event);
+
+        if (++queueSize > mPeakQueueSize) {
+            mPeakQueueSize = queueSize;
+        }
+
+        notify();
+
+        if (mListeners.size() > 0) {
+            Iterator it = mListeners.iterator();
+            while (it.hasNext()) {
+                ((TransactionQueueListener)it.next())
+                    .transactionEnqueued(event);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Suspends processing of transactions in the queue until resume is called.
+     * If suspend is called on a TransactionQueue that is already suspended,
+     * the call has no effect.
+     */
+    public synchronized void suspend() {
+        if (!mSuspended) {
+            mQueue.addFirst(null);
+            notify();
+            mSuspended = true;
+        }
+    }
+
+    /**
+     * Resumes processing of transactions in the queue if suspend was called.
+     * If resume is called on a TransactionQueue that is already running, the
+     * call has no effect, but true is still returned.
+     *
+     * @return false if couldn't resume because no threads available from pool.
+     */
+    public synchronized boolean resume() {
+        if (mSuspended) {
+            mSuspended = false;
+        }
+        return ensureWaitingThread();
+    }
+
+    /**
+     * Make this TransactionQueue go into idle mode and allow it to be
+     * reclaimed by the garbage collector if it is no longer used. Any pending
+     * transactions will be serviced, and any servicing transactions will
+     * finish. If any transactions are added to the TransactionQueue while
+     * idle, it will reactivate itself. New TransactionQueues start out in an
+     * idle state.
+     *
+     * @see #setIdleTimeout
+     * @see #getIdleTimeout
+     */
+    public synchronized void idle() {
+        mQueue.addLast(null);
+        notify();
+    }
+
+    public synchronized void addTransactionQueueListener
+        (TransactionQueueListener listener) {
+
+        mListeners.add(listener);
+    }
+
+    public synchronized void removeTransactionQueueListener
+        (TransactionQueueListener listener) {
+
+        mListeners.remove(listener);
+    }
+
+    public synchronized void addUncaughtExceptionListener
+        (UncaughtExceptionListener listener) {
+
+        mExceptionListeners.add(listener);
+    }
+
+    public synchronized void removeUncaughtExceptionListener
+        (UncaughtExceptionListener listener) {
+
+        mExceptionListeners.remove(listener);
+    }
+
+    /**
+     * Returns the number of currently queued transactions.
+     */
+    public synchronized int getQueueSize() {
+        return mQueue.size();
+    }
+
+    /**
+     * Returns the current amount of worker threads.
+     */
+    public synchronized int getThreadCount() {
+        return mThreadCount;
+    }
+
+    /**
+     * Returns a snapshot of the statistics on this TransactionQueue.
+     */
+    public synchronized TransactionQueueData getStatistics() {
+        return new TransactionQueueData(this,
+                                        mTimeLapseStart,
+                                        System.currentTimeMillis(),
+                                        mQueue.size(),
+                                        mThreadCount,
+                                        mServicingCount,
+                                        mPeakQueueSize,
+                                        mPeakThreadCount,
+                                        mPeakServicingCount,
+                                        mTotalEnqueueAttempts,
+                                        mTotalEnqueued,
+                                        mTotalServiced,
+                                        mTotalExpired,
+                                        mTotalServiceExceptions,
+                                        mTotalUncaughtExceptions,
+                                        mTotalQueueDuration,
+                                        mTotalServiceDuration);
+    }
+
+    /**
+     * Resets all time lapse statistics.
+     */
+    public synchronized void resetStatistics() {
+        mPeakQueueSize = 0;
+        mPeakThreadCount = 0;
+        mPeakServicingCount = 0;
+        mTotalEnqueueAttempts = 0;
+        mTotalEnqueued = 0;
+        mTotalServiced = 0;
+        mTotalExpired = 0;
+        mTotalServiceExceptions = 0;
+        mTotalUncaughtExceptions = 0;
+        mTotalQueueDuration = 0;
+        mTotalServiceDuration = 0;
+
+        mTimeLapseStart = System.currentTimeMillis();
+    }
+
+    /**
+     * Understands and applies the following integer properties.
+     *
+     * <ul>
+     * <li>max.size - setMaximumSize
+     * <li>max.threads - setMaximumThreads
+     * <li>timeout.idle - setIdleTimeout
+     * <li>timeout.transaction - setTransactionTimeout
+     * <li>tune.size - Automatically tunes queue size when "true" and
+     *                 transaction timeout set.
+     * <li>tune.threads - Automatically tunes maximum thread count.
+     * </ul>
+     */
+    public synchronized void applyProperties(PropertyMap properties) {
+        if (properties.containsKey("max.size")) {
+            setMaximumSize(properties.getInt("max.size"));
+        }
+
+        if (properties.containsKey("max.threads")) {
+            setMaximumThreads(properties.getInt("max.threads"));
+        }
+
+        if (properties.containsKey("timeout.idle")) {
+            setIdleTimeout(properties.getNumber("timeout.idle").longValue());
+        }
+
+        if (properties.containsKey("timeout.transaction")) {
+            setTransactionTimeout
+                (properties.getNumber("timeout.transaction").longValue());
+        }
+
+        if ("true".equalsIgnoreCase(properties.getString("tune.size"))) {
+            addTransactionQueueListener(new TransactionQueueSizeTuner());
+        }
+
+        if ("true".equalsIgnoreCase(properties.getString("tune.threads"))) {
+            addTransactionQueueListener(new TransactionQueueThreadTuner());
+        }
+    }
+
+    synchronized void startThread(boolean canwait)
+        throws InterruptedException {
+
+        if (mThreadCount < mMaxThreads) {
+            String threadName = getName() + ' ' + (mThreadId++);
+            if (canwait) {
+                mThreadPool.start(mWorker, threadName);
+            }
+            else {
+                mThreadPool.start(mWorker, 0, threadName);
+            }
+
+            if (++mThreadCount > mPeakThreadCount) {
+                mPeakThreadCount = mThreadCount;
+            }
+        }
+    }
+
+    /**
+     * Returns null when the TransactionQueue should go idle.
+     */
+    synchronized TransactionQueueEvent nextTransactionEvent()
+        throws InterruptedException {
+
+        if (mQueue.isEmpty()) {
+            if (mIdleTimeout != 0) {
+                if (mIdleTimeout < 0) {
+                    wait();
+                }
+                else {
+                    wait(mIdleTimeout);
+                }
+            }
+        }
+
+        if (mQueue.isEmpty()) {
+            return null;
+        }
+
+        return (TransactionQueueEvent)mQueue.removeFirst();
+    }
+
+    synchronized TransactionQueueEvent transactionDequeued
+        (TransactionQueueEvent event) {
+
+        if (++mServicingCount > mPeakServicingCount) {
+            mPeakServicingCount = mServicingCount;
+        }
+
+        TransactionQueueEvent deqEvent = new TransactionQueueEvent(event);
+
+        mTotalQueueDuration +=
+            (deqEvent.getTimestampMillis() - event.getTimestampMillis());
+
+        if (mListeners.size() > 0) {
+            Iterator it = mListeners.iterator();
+            while (it.hasNext()) {
+                ((TransactionQueueListener)it.next())
+                    .transactionDequeued(deqEvent);
+            }
+        }
+
+        return deqEvent;
+    }
+
+    synchronized void transactionServiced(TransactionQueueEvent event) {
+        TransactionQueueEvent svcEvent = new TransactionQueueEvent(event);
+
+        mTotalServiceDuration +=
+            (svcEvent.getTimestampMillis() - event.getTimestampMillis());
+
+        if (mListeners.size() > 0) {
+            Iterator it = mListeners.iterator();
+            while (it.hasNext()) {
+                ((TransactionQueueListener)it.next())
+                    .transactionServiced(svcEvent);
+            }
+        }
+
+        // Adjust counters at end in case a listener threw an exception and let
+        // the call to transactionException adjust the counters instead.
+        mServicingCount--;
+        mTotalServiced++;
+    }
+
+    synchronized void transactionExpired(TransactionQueueEvent event) {
+        mServicingCount--;
+        mTotalExpired++;
+
+        if (mListeners.size() > 0) {
+            event = new TransactionQueueEvent(event);
+
+            Iterator it = mListeners.iterator();
+            while (it.hasNext()) {
+                ((TransactionQueueListener)it.next())
+                    .transactionExpired(event);
+            }
+        }
+    }
+
+    synchronized void transactionException(TransactionQueueEvent event,
+                                           Throwable e) {
+        mServicingCount--;
+        mTotalServiceExceptions++;
+
+        if (mListeners.size() > 0) {
+            event = new TransactionQueueEvent(event, e);
+
+            Iterator it = mListeners.iterator();
+            while (it.hasNext()) {
+                ((TransactionQueueListener)it.next())
+                    .transactionException(event);
+            }
+        }
+    }
+
+    synchronized void uncaughtException(Throwable e) {
+        mTotalUncaughtExceptions++;
+
+        if (mExceptionListeners.size() > 0) {
+            UncaughtExceptionEvent event =
+                new UncaughtExceptionEvent(this, e);
+
+            Iterator it = mExceptionListeners.iterator();
+            while (it.hasNext()) {
+                ((UncaughtExceptionListener)it.next())
+                    .uncaughtException(event);
+            }
+        }
+        else {
+            Thread current = Thread.currentThread();
+            current.getThreadGroup().uncaughtException(current, e);
+        }
+    }
+
+    synchronized boolean exitThread(boolean force) {
+        if (!force && (mThreadCount - mServicingCount) <= 1 &&
+            mQueue.size() > 0 && !mSuspended) {
+
+            // Can't exit thread because transactions are waiting to
+            // be serviced, and no thread is waiting on the queue.
+            return false;
+        }
+        else {
+            mThreadCount--;
+            return true;
+        }
+    }
+
+    private synchronized boolean ensureWaitingThread() {
+        if (mThreadCount <= mServicingCount) {
+            try {
+                // Only wait if no threads. Otherwise the lock on this object
+                // will prevent threads from entering the exitThread method.
+                startThread(mThreadCount == 0);
+            }
+            catch (NoThreadException e) {
+                if (!e.isThreadPoolClosed()) {
+                    if (mThreadCount == 0) {
+                        uncaughtException(e);
+                        return false;
+                    }
+                }
+            }
+            catch (InterruptedException e) {
+                return false;
+            }
+            catch (Throwable e) {
+                uncaughtException(e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private class Worker implements Runnable {
+        public void run() {
+            boolean forceExit = false;
+            TransactionQueueEvent event;
+
+            while (true) {
+                try {
+                    // Phase 1: wait for a transaction
+                    try {
+                        if ((event = nextTransactionEvent()) == null) {
+                            // Go into idle mode.
+                            continue;
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        forceExit = true;
+                        continue;
+                    }
+
+                    long enqueueTimestamp = event.getTimestampMillis();
+
+                    // Phase 2: spawn off a replacement thread
+                    try {
+                        startThread(false);
+                    }
+                    catch (NoThreadException e) {
+                        if (e.isThreadPoolClosed()) {
+                            forceExit = true;
+                            // Don't "continue" because the transaction must
+                            // still be serviced first.
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        forceExit = true;
+                        // Don't "continue" because the transaction must
+                        // still be serviced first.
+                    }
+                    catch (Throwable e) {
+                        uncaughtException(e);
+                    }
+                    finally {
+                        // Only indicate that transaction has been dequeued
+                        // after a replacement thread has been created.
+                        // Queue time is more accurate this way because time
+                        // spent waiting for a thread is time spent not being
+                        // serviced.
+                        try {
+                            event = transactionDequeued(event);
+                        }
+                        catch (Throwable e) {
+                            uncaughtException(e);
+                        }
+                    }
+
+                    long serviceTimestamp = event.getTimestampMillis();
+
+                    // Phase 3: service the transaction
+                    long timeout = getTransactionTimeout();
+                    if (timeout >= 0 &&
+                        (serviceTimestamp - enqueueTimestamp) >= timeout) {
+                        try {
+                            event.getTransaction().cancel();
+                        }
+                        finally {
+                            transactionExpired(event);
+                        }
+                    }
+                    else {
+                        try {
+                            event.getTransaction().service();
+                            transactionServiced(event);
+                        }
+                        catch (Throwable e) {
+                            uncaughtException(e);
+
+                            try {
+                                event.getTransaction().cancel();
+                            }
+                            catch (Throwable e2) {
+                                uncaughtException(e2);
+                            }
+
+                            transactionException(event, e);
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    uncaughtException(e);
+                }
+                finally {
+                    if (exitThread(forceExit)) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}