Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/task/Tasks.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.task; | |
2 | |
3 import java.util.Iterator; | |
4 import java.util.HashMap; | |
5 | |
6 import java.util.logging.Logger; | |
7 import java.util.logging.Level; | |
8 | |
9 import net.jini.config.ConfigurationException; | |
10 | |
11 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; | |
12 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; | |
13 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; | |
14 import EDU.oswego.cs.dl.util.concurrent.Channel; | |
15 | |
16 import org.dancres.blitz.ActiveObject; | |
17 import org.dancres.blitz.ActiveObjectRegistry; | |
18 import org.dancres.blitz.Logging; | |
19 import org.dancres.blitz.stats.StatsBoard; | |
20 | |
21 import org.dancres.blitz.config.ConfigurationFactory; | |
22 import org.dancres.blitz.util.QueueStatGenerator; | |
23 | |
24 /** | |
25 Various operations within the space must be handled in background. | |
26 This class encapsulates a pool of threads which execute operations | |
27 (Task instances) as they are queued. <P> | |
28 | |
29 It might, at first, appear to make sense to have separate pools for | |
30 event notification, blocked call wakeups etc. in the belief that one can | |
31 better control the balance of dispatch of, for example, notifies in | |
32 comparison with searches. <P> | |
33 | |
34 In reality, this won't work as each pool has a set of threads all with the | |
35 same priority. i.e. They share whatever CPU is available in a manner | |
36 driven by the number of tasks they must perform. Thus, if one wishes | |
37 to truly balance, say, notification rate against search wakeups, one | |
38 must assign differing priorities to these <I>tasks</I> as opposed to | |
39 <I>threads</I> to ensure CPU consumption is bounded and that, whichever | |
40 tasks have priority, get to use the CPU first. | |
41 */ | |
42 public class Tasks implements ActiveObject { | |
43 private static Logger theLogger = | |
44 Logging.newLogger("org.dancres.blitz.task.Tasks"); | |
45 | |
46 private static final String DEFAULT_QUEUE = "DefaultTask"; | |
47 | |
48 private static Tasks theTasks = new Tasks(); | |
49 | |
50 private static int MAX_TASK_THREADS; | |
51 | |
52 private static int TASK_QUEUE_BOUND; | |
53 | |
54 static { | |
55 try { | |
56 MAX_TASK_THREADS = ((Integer) | |
57 ConfigurationFactory.getEntry("maxTaskThreads", | |
58 int.class, | |
59 new Integer(10))).intValue(); | |
60 TASK_QUEUE_BOUND = ((Integer) | |
61 ConfigurationFactory.getEntry("taskQueueBound", | |
62 int.class, | |
63 new Integer(0))).intValue(); | |
64 | |
65 theLogger.log(Level.INFO, "Maximum task threads: " + | |
66 MAX_TASK_THREADS); | |
67 theLogger.log(Level.INFO, "Task queue bound: " + | |
68 TASK_QUEUE_BOUND); | |
69 | |
70 } catch (ConfigurationException aCE) { | |
71 theLogger.log(Level.SEVERE, "Failed to load config", aCE); | |
72 } | |
73 } | |
74 | |
75 private HashMap theExecutors = new HashMap(); | |
76 | |
77 private Tasks() { | |
78 ActiveObjectRegistry.add(this); | |
79 } | |
80 | |
81 public static void queue(Task aTask) throws InterruptedException { | |
82 queue(DEFAULT_QUEUE, aTask); | |
83 } | |
84 | |
85 public static void queue(String aQueue, | |
86 Task aTask) throws InterruptedException { | |
87 theTasks.execute(aQueue, aTask); | |
88 } | |
89 | |
90 private void execute(String aQueue, Task aTask) | |
91 throws InterruptedException { | |
92 | |
93 getExecutor(aQueue).execute(aTask); | |
94 } | |
95 | |
96 public void begin() { | |
97 } | |
98 | |
99 public void halt() { | |
100 synchronized(theExecutors) { | |
101 Iterator myExecs = theExecutors.values().iterator(); | |
102 | |
103 while (myExecs.hasNext()) { | |
104 PooledExecutor myExec = (PooledExecutor) myExecs.next(); | |
105 myExec.shutdownNow(); | |
106 } | |
107 } | |
108 } | |
109 | |
110 private PooledExecutor getExecutor(String aName) { | |
111 PooledExecutor myExec; | |
112 | |
113 synchronized(theExecutors) { | |
114 myExec = (PooledExecutor) theExecutors.get(aName); | |
115 | |
116 if (myExec == null) { | |
117 | |
118 BoundedLinkedQueue myQueue; | |
119 | |
120 if (TASK_QUEUE_BOUND == 0) { | |
121 theLogger.log(Level.INFO, | |
122 "Creating task pool with no bounds"); | |
123 | |
124 | |
125 myQueue = new BoundedLinkedQueue(Integer.MAX_VALUE); | |
126 myExec = new PooledExecutor(myQueue, MAX_TASK_THREADS); | |
127 } else { | |
128 theLogger.log(Level.INFO, | |
129 "Creating task pool with bounds: " + | |
130 TASK_QUEUE_BOUND); | |
131 | |
132 myQueue = new BoundedLinkedQueue(TASK_QUEUE_BOUND); | |
133 myExec = | |
134 new PooledExecutor(myQueue, MAX_TASK_THREADS); | |
135 } | |
136 | |
137 myExec.setMinimumPoolSize(MAX_TASK_THREADS); | |
138 // myExec.waitWhenBlocked(); | |
139 | |
140 StatsBoard.get().add(new QueueStatGenerator(aName, myQueue)); | |
141 theExecutors.put(aName, myExec); | |
142 } | |
143 | |
144 return myExec; | |
145 } | |
146 } | |
147 } |