comparison src/org/dancres/blitz/remote/nio/EventProcessor.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.remote.nio;
2
3 import java.nio.channels.Selector;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.SocketChannel;
6 import java.nio.channels.SelectableChannel;
7 import java.io.IOException;
8 import java.util.Set;
9 import java.util.Iterator;
10 import java.util.List;
11 import java.util.ArrayList;
12 import java.net.Socket;
13
14 /**
15 * Server-side skeleton which manages a Selector across a collection of socket
16 * connections supplied by some external entity. Events are actually
17 * handled by Dispatcher instances obtained from DispatcherFactory instances.
18 *
19 */
20 public class EventProcessor extends Thread {
21 private List _socketsToAdd;
22 private Selector _selector;
23 private DispatcherFactory _dispatcherFactory;
24
25 EventProcessor(Selector aSelector, DispatcherFactory aFactory) {
26 _selector = aSelector;
27 _socketsToAdd = new ArrayList();
28 _dispatcherFactory = aFactory;
29 }
30
31 void add(SocketChannel aSocket) {
32 try {
33 // System.err.println("Posting socket to add");
34
35 aSocket.configureBlocking(false);
36 aSocket.socket().setTcpNoDelay(true);
37
38 synchronized(_socketsToAdd) {
39 _socketsToAdd.add(aSocket);
40 }
41
42 _selector.wakeup();
43 } catch (Exception anE) {
44 System.err.println("Failed to add");
45 anE.printStackTrace(System.err);
46 }
47 }
48
49 public void run() {
50 while (true) {
51 try {
52 _selector.select();
53
54 // See if we have some more sockets to add
55 synchronized(_socketsToAdd) {
56 if (_socketsToAdd.size() > 0) {
57 // System.err.println("Got sockets to add");
58
59 _selector.selectNow();
60
61 Iterator myNewChannels = _socketsToAdd.iterator();
62 while(myNewChannels.hasNext()) {
63 SelectableChannel myChannel =
64 (SelectableChannel) myNewChannels.next();
65 SelectionKey myKey =
66 myChannel.register(_selector,
67 SelectionKey.OP_READ);
68 myKey.attach(new ControlBlock(myKey,
69 _dispatcherFactory.newDispatcher()));
70 }
71
72 _socketsToAdd.clear();
73 }
74 }
75
76 Iterator myKeys = _selector.selectedKeys().iterator();
77
78 while (myKeys.hasNext()) {
79 SelectionKey myKey = (SelectionKey) myKeys.next();
80
81 if (myKey.isValid()) {
82 ControlBlock myBlock =
83 (ControlBlock) myKey.attachment();
84
85 try {
86 myBlock.process();
87 } catch (IOException anIOE) {
88 // System.err.println("Channel is dead - dumping it");
89 myKey.cancel();
90 ((SocketChannel) myKey.channel()).close();
91 }
92
93 } else {
94 // System.err.println("Channel is dead - dumping it");
95 myKey.cancel();
96 ((SocketChannel) myKey.channel()).close();
97 }
98
99 myKeys.remove();
100 }
101 } catch (IOException anIOE) {
102 System.err.println("EventProcessor error'd");
103 anIOE.printStackTrace(System.err);
104 }
105 }
106 }
107 }