Mercurial > hg > blitz_stable
diff src/org/dancres/blitz/remote/nio/EventProcessor.java @ 0:3dc0c5604566
Initial checkin of blitz 2.0 fcs - no installer yet.
author | Dan Creswell <dan.creswell@gmail.com> |
---|---|
date | Sat, 21 Mar 2009 11:00:06 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/dancres/blitz/remote/nio/EventProcessor.java Sat Mar 21 11:00:06 2009 +0000 @@ -0,0 +1,107 @@ +package org.dancres.blitz.remote.nio; + +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectableChannel; +import java.io.IOException; +import java.util.Set; +import java.util.Iterator; +import java.util.List; +import java.util.ArrayList; +import java.net.Socket; + +/** + * Server-side skeleton which manages a Selector across a collection of socket + * connections supplied by some external entity. Events are actually + * handled by Dispatcher instances obtained from DispatcherFactory instances. + * + */ +public class EventProcessor extends Thread { + private List _socketsToAdd; + private Selector _selector; + private DispatcherFactory _dispatcherFactory; + + EventProcessor(Selector aSelector, DispatcherFactory aFactory) { + _selector = aSelector; + _socketsToAdd = new ArrayList(); + _dispatcherFactory = aFactory; + } + + void add(SocketChannel aSocket) { + try { + // System.err.println("Posting socket to add"); + + aSocket.configureBlocking(false); + aSocket.socket().setTcpNoDelay(true); + + synchronized(_socketsToAdd) { + _socketsToAdd.add(aSocket); + } + + _selector.wakeup(); + } catch (Exception anE) { + System.err.println("Failed to add"); + anE.printStackTrace(System.err); + } + } + + public void run() { + while (true) { + try { + _selector.select(); + + // See if we have some more sockets to add + synchronized(_socketsToAdd) { + if (_socketsToAdd.size() > 0) { + // System.err.println("Got sockets to add"); + + _selector.selectNow(); + + Iterator myNewChannels = _socketsToAdd.iterator(); + while(myNewChannels.hasNext()) { + SelectableChannel myChannel = + (SelectableChannel) myNewChannels.next(); + SelectionKey myKey = + myChannel.register(_selector, + SelectionKey.OP_READ); + myKey.attach(new ControlBlock(myKey, + _dispatcherFactory.newDispatcher())); + } + + _socketsToAdd.clear(); + } + } + + Iterator myKeys = _selector.selectedKeys().iterator(); + + while (myKeys.hasNext()) { + SelectionKey myKey = (SelectionKey) myKeys.next(); + + if (myKey.isValid()) { + ControlBlock myBlock = + (ControlBlock) myKey.attachment(); + + try { + myBlock.process(); + } catch (IOException anIOE) { + // System.err.println("Channel is dead - dumping it"); + myKey.cancel(); + ((SocketChannel) myKey.channel()).close(); + } + + } else { + // System.err.println("Channel is dead - dumping it"); + myKey.cancel(); + ((SocketChannel) myKey.channel()).close(); + } + + myKeys.remove(); + } + } catch (IOException anIOE) { + System.err.println("EventProcessor error'd"); + anIOE.printStackTrace(System.err); + } + } + } +}