Mercurial > hg > blitz_condensed
comparison src/org/dancres/blitz/disk/WriteDaemon.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.disk; | |
2 | |
3 import java.util.logging.*; | |
4 | |
5 import net.jini.config.ConfigurationException; | |
6 | |
7 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; | |
8 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; | |
9 | |
10 import org.dancres.blitz.Logging; | |
11 | |
12 import org.dancres.blitz.ActiveObject; | |
13 import org.dancres.blitz.ActiveObjectRegistry; | |
14 | |
15 import org.dancres.blitz.config.ConfigurationFactory; | |
16 | |
17 import org.dancres.blitz.task.Tasks; | |
18 import org.dancres.blitz.task.Task; | |
19 | |
20 import org.dancres.blitz.stats.StatGenerator; | |
21 import org.dancres.blitz.stats.Stat; | |
22 import org.dancres.blitz.stats.StatsBoard; | |
23 import org.dancres.blitz.stats.IOStat; | |
24 | |
25 /** | |
26 <p> Certain elements of Blitz (such as ArcCache) require all writes to be | |
27 asynchronous to avoid deadlock situations and reduce time spent waiting for | |
28 I/O completion by a client thread. WriteDaemon provides the asynchronous | |
29 I/O infrastructure to satisfy such requirements. </p> | |
30 | |
31 <p> We stop when Disk is stop'd which allows for proper sync'ing </p> | |
32 | |
33 <p>We can only have one thread for writing at this moment because: | |
34 <ol> | |
35 <li>WriteBuffer assumes serialized updating of images. If two threads | |
36 start performing jobs on the same UID, then there's currently scope for | |
37 a collision. The problem is compounded by the fact that we can't simply | |
38 requeue the job because that will break checkpointing/sync'ing</li> | |
39 <li>Sync'ing assumes that there's only one thread which will execute the | |
40 appropriate callback placed in the queue once all prior Jobs have been | |
41 dispatched. This ordering dependency is what complicates the WriteBuffer | |
42 issue above.</li> | |
43 <li>There's an ordering issue - we must write before we decide to then | |
44 delete something that is on disk.</li> | |
45 </ol> | |
46 </p> | |
47 | |
48 @see org.dancres.blitz.arc.ArcCache | |
49 */ | |
50 public class WriteDaemon implements StatGenerator { | |
51 static Logger theLogger = | |
52 Logging.newLogger("org.dancres.blitz.disk.WriteDaemon"); | |
53 | |
54 private static int MAX_WRITE_THREADS; | |
55 private static int THREAD_KEEPALIVE; | |
56 private static int DESIRED_PENDING_WRITES; | |
57 private static int THROTTLE_PENDING_WRITES; | |
58 private static long THROTTLE_PAUSE; | |
59 | |
60 static { | |
61 try { | |
62 MAX_WRITE_THREADS = ((Integer) | |
63 ConfigurationFactory.getEntry("maxWriteThreads", | |
64 int.class, | |
65 new Integer(5))).intValue(); | |
66 THREAD_KEEPALIVE = ((Integer) | |
67 ConfigurationFactory.getEntry("threadKeepalive", | |
68 int.class, | |
69 new Integer(60000))).intValue(); | |
70 DESIRED_PENDING_WRITES = ((Integer) | |
71 ConfigurationFactory.getEntry("desiredPendingWrites", | |
72 int.class, | |
73 new Integer(20))).intValue(); | |
74 | |
75 THROTTLE_PENDING_WRITES = ((Integer) | |
76 ConfigurationFactory.getEntry("throttlePendingWrites", | |
77 int.class, | |
78 new Integer(Integer.MAX_VALUE))).intValue(); | |
79 THROTTLE_PAUSE = ((Long) | |
80 ConfigurationFactory.getEntry("throttlePause", | |
81 long.class, | |
82 new Long(50))).longValue(); | |
83 } catch (ConfigurationException aCE) { | |
84 } | |
85 } | |
86 | |
87 private static WriteDaemon theDaemon = new WriteDaemon(); | |
88 | |
89 private int thePendingCount; | |
90 private LinkedQueue thePendingUpdates = new LinkedQueue(); | |
91 | |
92 private LinkedQueue theAsyncUpdates; | |
93 private PooledExecutor theWriters; | |
94 | |
95 private LinkedQueue theCompletions; | |
96 private PooledExecutor theCompleters; | |
97 | |
98 private IOStats theIOStats = new IOStats(); | |
99 | |
100 private long theStatId = StatGenerator.UNSET_ID; | |
101 private int theThrottleCount = 0; | |
102 | |
103 private WriteDaemon() { | |
104 theAsyncUpdates = new LinkedQueue(); | |
105 theCompletions = new LinkedQueue(); | |
106 | |
107 theWriters = new PooledExecutor(theAsyncUpdates, MAX_WRITE_THREADS); | |
108 theCompleters = new PooledExecutor(theCompletions, 1); | |
109 | |
110 theLogger.log(Level.INFO, "Async keepalive: " + THREAD_KEEPALIVE); | |
111 theLogger.log(Level.INFO, "Pending write size: " + | |
112 DESIRED_PENDING_WRITES); | |
113 theLogger.log(Level.INFO, "Throttle write size: " + | |
114 THROTTLE_PENDING_WRITES); | |
115 theLogger.log(Level.INFO, "Throttle pause: " + | |
116 THROTTLE_PAUSE); | |
117 | |
118 theWriters.setKeepAliveTime(THREAD_KEEPALIVE); | |
119 theCompleters.setKeepAliveTime(THREAD_KEEPALIVE); | |
120 | |
121 StatsBoard.get().add(this); | |
122 } | |
123 | |
124 public static WriteDaemon get() { | |
125 return theDaemon; | |
126 } | |
127 | |
128 /** | |
129 Queue a task for execution by the WriteDaemon thread. | |
130 */ | |
131 public void queue(Runnable anUpdate) { | |
132 | |
133 // If the write queue is getting too large, start stalling | |
134 if (theIOStats.getQueueSize() > THROTTLE_PENDING_WRITES) { | |
135 ++theThrottleCount; | |
136 | |
137 theLogger.log(Level.WARNING, | |
138 "Write queue overflowing - THROTTLING"); | |
139 | |
140 try { | |
141 Thread.sleep(THROTTLE_PAUSE); | |
142 } catch (InterruptedException anIE) { | |
143 theLogger.log(Level.SEVERE, "Throttle broken!"); | |
144 } | |
145 } | |
146 | |
147 synchronized(this) { | |
148 try { | |
149 thePendingUpdates.put(new OutputTracker(anUpdate)); | |
150 ++thePendingCount; | |
151 | |
152 if (thePendingCount >= DESIRED_PENDING_WRITES) { | |
153 pushImpl(); | |
154 } | |
155 } catch (InterruptedException anIE) { | |
156 theLogger.log(Level.SEVERE, "Failed to queue update", anIE); | |
157 } | |
158 } | |
159 | |
160 theIOStats.incAsyncInCount(); | |
161 } | |
162 | |
163 /** | |
164 Should only be called from within a sync block. | |
165 */ | |
166 private void pushImpl() { | |
167 Object myTask; | |
168 | |
169 try { | |
170 while ((myTask = thePendingUpdates.poll(0)) != null) { | |
171 theWriters.execute((Runnable) myTask); | |
172 } | |
173 } catch (InterruptedException anIE) { | |
174 } | |
175 | |
176 thePendingCount = 0; | |
177 } | |
178 | |
179 /** | |
180 Force the queue to be processed | |
181 */ | |
182 void push() { | |
183 synchronized(this) { | |
184 pushImpl(); | |
185 } | |
186 } | |
187 | |
188 /** | |
189 <p>Force the updates in the queue to disk. On completion, invoke the | |
190 passed task. Note this task is processed asynchronously outside | |
191 of the WriteDaemon thread. This allows the WriteDaemon to continue | |
192 processing updates whilst the completion task runs.</p> | |
193 | |
194 <p>If you want a task to be performed synchronously by the WriteDaemon | |
195 after other requests, <code>queue</code> the task and invoke | |
196 <code>push</code>.</p> | |
197 */ | |
198 void push(Task aCompletionTask) { | |
199 if (aCompletionTask == null) | |
200 throw new IllegalArgumentException(); | |
201 else { | |
202 synchronized(this) { | |
203 /* | |
204 We ensure the passed task doesn't execute until the | |
205 queue has been emptied by putting a barrier task at the end | |
206 of the queue. Thus, when we force the queue, this barrier | |
207 will only execute once all preceeding updates have been | |
208 performed. | |
209 */ | |
210 queue(new Scheduler(aCompletionTask)); | |
211 pushImpl(); | |
212 } | |
213 } | |
214 } | |
215 | |
216 void halt() { | |
217 theLogger.log(Level.INFO, "WriteDaemon doing halt"); | |
218 theWriters.shutdownAfterProcessingCurrentlyQueuedTasks(); | |
219 theCompleters.shutdownAfterProcessingCurrentlyQueuedTasks(); | |
220 theLogger.log(Level.INFO, "WriteDaemon done halt"); | |
221 } | |
222 | |
223 /** | |
224 We do not run the completion task in-line. This is potentially time | |
225 consuming and we have more important things to do (write updates) so | |
226 we palm the completion task off to a separate thread pool when we've | |
227 done the necessary work. | |
228 */ | |
229 private class Scheduler implements Runnable { | |
230 private Task theTask; | |
231 | |
232 Scheduler(Task aCompletionTask) { | |
233 theTask = aCompletionTask; | |
234 } | |
235 | |
236 public void run() { | |
237 try { | |
238 theCompleters.execute(theTask); | |
239 } catch (InterruptedException anIE) { | |
240 theLogger.log(Level.SEVERE, "Failed to scheduler sync completion task", anIE); | |
241 } | |
242 } | |
243 } | |
244 | |
245 private class OutputTracker implements Runnable { | |
246 private Runnable theWrite; | |
247 | |
248 OutputTracker(Runnable aRunnable) { | |
249 theWrite = aRunnable; | |
250 } | |
251 | |
252 public void run() { | |
253 theWrite.run(); | |
254 | |
255 theIOStats.incAsyncOutCount(); | |
256 } | |
257 } | |
258 | |
259 /** | |
260 @return the id of the StatGenerator that produced the stat | |
261 AdministrableStat.UNSET_ID if the id has never been set | |
262 */ | |
263 public long getId() { | |
264 return theStatId; | |
265 } | |
266 | |
267 public void setId(long anId) { | |
268 theStatId = anId; | |
269 } | |
270 | |
271 public Stat generate() { | |
272 return new IOStat(theStatId, theIOStats.getTimePerIn(), | |
273 theIOStats.getTimePerOut(), theIOStats.getInOutRatio(), | |
274 theIOStats.getQueueSize(), theThrottleCount); | |
275 } | |
276 } |