comparison src/org/dancres/blitz/disk/WriteDaemon.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.disk;
2
3 import java.util.logging.*;
4
5 import net.jini.config.ConfigurationException;
6
7 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
8 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
9
10 import org.dancres.blitz.Logging;
11
12 import org.dancres.blitz.ActiveObject;
13 import org.dancres.blitz.ActiveObjectRegistry;
14
15 import org.dancres.blitz.config.ConfigurationFactory;
16
17 import org.dancres.blitz.task.Tasks;
18 import org.dancres.blitz.task.Task;
19
20 import org.dancres.blitz.stats.StatGenerator;
21 import org.dancres.blitz.stats.Stat;
22 import org.dancres.blitz.stats.StatsBoard;
23 import org.dancres.blitz.stats.IOStat;
24
25 /**
26 <p> Certain elements of Blitz (such as ArcCache) require all writes to be
27 asynchronous to avoid deadlock situations and reduce time spent waiting for
28 I/O completion by a client thread. WriteDaemon provides the asynchronous
29 I/O infrastructure to satisfy such requirements. </p>
30
31 <p> We stop when Disk is stop'd which allows for proper sync'ing </p>
32
33 <p>We can only have one thread for writing at this moment because:
34 <ol>
35 <li>WriteBuffer assumes serialized updating of images. If two threads
36 start performing jobs on the same UID, then there's currently scope for
37 a collision. The problem is compounded by the fact that we can't simply
38 requeue the job because that will break checkpointing/sync'ing</li>
39 <li>Sync'ing assumes that there's only one thread which will execute the
40 appropriate callback placed in the queue once all prior Jobs have been
41 dispatched. This ordering dependency is what complicates the WriteBuffer
42 issue above.</li>
43 <li>There's an ordering issue - we must write before we decide to then
44 delete something that is on disk.</li>
45 </ol>
46 </p>
47
48 @see org.dancres.blitz.arc.ArcCache
49 */
50 public class WriteDaemon implements StatGenerator {
51 static Logger theLogger =
52 Logging.newLogger("org.dancres.blitz.disk.WriteDaemon");
53
54 private static int MAX_WRITE_THREADS;
55 private static int THREAD_KEEPALIVE;
56 private static int DESIRED_PENDING_WRITES;
57 private static int THROTTLE_PENDING_WRITES;
58 private static long THROTTLE_PAUSE;
59
60 static {
61 try {
62 MAX_WRITE_THREADS = ((Integer)
63 ConfigurationFactory.getEntry("maxWriteThreads",
64 int.class,
65 new Integer(5))).intValue();
66 THREAD_KEEPALIVE = ((Integer)
67 ConfigurationFactory.getEntry("threadKeepalive",
68 int.class,
69 new Integer(60000))).intValue();
70 DESIRED_PENDING_WRITES = ((Integer)
71 ConfigurationFactory.getEntry("desiredPendingWrites",
72 int.class,
73 new Integer(20))).intValue();
74
75 THROTTLE_PENDING_WRITES = ((Integer)
76 ConfigurationFactory.getEntry("throttlePendingWrites",
77 int.class,
78 new Integer(Integer.MAX_VALUE))).intValue();
79 THROTTLE_PAUSE = ((Long)
80 ConfigurationFactory.getEntry("throttlePause",
81 long.class,
82 new Long(50))).longValue();
83 } catch (ConfigurationException aCE) {
84 }
85 }
86
87 private static WriteDaemon theDaemon = new WriteDaemon();
88
89 private int thePendingCount;
90 private LinkedQueue thePendingUpdates = new LinkedQueue();
91
92 private LinkedQueue theAsyncUpdates;
93 private PooledExecutor theWriters;
94
95 private LinkedQueue theCompletions;
96 private PooledExecutor theCompleters;
97
98 private IOStats theIOStats = new IOStats();
99
100 private long theStatId = StatGenerator.UNSET_ID;
101 private int theThrottleCount = 0;
102
103 private WriteDaemon() {
104 theAsyncUpdates = new LinkedQueue();
105 theCompletions = new LinkedQueue();
106
107 theWriters = new PooledExecutor(theAsyncUpdates, MAX_WRITE_THREADS);
108 theCompleters = new PooledExecutor(theCompletions, 1);
109
110 theLogger.log(Level.INFO, "Async keepalive: " + THREAD_KEEPALIVE);
111 theLogger.log(Level.INFO, "Pending write size: " +
112 DESIRED_PENDING_WRITES);
113 theLogger.log(Level.INFO, "Throttle write size: " +
114 THROTTLE_PENDING_WRITES);
115 theLogger.log(Level.INFO, "Throttle pause: " +
116 THROTTLE_PAUSE);
117
118 theWriters.setKeepAliveTime(THREAD_KEEPALIVE);
119 theCompleters.setKeepAliveTime(THREAD_KEEPALIVE);
120
121 StatsBoard.get().add(this);
122 }
123
124 public static WriteDaemon get() {
125 return theDaemon;
126 }
127
128 /**
129 Queue a task for execution by the WriteDaemon thread.
130 */
131 public void queue(Runnable anUpdate) {
132
133 // If the write queue is getting too large, start stalling
134 if (theIOStats.getQueueSize() > THROTTLE_PENDING_WRITES) {
135 ++theThrottleCount;
136
137 theLogger.log(Level.WARNING,
138 "Write queue overflowing - THROTTLING");
139
140 try {
141 Thread.sleep(THROTTLE_PAUSE);
142 } catch (InterruptedException anIE) {
143 theLogger.log(Level.SEVERE, "Throttle broken!");
144 }
145 }
146
147 synchronized(this) {
148 try {
149 thePendingUpdates.put(new OutputTracker(anUpdate));
150 ++thePendingCount;
151
152 if (thePendingCount >= DESIRED_PENDING_WRITES) {
153 pushImpl();
154 }
155 } catch (InterruptedException anIE) {
156 theLogger.log(Level.SEVERE, "Failed to queue update", anIE);
157 }
158 }
159
160 theIOStats.incAsyncInCount();
161 }
162
163 /**
164 Should only be called from within a sync block.
165 */
166 private void pushImpl() {
167 Object myTask;
168
169 try {
170 while ((myTask = thePendingUpdates.poll(0)) != null) {
171 theWriters.execute((Runnable) myTask);
172 }
173 } catch (InterruptedException anIE) {
174 }
175
176 thePendingCount = 0;
177 }
178
179 /**
180 Force the queue to be processed
181 */
182 void push() {
183 synchronized(this) {
184 pushImpl();
185 }
186 }
187
188 /**
189 <p>Force the updates in the queue to disk. On completion, invoke the
190 passed task. Note this task is processed asynchronously outside
191 of the WriteDaemon thread. This allows the WriteDaemon to continue
192 processing updates whilst the completion task runs.</p>
193
194 <p>If you want a task to be performed synchronously by the WriteDaemon
195 after other requests, <code>queue</code> the task and invoke
196 <code>push</code>.</p>
197 */
198 void push(Task aCompletionTask) {
199 if (aCompletionTask == null)
200 throw new IllegalArgumentException();
201 else {
202 synchronized(this) {
203 /*
204 We ensure the passed task doesn't execute until the
205 queue has been emptied by putting a barrier task at the end
206 of the queue. Thus, when we force the queue, this barrier
207 will only execute once all preceeding updates have been
208 performed.
209 */
210 queue(new Scheduler(aCompletionTask));
211 pushImpl();
212 }
213 }
214 }
215
216 void halt() {
217 theLogger.log(Level.INFO, "WriteDaemon doing halt");
218 theWriters.shutdownAfterProcessingCurrentlyQueuedTasks();
219 theCompleters.shutdownAfterProcessingCurrentlyQueuedTasks();
220 theLogger.log(Level.INFO, "WriteDaemon done halt");
221 }
222
223 /**
224 We do not run the completion task in-line. This is potentially time
225 consuming and we have more important things to do (write updates) so
226 we palm the completion task off to a separate thread pool when we've
227 done the necessary work.
228 */
229 private class Scheduler implements Runnable {
230 private Task theTask;
231
232 Scheduler(Task aCompletionTask) {
233 theTask = aCompletionTask;
234 }
235
236 public void run() {
237 try {
238 theCompleters.execute(theTask);
239 } catch (InterruptedException anIE) {
240 theLogger.log(Level.SEVERE, "Failed to scheduler sync completion task", anIE);
241 }
242 }
243 }
244
245 private class OutputTracker implements Runnable {
246 private Runnable theWrite;
247
248 OutputTracker(Runnable aRunnable) {
249 theWrite = aRunnable;
250 }
251
252 public void run() {
253 theWrite.run();
254
255 theIOStats.incAsyncOutCount();
256 }
257 }
258
259 /**
260 @return the id of the StatGenerator that produced the stat
261 AdministrableStat.UNSET_ID if the id has never been set
262 */
263 public long getId() {
264 return theStatId;
265 }
266
267 public void setId(long anId) {
268 theStatId = anId;
269 }
270
271 public Stat generate() {
272 return new IOStat(theStatId, theIOStats.getTimePerIn(),
273 theIOStats.getTimePerOut(), theIOStats.getInOutRatio(),
274 theIOStats.getQueueSize(), theThrottleCount);
275 }
276 }