Mercurial > hg > blitz_condensed
diff src/com/go/trove/util/ThreadPool.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/com/go/trove/util/ThreadPool.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,702 @@ +/* ==================================================================== + * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group + * ==================================================================== + * The Tea Software License, Version 1.1 + * + * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * 3. The end-user documentation included with the redistribution, + * if any, must include the following acknowledgment: + * "This product includes software developed by the + * Walt Disney Internet Group (http://opensource.go.com/)." + * Alternately, this acknowledgment may appear in the software itself, + * if and wherever such third-party acknowledgments normally appear. + * + * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must + * not be used to endorse or promote products derived from this + * software without prior written permission. For written + * permission, please contact opensource@dig.com. + * + * 5. Products derived from this software may not be called "Tea", + * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet", + * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior + * written permission of the Walt Disney Internet Group. + * + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * ==================================================================== + * + * For more information about Tea, please see http://opensource.go.com/. + */ + +package com.go.trove.util; + +import java.util.*; + +/****************************************************************************** + * A ThreadPool contains a collection of re-usable threads. There is a slight + * performance overhead in creating new threads, and so a ThreadPool can + * improve performance in systems that create short-lived threads. Pooled + * threads operate on Runnable targets and return back to the pool when the + * Runnable.run method exits. + * + * @author Brian S O'Neill + * @version + * <!--$$Revision: 1.1 $-->, <!--$$JustDate:--> 01/03/12 <!-- $--> + */ +public class ThreadPool extends ThreadGroup { + private static int cThreadID; + + private synchronized static int nextThreadID() { + return cThreadID++; + } + + // Fields that use the monitor of this instance. + + private long mTimeout = -1; + private long mIdleTimeout = -1; + + // Fields that use the mListeners monitor. + + private Collection mListeners = new LinkedList(); + + // Fields that use the mPool monitor. + + // Pool is accessed like a stack. + private LinkedList mPool; + private int mMax; + private int mActive; + private boolean mDaemon; + private int mPriority; + private boolean mClosed; + + /** + * Create a ThreadPool of daemon threads. + * + * @param name Name of ThreadPool + * @param max The maximum allowed number of threads + * + * @throws IllegalArgumentException + */ + public ThreadPool(String name, int max) + throws IllegalArgumentException { + + this(name, max, true); + } + + /** + * Create a ThreadPool of daemon threads. + * + * @param parent Parent ThreadGroup + * @param name Name of ThreadPool + * @param max The maximum allowed number of threads + * + * @throws IllegalArgumentException + */ + public ThreadPool(ThreadGroup parent, String name, int max) + throws IllegalArgumentException { + + this(parent, name, max, true); + } + + /** + * Create a ThreadPool. + * + * @param name Name of ThreadPool + * @param max The maximum allowed number of threads + * @param daemon Set to true to create ThreadPool of daemon threads + * + * @throws IllegalArgumentException + */ + public ThreadPool(String name, int max, boolean daemon) + throws IllegalArgumentException { + + super(name); + + init(max, daemon); + } + + /** + * Create a ThreadPool. + * + * @param parent Parent ThreadGroup + * @param name Name of ThreadPool + * @param max The maximum allowed number of threads + * @param daemon Set to true to create ThreadPool of daemon threads + * + * @throws IllegalArgumentException + */ + public ThreadPool(ThreadGroup parent, String name, int max,boolean daemon) + throws IllegalArgumentException { + + super(parent, name); + + init(max, daemon); + } + + private void init(int max, boolean daemon) + throws IllegalArgumentException { + + if (max <= 0) { + throw new IllegalArgumentException + ("Maximum number of threads must be greater than zero: " + + max); + } + + mMax = max; + + mDaemon = daemon; + mPriority = Thread.currentThread().getPriority(); + mClosed = false; + + mPool = new LinkedList(); + } + + /** + * Sets the timeout (in milliseconds) for getting threads from the pool + * or for closing the pool. A negative value specifies an infinite timeout. + * Calling the start method that accepts a timeout value will override + * this setting. + */ + public synchronized void setTimeout(long timeout) { + mTimeout = timeout; + } + + /** + * Returns the timeout (in milliseconds) for getting threads from the pool. + * The default value is negative, which indicates an infinite wait. + */ + public synchronized long getTimeout() { + return mTimeout; + } + + /** + * Sets the timeout (in milliseconds) for idle threads to exit. A negative + * value specifies that an idle thread never exits. + */ + public synchronized void setIdleTimeout(long timeout) { + mIdleTimeout = timeout; + } + + /** + * Returns the idle timeout (in milliseconds) for threads to exit. The + * default value is negative, which indicates that idle threads never exit. + */ + public synchronized long getIdleTimeout() { + return mIdleTimeout; + } + + public void addThreadPoolListener(ThreadPoolListener listener) { + synchronized (mListeners) { + mListeners.add(listener); + } + } + + public void removeThreadPoolListener(ThreadPoolListener listener) { + synchronized (mListeners) { + mListeners.remove(listener); + } + } + + /** + * Returns the initial priority given to each thread in the pool. The + * default value is that of the thread that created the ThreadPool. + */ + public int getPriority() { + synchronized (mPool) { + return mPriority; + } + } + + /** + * Sets the priority given to each thread in the pool. + * + * @throws IllegalArgumentException if priority is out of range + */ + public void setPriority(int priority) throws IllegalArgumentException { + if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { + throw new IllegalArgumentException + ("Priority out of range: " + priority); + } + + synchronized (mPool) { + mPriority = priority; + } + } + + /** + * @return The maximum allowed number of threads. + */ + public int getMaximumAllowed() { + synchronized (mPool) { + return mMax; + } + } + + /** + * @return The number of currently available threads in the pool. + */ + public int getAvailableCount() { + synchronized (mPool) { + return mPool.size(); + } + } + + /** + * @return The total number of threads in the pool that are either + * available or in use. + */ + public int getPooledCount() { + synchronized (mPool) { + return mActive; + } + } + + /** + * @return The total number of threads in the ThreadGroup. + */ + public int getThreadCount() { + return activeCount(); + } + + /** + * @return Each thread that is active in the entire ThreadGroup. + */ + public Thread[] getAllThreads() { + int count = activeCount(); + Thread[] threads = new Thread[count]; + count = enumerate(threads); + if (count >= threads.length) { + return sort(threads); + } + else { + Thread[] newThreads = new Thread[count]; + System.arraycopy(threads, 0, newThreads, 0, count); + return sort(newThreads); + } + } + + private Thread[] sort(Thread[] threads) { + Comparator c = BeanComparator.forClass(Thread.class) + .orderBy("threadGroup.name") + .orderBy("name") + .orderBy("priority"); + Arrays.sort(threads, c); + return threads; + } + + /** + * Waits for a Thread to become available and starts a Runnable in it. + * If there are no available threads and the number of active threads is + * less than the maximum allowed, then a newly created thread is returned. + * + * @param target The Runnable instance that gets started by the returned + * thread. + * @exception NoThreadException If no thread could be obtained. + * @exception InterruptedException If interrupted while waiting for a + * thread to become available. + * @return A Thread that has been started on the given Runnable. + */ + public Thread start(Runnable target) + throws NoThreadException, InterruptedException + { + try { + return start0(target, getTimeout(), null); + } + catch (NoThreadException e) { + e.fillInStackTrace(); + throw e; + } + } + + /** + * Waits for a Thread to become available and starts a Runnable in it. + * If there are no available threads and the number of active threads is + * less than the maximum allowed, then a newly created thread is returned. + * + * @param target The Runnable instance that gets started by the returned + * thread. + * @param timeout Milliseconds to wait for a thread to become + * available. If zero, don't wait at all. If negative, wait forever. + * @exception NoThreadException If no thread could be obtained. + * @exception InterruptedException If interrupted while waiting for a + * thread to become available. + * @return A Thread that has been started on the given Runnable. + */ + public Thread start(Runnable target, long timeout) + throws NoThreadException, InterruptedException + { + try { + return start0(target, timeout, null); + } + catch (NoThreadException e) { + e.fillInStackTrace(); + throw e; + } + } + + + /** + * Waits for a Thread to become available and starts a Runnable in it. + * If there are no available threads and the number of active threads is + * less than the maximum allowed, then a newly created thread is returned. + * + * @param target The Runnable instance that gets started by the returned + * thread. + * @param name The name to give the thread. + * @exception NoThreadException If no thread could be obtained. + * @exception InterruptedException If interrupted while waiting for a + * thread to become available. + * @return A Thread that has been started on the given Runnable. + */ + public Thread start(Runnable target, String name) + throws NoThreadException, InterruptedException + { + try { + return start0(target, getTimeout(), name); + } + catch (NoThreadException e) { + e.fillInStackTrace(); + throw e; + } + } + + /** + * Waits for a Thread to become available and starts a Runnable in it. + * If there are no available threads and the number of active threads is + * less than the maximum allowed, then a newly created thread is returned. + * + * @param target The Runnable instance that gets started by the returned + * thread. + * @param timeout Milliseconds to wait for a thread to become + * @param name The name to give the thread. + * available. If zero, don't wait at all. If negative, wait forever. + * @exception NoThreadException If no thread could be obtained. + * @exception InterruptedException If interrupted while waiting for a + * thread to become available. + * @return A Thread that has been started on the given Runnable. + */ + public Thread start(Runnable target, long timeout, String name) + throws NoThreadException, InterruptedException + { + try { + return start0(target, timeout, name); + } + catch (NoThreadException e) { + e.fillInStackTrace(); + throw e; + } + } + + private Thread start0(Runnable target, long timeout, String name) + throws NoThreadException, InterruptedException + { + PooledThread thread; + + while (true) { + synchronized (mPool) { + closeCheck(); + + // Obtain a thread from the pool if non-empty. + if (mPool.size() > 0) { + thread = (PooledThread)mPool.removeLast(); + } + else { + // Create a new thread if the number of active threads + // is less than the maximum allowed. + if (mActive < mMax) { + return startThread(target, name); + } + else { + break; + } + } + } + + if (name != null) { + thread.setName(name); + } + + if (thread.setTarget(target)) { + return thread; + } + + // Couldn't set the target because the pooled thread is exiting. + // Wait for it to exit to ensure that the active count is less + // than the maximum and try to obtain another thread. + thread.join(); + } + + if (timeout == 0) { + throw new NoThreadException("No thread available from " + this); + } + + // Wait for a thread to become available in the pool. + synchronized (mPool) { + closeCheck(); + + if (timeout < 0) { + while (mPool.size() <= 0) { + mPool.wait(0); + closeCheck(); + } + } + else { + long expireTime = System.currentTimeMillis() + timeout; + while (mPool.size() <= 0) { + mPool.wait(timeout); + closeCheck(); + + // Thread could have been notified, but another thread may + // have stolen the thread away. + if (mPool.size() <= 0 && + System.currentTimeMillis() > expireTime) { + + throw new NoThreadException + ("No thread available after waiting " + + timeout + " milliseconds: " + this); + } + } + } + + thread = (PooledThread)mPool.removeLast(); + if (name != null) { + thread.setName(name); + } + + if (thread.setTarget(target)) { + return thread; + } + } + + // Couldn't set the target because the pooled thread is exiting. + // Wait for it to exit to ensure that the active count is less + // than the maximum and create a new thread. + thread.join(); + return startThread(target, name); + } + + public boolean isClosed() { + return mClosed; + } + + /** + * Will close down all the threads in the pool as they become + * available. This method may block forever if any threads are + * never returned to the thread pool. + */ + public void close() throws InterruptedException { + close(getTimeout()); + } + + /** + * Will close down all the threads in the pool as they become + * available. If all the threads cannot become available within the + * specified timeout, any active threads not yet returned to the + * thread pool are interrupted. + * + * @param timeout Milliseconds to wait before unavailable threads + * are interrupted. If zero, don't wait at all. If negative, wait forever. + */ + public void close(long timeout) throws InterruptedException { + synchronized (mPool) { + mClosed = true; + mPool.notifyAll(); + + if (timeout != 0) { + if (timeout < 0) { + while (mActive > 0) { + // Infinite wait for notification. + mPool.wait(0); + } + } + else { + long expireTime = System.currentTimeMillis() + timeout; + while (mActive > 0) { + mPool.wait(timeout); + if (System.currentTimeMillis() > expireTime) { + break; + } + } + } + } + } + + interrupt(); + } + + private PooledThread startThread(Runnable target, String name) { + PooledThread thread; + + synchronized (mPool) { + mActive++; + thread = new PooledThread(getName() + ' ' + nextThreadID()); + thread.setPriority(mPriority); + thread.setDaemon(mDaemon); + + if (name != null) { + thread.setName(name); + } + + thread.setTarget(target); + thread.start(); + } + + ThreadPoolEvent event = new ThreadPoolEvent(this, thread); + synchronized (mListeners) { + for (Iterator it = mListeners.iterator(); it.hasNext();) { + ((ThreadPoolListener)it.next()).threadStarted(event); + } + } + + return thread; + } + + private void closeCheck() throws NoThreadException { + if (mClosed) { + throw new NoThreadException("Thread pool is closed", true); + } + } + + void threadAvailable(PooledThread thread) { + synchronized (mPool) { + if (thread.getPriority() != mPriority) { + thread.setPriority(mPriority); + } + mPool.addLast(thread); + mPool.notify(); + } + } + + void threadExiting(PooledThread thread) { + synchronized (mPool) { + if (mPool.remove(thread)) { + mActive--; + + ThreadPoolEvent event = new ThreadPoolEvent(this, thread); + synchronized (mListeners) { + for (Iterator it = mListeners.iterator(); it.hasNext();) { + ((ThreadPoolListener)it.next()).threadExiting(event); + } + } + + mPool.notify(); + } + } + } + + private class PooledThread extends Thread { + private String mOriginalName; + private Runnable mTarget; + private boolean mExiting; + + public PooledThread(String name) { + super(ThreadPool.this, name); + mOriginalName = name; + } + + synchronized boolean setTarget(Runnable target) { + if (mTarget != null) { + throw new IllegalStateException + ("Target runnable in pooled thread is already set"); + } + + if (mExiting) { + return false; + } + else { + mTarget = target; + notify(); + return true; + } + } + + private synchronized Runnable waitForTarget() { + Runnable target; + + if ((target = mTarget) == null) { + long idle = getIdleTimeout(); + + if ((target = mTarget) == null) { + if (idle != 0) { + try { + if (idle < 0) { + wait(0); + } + else { + wait(idle); + } + } + catch (InterruptedException e) { + } + } + + if ((target = mTarget) == null) { + mExiting = true; + } + } + } + + return target; + } + + public void run() { + try { + while (!isClosed()) { + if (Thread.interrupted()) { + continue; + } + + Runnable target; + + if ((target = waitForTarget()) == null) { + break; + } + + try { + target.run(); + } + catch (ThreadDeath death) { + break; + } + catch (Throwable e) { + uncaughtException(Thread.currentThread(), e); + e = null; + } + + // Allow the garbage collector to reclaim target from + // stack while we wait for another target. + target = null; + + mTarget = null; + setName(mOriginalName); + threadAvailable(this); + } + } + finally { + threadExiting(this); + } + } + } +}