comparison src/org/dancres/blitz/remote/nio/ControlBlock.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.remote.nio;
2
3 import java.nio.channels.SelectionKey;
4 import java.nio.channels.SocketChannel;
5 import java.nio.ByteBuffer;
6 import java.io.IOException;
7 import java.util.ArrayList;
8
9 /**
10 * Responsible for processing selector messages associated with a particular
11 * endpoint (socket) as defined by a SelectionKey. Uses a dispatcher to
12 * actually process the data received (it may also invoke methods etc) and to
13 * generate responses to be transmitted back through the endpoint.
14 */
15 public class ControlBlock {
16 private SelectionKey _key;
17 private ArrayList _outputBuffers = new ArrayList();
18 private Dispatcher _dispatcher;
19
20 ControlBlock(SelectionKey aKey, Dispatcher aDispatcher) {
21 _key = aKey;
22 _dispatcher = aDispatcher;
23 }
24
25 /**
26 * Processes the SelectionKey's events according to internal state machine
27 * and dispatches work to threads accordingly
28 */
29 public void process() throws IOException {
30 /*
31 Determine what flags are tripped
32
33 If write flag is set invoke a write using current list of
34 buffers. Scan buffers for now done and remove them from the
35 list. If there are no buffers, turn of the write interest flag
36 in selector
37
38 If read flag is set grab bytes and parse with state machine
39 which will then figure out whether we are ready to process
40 a request and dispatch it to the pool with a reference to us
41 so's we can post the response.
42 */
43 SocketChannel myChannel = (SocketChannel) _key.channel();
44
45 if (_key.isWritable()) {
46 synchronized(_outputBuffers) {
47
48 while (_outputBuffers.size() > 0) {
49 ByteBuffer myBuffer = (ByteBuffer) _outputBuffers.get(0);
50
51 myChannel.write(myBuffer);
52
53 if (myBuffer.hasRemaining()) {
54 break;
55 } else
56 _outputBuffers.remove(0);
57 }
58
59 if (_outputBuffers.size() == 0) {
60 _key.interestOps(_key.interestOps() ^ SelectionKey.OP_WRITE);
61 }
62 }
63 }
64
65 if (_key.isReadable()) {
66 _dispatcher.process(this);
67 }
68 }
69
70 public SocketChannel getChannel() {
71 return (SocketChannel) _key.channel();
72 }
73
74 public void send(ByteBuffer[] aBuffers) {
75 /*
76 Lock current list of buffers add this one to the list
77
78 If this is the first new set of buffers we need to set
79 write interest on our selection key and then do
80 _key.selector().wakeup() so we get events
81 */
82 boolean enableWrites;
83
84 synchronized(_outputBuffers) {
85 enableWrites = (_outputBuffers.size() == 0);
86
87 for (int i = 0; i < aBuffers.length; i++) {
88 _outputBuffers.add(aBuffers[i]);
89 }
90 }
91
92 if (enableWrites) {
93 if (_key.isValid()) {
94 _key.interestOps(_key.interestOps() | SelectionKey.OP_WRITE);
95 _key.selector().wakeup();
96 }
97 }
98 }
99 }