Mercurial > hg > blitz_stable
comparison src/org/dancres/blitz/remote/nio/EventProcessor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3dc0c5604566 |
---|---|
1 package org.dancres.blitz.remote.nio; | |
2 | |
3 import java.nio.channels.Selector; | |
4 import java.nio.channels.SelectionKey; | |
5 import java.nio.channels.SocketChannel; | |
6 import java.nio.channels.SelectableChannel; | |
7 import java.io.IOException; | |
8 import java.util.Set; | |
9 import java.util.Iterator; | |
10 import java.util.List; | |
11 import java.util.ArrayList; | |
12 import java.net.Socket; | |
13 | |
14 /** | |
15 * Server-side skeleton which manages a Selector across a collection of socket | |
16 * connections supplied by some external entity. Events are actually | |
17 * handled by Dispatcher instances obtained from DispatcherFactory instances. | |
18 * | |
19 */ | |
20 public class EventProcessor extends Thread { | |
21 private List _socketsToAdd; | |
22 private Selector _selector; | |
23 private DispatcherFactory _dispatcherFactory; | |
24 | |
25 EventProcessor(Selector aSelector, DispatcherFactory aFactory) { | |
26 _selector = aSelector; | |
27 _socketsToAdd = new ArrayList(); | |
28 _dispatcherFactory = aFactory; | |
29 } | |
30 | |
31 void add(SocketChannel aSocket) { | |
32 try { | |
33 // System.err.println("Posting socket to add"); | |
34 | |
35 aSocket.configureBlocking(false); | |
36 aSocket.socket().setTcpNoDelay(true); | |
37 | |
38 synchronized(_socketsToAdd) { | |
39 _socketsToAdd.add(aSocket); | |
40 } | |
41 | |
42 _selector.wakeup(); | |
43 } catch (Exception anE) { | |
44 System.err.println("Failed to add"); | |
45 anE.printStackTrace(System.err); | |
46 } | |
47 } | |
48 | |
49 public void run() { | |
50 while (true) { | |
51 try { | |
52 _selector.select(); | |
53 | |
54 // See if we have some more sockets to add | |
55 synchronized(_socketsToAdd) { | |
56 if (_socketsToAdd.size() > 0) { | |
57 // System.err.println("Got sockets to add"); | |
58 | |
59 _selector.selectNow(); | |
60 | |
61 Iterator myNewChannels = _socketsToAdd.iterator(); | |
62 while(myNewChannels.hasNext()) { | |
63 SelectableChannel myChannel = | |
64 (SelectableChannel) myNewChannels.next(); | |
65 SelectionKey myKey = | |
66 myChannel.register(_selector, | |
67 SelectionKey.OP_READ); | |
68 myKey.attach(new ControlBlock(myKey, | |
69 _dispatcherFactory.newDispatcher())); | |
70 } | |
71 | |
72 _socketsToAdd.clear(); | |
73 } | |
74 } | |
75 | |
76 Iterator myKeys = _selector.selectedKeys().iterator(); | |
77 | |
78 while (myKeys.hasNext()) { | |
79 SelectionKey myKey = (SelectionKey) myKeys.next(); | |
80 | |
81 if (myKey.isValid()) { | |
82 ControlBlock myBlock = | |
83 (ControlBlock) myKey.attachment(); | |
84 | |
85 try { | |
86 myBlock.process(); | |
87 } catch (IOException anIOE) { | |
88 // System.err.println("Channel is dead - dumping it"); | |
89 myKey.cancel(); | |
90 ((SocketChannel) myKey.channel()).close(); | |
91 } | |
92 | |
93 } else { | |
94 // System.err.println("Channel is dead - dumping it"); | |
95 myKey.cancel(); | |
96 ((SocketChannel) myKey.channel()).close(); | |
97 } | |
98 | |
99 myKeys.remove(); | |
100 } | |
101 } catch (IOException anIOE) { | |
102 System.err.println("EventProcessor error'd"); | |
103 anIOE.printStackTrace(System.err); | |
104 } | |
105 } | |
106 } | |
107 } |