comparison src/org/dancres/blitz/task/Tasks.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.task;
2
3 import java.util.Iterator;
4 import java.util.HashMap;
5
6 import java.util.logging.Logger;
7 import java.util.logging.Level;
8
9 import net.jini.config.ConfigurationException;
10
11 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
12 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
13 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
14 import EDU.oswego.cs.dl.util.concurrent.Channel;
15
16 import org.dancres.blitz.ActiveObject;
17 import org.dancres.blitz.ActiveObjectRegistry;
18 import org.dancres.blitz.Logging;
19 import org.dancres.blitz.stats.StatsBoard;
20
21 import org.dancres.blitz.config.ConfigurationFactory;
22 import org.dancres.blitz.util.QueueStatGenerator;
23
24 /**
25 Various operations within the space must be handled in background.
26 This class encapsulates a pool of threads which execute operations
27 (Task instances) as they are queued. <P>
28
29 It might, at first, appear to make sense to have separate pools for
30 event notification, blocked call wakeups etc. in the belief that one can
31 better control the balance of dispatch of, for example, notifies in
32 comparison with searches. <P>
33
34 In reality, this won't work as each pool has a set of threads all with the
35 same priority. i.e. They share whatever CPU is available in a manner
36 driven by the number of tasks they must perform. Thus, if one wishes
37 to truly balance, say, notification rate against search wakeups, one
38 must assign differing priorities to these <I>tasks</I> as opposed to
39 <I>threads</I> to ensure CPU consumption is bounded and that, whichever
40 tasks have priority, get to use the CPU first.
41 */
42 public class Tasks implements ActiveObject {
43 private static Logger theLogger =
44 Logging.newLogger("org.dancres.blitz.task.Tasks");
45
46 private static final String DEFAULT_QUEUE = "DefaultTask";
47
48 private static Tasks theTasks = new Tasks();
49
50 private static int MAX_TASK_THREADS;
51
52 private static int TASK_QUEUE_BOUND;
53
54 static {
55 try {
56 MAX_TASK_THREADS = ((Integer)
57 ConfigurationFactory.getEntry("maxTaskThreads",
58 int.class,
59 new Integer(10))).intValue();
60 TASK_QUEUE_BOUND = ((Integer)
61 ConfigurationFactory.getEntry("taskQueueBound",
62 int.class,
63 new Integer(0))).intValue();
64
65 theLogger.log(Level.INFO, "Maximum task threads: " +
66 MAX_TASK_THREADS);
67 theLogger.log(Level.INFO, "Task queue bound: " +
68 TASK_QUEUE_BOUND);
69
70 } catch (ConfigurationException aCE) {
71 theLogger.log(Level.SEVERE, "Failed to load config", aCE);
72 }
73 }
74
75 private HashMap theExecutors = new HashMap();
76
77 private Tasks() {
78 ActiveObjectRegistry.add(this);
79 }
80
81 public static void queue(Task aTask) throws InterruptedException {
82 queue(DEFAULT_QUEUE, aTask);
83 }
84
85 public static void queue(String aQueue,
86 Task aTask) throws InterruptedException {
87 theTasks.execute(aQueue, aTask);
88 }
89
90 private void execute(String aQueue, Task aTask)
91 throws InterruptedException {
92
93 getExecutor(aQueue).execute(aTask);
94 }
95
96 public void begin() {
97 }
98
99 public void halt() {
100 synchronized(theExecutors) {
101 Iterator myExecs = theExecutors.values().iterator();
102
103 while (myExecs.hasNext()) {
104 PooledExecutor myExec = (PooledExecutor) myExecs.next();
105 myExec.shutdownNow();
106 }
107 }
108 }
109
110 private PooledExecutor getExecutor(String aName) {
111 PooledExecutor myExec;
112
113 synchronized(theExecutors) {
114 myExec = (PooledExecutor) theExecutors.get(aName);
115
116 if (myExec == null) {
117
118 BoundedLinkedQueue myQueue;
119
120 if (TASK_QUEUE_BOUND == 0) {
121 theLogger.log(Level.INFO,
122 "Creating task pool with no bounds");
123
124
125 myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE);
126 myExec = new PooledExecutor(myQueue, MAX_TASK_THREADS);
127 } else {
128 theLogger.log(Level.INFO,
129 "Creating task pool with bounds: " +
130 TASK_QUEUE_BOUND);
131
132 myQueue = new BoundedLinkedQueue(TASK_QUEUE_BOUND);
133 myExec =
134 new PooledExecutor(myQueue, MAX_TASK_THREADS);
135 }
136
137 myExec.setMinimumPoolSize(MAX_TASK_THREADS);
138 // myExec.waitWhenBlocked();
139
140 StatsBoard.get().add(new QueueStatGenerator(aName, myQueue));
141 theExecutors.put(aName, myExec);
142 }
143
144 return myExec;
145 }
146 }
147 }