comparison src/com/go/trove/util/ThreadPool.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /* ====================================================================
2 * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3 * ====================================================================
4 * The Tea Software License, Version 1.1
5 *
6 * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * 3. The end-user documentation included with the redistribution,
21 * if any, must include the following acknowledgment:
22 * "This product includes software developed by the
23 * Walt Disney Internet Group (http://opensource.go.com/)."
24 * Alternately, this acknowledgment may appear in the software itself,
25 * if and wherever such third-party acknowledgments normally appear.
26 *
27 * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28 * not be used to endorse or promote products derived from this
29 * software without prior written permission. For written
30 * permission, please contact opensource@dig.com.
31 *
32 * 5. Products derived from this software may not be called "Tea",
33 * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34 * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35 * written permission of the Walt Disney Internet Group.
36 *
37 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40 * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48 * ====================================================================
49 *
50 * For more information about Tea, please see http://opensource.go.com/.
51 */
52
53 package com.go.trove.util;
54
55 import java.util.*;
56
57 /******************************************************************************
58 * A ThreadPool contains a collection of re-usable threads. There is a slight
59 * performance overhead in creating new threads, and so a ThreadPool can
60 * improve performance in systems that create short-lived threads. Pooled
61 * threads operate on Runnable targets and return back to the pool when the
62 * Runnable.run method exits.
63 *
64 * @author Brian S O'Neill
65 * @version
66 * <!--$$Revision: 1.1 $-->, <!--$$JustDate:--> 01/03/12 <!-- $-->
67 */
68 public class ThreadPool extends ThreadGroup {
69 private static int cThreadID;
70
71 private synchronized static int nextThreadID() {
72 return cThreadID++;
73 }
74
75 // Fields that use the monitor of this instance.
76
77 private long mTimeout = -1;
78 private long mIdleTimeout = -1;
79
80 // Fields that use the mListeners monitor.
81
82 private Collection mListeners = new LinkedList();
83
84 // Fields that use the mPool monitor.
85
86 // Pool is accessed like a stack.
87 private LinkedList mPool;
88 private int mMax;
89 private int mActive;
90 private boolean mDaemon;
91 private int mPriority;
92 private boolean mClosed;
93
/**
 * Creates a ThreadPool whose threads are all daemon threads.
 *
 * @param name name of this ThreadPool
 * @param max the maximum allowed number of threads
 *
 * @throws IllegalArgumentException if max is not greater than zero
 */
public ThreadPool(String name, int max)
    throws IllegalArgumentException {

    this(name, max, true);
}
107
/**
 * Creates a ThreadPool of daemon threads beneath the given parent group.
 *
 * @param parent parent ThreadGroup of this pool
 * @param name name of this ThreadPool
 * @param max the maximum allowed number of threads
 *
 * @throws IllegalArgumentException if max is not greater than zero
 */
public ThreadPool(ThreadGroup parent, String name, int max)
    throws IllegalArgumentException {

    this(parent, name, max, true);
}
122
/**
 * Creates a ThreadPool, letting the caller choose whether its threads are
 * daemon threads.
 *
 * @param name name of this ThreadPool
 * @param max the maximum allowed number of threads
 * @param daemon true if every thread created by the pool should be a
 * daemon thread
 *
 * @throws IllegalArgumentException if max is not greater than zero
 */
public ThreadPool(String name, int max, boolean daemon)
    throws IllegalArgumentException {

    super(name);

    // Validates max and sets up the pool state.
    init(max, daemon);
}
139
/**
 * Creates a ThreadPool beneath the given parent group, letting the caller
 * choose whether its threads are daemon threads.
 *
 * @param parent parent ThreadGroup of this pool
 * @param name name of this ThreadPool
 * @param max the maximum allowed number of threads
 * @param daemon true if every thread created by the pool should be a
 * daemon thread
 *
 * @throws IllegalArgumentException if max is not greater than zero
 */
public ThreadPool(ThreadGroup parent, String name, int max, boolean daemon)
    throws IllegalArgumentException {

    super(parent, name);

    // Validates max and sets up the pool state.
    init(max, daemon);
}
157
158 private void init(int max, boolean daemon)
159 throws IllegalArgumentException {
160
161 if (max <= 0) {
162 throw new IllegalArgumentException
163 ("Maximum number of threads must be greater than zero: " +
164 max);
165 }
166
167 mMax = max;
168
169 mDaemon = daemon;
170 mPriority = Thread.currentThread().getPriority();
171 mClosed = false;
172
173 mPool = new LinkedList();
174 }
175
176 /**
177 * Sets the timeout (in milliseconds) for getting threads from the pool
178 * or for closing the pool. A negative value specifies an infinite timeout.
179 * Calling the start method that accepts a timeout value will override
180 * this setting.
181 */
182 public synchronized void setTimeout(long timeout) {
183 mTimeout = timeout;
184 }
185
186 /**
187 * Returns the timeout (in milliseconds) for getting threads from the pool.
188 * The default value is negative, which indicates an infinite wait.
189 */
190 public synchronized long getTimeout() {
191 return mTimeout;
192 }
193
194 /**
195 * Sets the timeout (in milliseconds) for idle threads to exit. A negative
196 * value specifies that an idle thread never exits.
197 */
198 public synchronized void setIdleTimeout(long timeout) {
199 mIdleTimeout = timeout;
200 }
201
202 /**
203 * Returns the idle timeout (in milliseconds) for threads to exit. The
204 * default value is negative, which indicates that idle threads never exit.
205 */
206 public synchronized long getIdleTimeout() {
207 return mIdleTimeout;
208 }
209
210 public void addThreadPoolListener(ThreadPoolListener listener) {
211 synchronized (mListeners) {
212 mListeners.add(listener);
213 }
214 }
215
216 public void removeThreadPoolListener(ThreadPoolListener listener) {
217 synchronized (mListeners) {
218 mListeners.remove(listener);
219 }
220 }
221
222 /**
223 * Returns the initial priority given to each thread in the pool. The
224 * default value is that of the thread that created the ThreadPool.
225 */
226 public int getPriority() {
227 synchronized (mPool) {
228 return mPriority;
229 }
230 }
231
232 /**
233 * Sets the priority given to each thread in the pool.
234 *
235 * @throws IllegalArgumentException if priority is out of range
236 */
237 public void setPriority(int priority) throws IllegalArgumentException {
238 if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
239 throw new IllegalArgumentException
240 ("Priority out of range: " + priority);
241 }
242
243 synchronized (mPool) {
244 mPriority = priority;
245 }
246 }
247
248 /**
249 * @return The maximum allowed number of threads.
250 */
251 public int getMaximumAllowed() {
252 synchronized (mPool) {
253 return mMax;
254 }
255 }
256
257 /**
258 * @return The number of currently available threads in the pool.
259 */
260 public int getAvailableCount() {
261 synchronized (mPool) {
262 return mPool.size();
263 }
264 }
265
266 /**
267 * @return The total number of threads in the pool that are either
268 * available or in use.
269 */
270 public int getPooledCount() {
271 synchronized (mPool) {
272 return mActive;
273 }
274 }
275
276 /**
277 * @return The total number of threads in the ThreadGroup.
278 */
279 public int getThreadCount() {
280 return activeCount();
281 }
282
283 /**
284 * @return Each thread that is active in the entire ThreadGroup.
285 */
286 public Thread[] getAllThreads() {
287 int count = activeCount();
288 Thread[] threads = new Thread[count];
289 count = enumerate(threads);
290 if (count >= threads.length) {
291 return sort(threads);
292 }
293 else {
294 Thread[] newThreads = new Thread[count];
295 System.arraycopy(threads, 0, newThreads, 0, count);
296 return sort(newThreads);
297 }
298 }
299
300 private Thread[] sort(Thread[] threads) {
301 Comparator c = BeanComparator.forClass(Thread.class)
302 .orderBy("threadGroup.name")
303 .orderBy("name")
304 .orderBy("priority");
305 Arrays.sort(threads, c);
306 return threads;
307 }
308
309 /**
310 * Waits for a Thread to become available and starts a Runnable in it.
311 * If there are no available threads and the number of active threads is
312 * less than the maximum allowed, then a newly created thread is returned.
313 *
314 * @param target The Runnable instance that gets started by the returned
315 * thread.
316 * @exception NoThreadException If no thread could be obtained.
317 * @exception InterruptedException If interrupted while waiting for a
318 * thread to become available.
319 * @return A Thread that has been started on the given Runnable.
320 */
321 public Thread start(Runnable target)
322 throws NoThreadException, InterruptedException
323 {
324 try {
325 return start0(target, getTimeout(), null);
326 }
327 catch (NoThreadException e) {
328 e.fillInStackTrace();
329 throw e;
330 }
331 }
332
333 /**
334 * Waits for a Thread to become available and starts a Runnable in it.
335 * If there are no available threads and the number of active threads is
336 * less than the maximum allowed, then a newly created thread is returned.
337 *
338 * @param target The Runnable instance that gets started by the returned
339 * thread.
340 * @param timeout Milliseconds to wait for a thread to become
341 * available. If zero, don't wait at all. If negative, wait forever.
342 * @exception NoThreadException If no thread could be obtained.
343 * @exception InterruptedException If interrupted while waiting for a
344 * thread to become available.
345 * @return A Thread that has been started on the given Runnable.
346 */
347 public Thread start(Runnable target, long timeout)
348 throws NoThreadException, InterruptedException
349 {
350 try {
351 return start0(target, timeout, null);
352 }
353 catch (NoThreadException e) {
354 e.fillInStackTrace();
355 throw e;
356 }
357 }
358
359
360 /**
361 * Waits for a Thread to become available and starts a Runnable in it.
362 * If there are no available threads and the number of active threads is
363 * less than the maximum allowed, then a newly created thread is returned.
364 *
365 * @param target The Runnable instance that gets started by the returned
366 * thread.
367 * @param name The name to give the thread.
368 * @exception NoThreadException If no thread could be obtained.
369 * @exception InterruptedException If interrupted while waiting for a
370 * thread to become available.
371 * @return A Thread that has been started on the given Runnable.
372 */
373 public Thread start(Runnable target, String name)
374 throws NoThreadException, InterruptedException
375 {
376 try {
377 return start0(target, getTimeout(), name);
378 }
379 catch (NoThreadException e) {
380 e.fillInStackTrace();
381 throw e;
382 }
383 }
384
385 /**
386 * Waits for a Thread to become available and starts a Runnable in it.
387 * If there are no available threads and the number of active threads is
388 * less than the maximum allowed, then a newly created thread is returned.
389 *
390 * @param target The Runnable instance that gets started by the returned
391 * thread.
392 * @param timeout Milliseconds to wait for a thread to become
393 * @param name The name to give the thread.
394 * available. If zero, don't wait at all. If negative, wait forever.
395 * @exception NoThreadException If no thread could be obtained.
396 * @exception InterruptedException If interrupted while waiting for a
397 * thread to become available.
398 * @return A Thread that has been started on the given Runnable.
399 */
400 public Thread start(Runnable target, long timeout, String name)
401 throws NoThreadException, InterruptedException
402 {
403 try {
404 return start0(target, timeout, name);
405 }
406 catch (NoThreadException e) {
407 e.fillInStackTrace();
408 throw e;
409 }
410 }
411
412 private Thread start0(Runnable target, long timeout, String name)
413 throws NoThreadException, InterruptedException
414 {
415 PooledThread thread;
416
417 while (true) {
418 synchronized (mPool) {
419 closeCheck();
420
421 // Obtain a thread from the pool if non-empty.
422 if (mPool.size() > 0) {
423 thread = (PooledThread)mPool.removeLast();
424 }
425 else {
426 // Create a new thread if the number of active threads
427 // is less than the maximum allowed.
428 if (mActive < mMax) {
429 return startThread(target, name);
430 }
431 else {
432 break;
433 }
434 }
435 }
436
437 if (name != null) {
438 thread.setName(name);
439 }
440
441 if (thread.setTarget(target)) {
442 return thread;
443 }
444
445 // Couldn't set the target because the pooled thread is exiting.
446 // Wait for it to exit to ensure that the active count is less
447 // than the maximum and try to obtain another thread.
448 thread.join();
449 }
450
451 if (timeout == 0) {
452 throw new NoThreadException("No thread available from " + this);
453 }
454
455 // Wait for a thread to become available in the pool.
456 synchronized (mPool) {
457 closeCheck();
458
459 if (timeout < 0) {
460 while (mPool.size() <= 0) {
461 mPool.wait(0);
462 closeCheck();
463 }
464 }
465 else {
466 long expireTime = System.currentTimeMillis() + timeout;
467 while (mPool.size() <= 0) {
468 mPool.wait(timeout);
469 closeCheck();
470
471 // Thread could have been notified, but another thread may
472 // have stolen the thread away.
473 if (mPool.size() <= 0 &&
474 System.currentTimeMillis() > expireTime) {
475
476 throw new NoThreadException
477 ("No thread available after waiting " +
478 timeout + " milliseconds: " + this);
479 }
480 }
481 }
482
483 thread = (PooledThread)mPool.removeLast();
484 if (name != null) {
485 thread.setName(name);
486 }
487
488 if (thread.setTarget(target)) {
489 return thread;
490 }
491 }
492
493 // Couldn't set the target because the pooled thread is exiting.
494 // Wait for it to exit to ensure that the active count is less
495 // than the maximum and create a new thread.
496 thread.join();
497 return startThread(target, name);
498 }
499
500 public boolean isClosed() {
501 return mClosed;
502 }
503
504 /**
505 * Will close down all the threads in the pool as they become
506 * available. This method may block forever if any threads are
507 * never returned to the thread pool.
508 */
509 public void close() throws InterruptedException {
510 close(getTimeout());
511 }
512
513 /**
514 * Will close down all the threads in the pool as they become
515 * available. If all the threads cannot become available within the
516 * specified timeout, any active threads not yet returned to the
517 * thread pool are interrupted.
518 *
519 * @param timeout Milliseconds to wait before unavailable threads
520 * are interrupted. If zero, don't wait at all. If negative, wait forever.
521 */
522 public void close(long timeout) throws InterruptedException {
523 synchronized (mPool) {
524 mClosed = true;
525 mPool.notifyAll();
526
527 if (timeout != 0) {
528 if (timeout < 0) {
529 while (mActive > 0) {
530 // Infinite wait for notification.
531 mPool.wait(0);
532 }
533 }
534 else {
535 long expireTime = System.currentTimeMillis() + timeout;
536 while (mActive > 0) {
537 mPool.wait(timeout);
538 if (System.currentTimeMillis() > expireTime) {
539 break;
540 }
541 }
542 }
543 }
544 }
545
546 interrupt();
547 }
548
549 private PooledThread startThread(Runnable target, String name) {
550 PooledThread thread;
551
552 synchronized (mPool) {
553 mActive++;
554 thread = new PooledThread(getName() + ' ' + nextThreadID());
555 thread.setPriority(mPriority);
556 thread.setDaemon(mDaemon);
557
558 if (name != null) {
559 thread.setName(name);
560 }
561
562 thread.setTarget(target);
563 thread.start();
564 }
565
566 ThreadPoolEvent event = new ThreadPoolEvent(this, thread);
567 synchronized (mListeners) {
568 for (Iterator it = mListeners.iterator(); it.hasNext();) {
569 ((ThreadPoolListener)it.next()).threadStarted(event);
570 }
571 }
572
573 return thread;
574 }
575
576 private void closeCheck() throws NoThreadException {
577 if (mClosed) {
578 throw new NoThreadException("Thread pool is closed", true);
579 }
580 }
581
582 void threadAvailable(PooledThread thread) {
583 synchronized (mPool) {
584 if (thread.getPriority() != mPriority) {
585 thread.setPriority(mPriority);
586 }
587 mPool.addLast(thread);
588 mPool.notify();
589 }
590 }
591
592 void threadExiting(PooledThread thread) {
593 synchronized (mPool) {
594 if (mPool.remove(thread)) {
595 mActive--;
596
597 ThreadPoolEvent event = new ThreadPoolEvent(this, thread);
598 synchronized (mListeners) {
599 for (Iterator it = mListeners.iterator(); it.hasNext();) {
600 ((ThreadPoolListener)it.next()).threadExiting(event);
601 }
602 }
603
604 mPool.notify();
605 }
606 }
607 }
608
609 private class PooledThread extends Thread {
610 private String mOriginalName;
611 private Runnable mTarget;
612 private boolean mExiting;
613
614 public PooledThread(String name) {
615 super(ThreadPool.this, name);
616 mOriginalName = name;
617 }
618
619 synchronized boolean setTarget(Runnable target) {
620 if (mTarget != null) {
621 throw new IllegalStateException
622 ("Target runnable in pooled thread is already set");
623 }
624
625 if (mExiting) {
626 return false;
627 }
628 else {
629 mTarget = target;
630 notify();
631 return true;
632 }
633 }
634
635 private synchronized Runnable waitForTarget() {
636 Runnable target;
637
638 if ((target = mTarget) == null) {
639 long idle = getIdleTimeout();
640
641 if ((target = mTarget) == null) {
642 if (idle != 0) {
643 try {
644 if (idle < 0) {
645 wait(0);
646 }
647 else {
648 wait(idle);
649 }
650 }
651 catch (InterruptedException e) {
652 }
653 }
654
655 if ((target = mTarget) == null) {
656 mExiting = true;
657 }
658 }
659 }
660
661 return target;
662 }
663
664 public void run() {
665 try {
666 while (!isClosed()) {
667 if (Thread.interrupted()) {
668 continue;
669 }
670
671 Runnable target;
672
673 if ((target = waitForTarget()) == null) {
674 break;
675 }
676
677 try {
678 target.run();
679 }
680 catch (ThreadDeath death) {
681 break;
682 }
683 catch (Throwable e) {
684 uncaughtException(Thread.currentThread(), e);
685 e = null;
686 }
687
688 // Allow the garbage collector to reclaim target from
689 // stack while we wait for another target.
690 target = null;
691
692 mTarget = null;
693 setName(mOriginalName);
694 threadAvailable(this);
695 }
696 }
697 finally {
698 threadExiting(this);
699 }
700 }
701 }
702 }