diff src/org/dancres/blitz/remote/transport/ServerSessionHandler.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/dancres/blitz/remote/transport/ServerSessionHandler.java	Sat Mar 21 11:00:06 2009 +0000
@@ -0,0 +1,105 @@
+package org.dancres.blitz.remote.transport;
+
+import java.rmi.RemoteException;
+
+import org.dancres.blitz.SpaceImpl;
+import org.dancres.blitz.remote.transport.task.ReadTask;
+import org.dancres.blitz.remote.transport.task.TakeTask;
+import org.dancres.blitz.remote.transport.task.WriteTask;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.SessionLog;
+
+/**
+ */
+public class ServerSessionHandler extends IoHandlerAdapter {
+    private static final boolean STRAIGHT_THROUGH = true;
+
+    private Executor _executor =
+        new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue());
+
+    private SpaceImpl _space;
+
+    ServerSessionHandler(SpaceImpl aSpace) {
+        _space = aSpace;
+    }
+
+    public void sessionOpened(IoSession session) {
+        // set idle time to 60 seconds
+        session.setIdleTime(IdleStatus.BOTH_IDLE, 60);
+    }
+
+    public void messageReceived(IoSession session, Object message) {
+        Message myMessage = (Message) message;
+
+        int myConversationId = myMessage.getConversationId();
+
+        // System.err.println("Received request with: " + myConversationId);
+        
+        try {
+            Object myRequest = MarshallUtil.unmarshall(myMessage);
+
+            // System.err.println("Request type: " + myRequest.getClass());
+
+            if (STRAIGHT_THROUGH) {
+                if (myRequest instanceof Write) {
+                    new WriteTask(myConversationId, _space,
+                        (Write) myRequest, session).run();
+                } else if (myRequest instanceof Take) {
+                    new TakeTask(myConversationId, _space,
+                        (Take) myRequest, session).run();
+                } else if (myRequest instanceof Read) {
+                    new ReadTask(myConversationId, _space,
+                        (Read) myRequest, session).run();
+                } else if (myRequest instanceof Ping) {
+                    // System.err.println("ping");
+                }
+            } else {
+                if (myRequest instanceof Write) {
+                    _executor.execute(
+                        new WriteTask(myConversationId, _space,
+                            (Write) myRequest, session));
+                } else if (myRequest instanceof Take) {
+                    _executor.execute(
+                        new TakeTask(myConversationId, _space,
+                            (Take) myRequest, session));
+                } else if (myRequest instanceof Read) {
+                    _executor.execute(
+                        new ReadTask(myConversationId, _space,
+                            (Read) myRequest, session));
+                } else if (myRequest instanceof Ping) {
+                    // System.err.println("ping");
+                }
+            }
+
+        } catch (RemoteException anRE) {
+            try {
+                Message myResponse =
+                    MarshallUtil.marshall(
+                        new RemoteException("Server problem", anRE),
+                        myConversationId);
+                session.write(myResponse);
+            } catch (RemoteException anREE) {
+                System.err.println("Couldn't post error response to client");
+                anREE.printStackTrace(System.err);
+            }
+        }
+    }
+
+    public void sessionIdle(IoSession session, IdleStatus status) {
+        SessionLog.info(session, "Disconnecting the idle.");
+        // disconnect an idle client
+        session.close();
+    }
+
+    public void exceptionCaught(IoSession session, Throwable cause) {
+        // close the connection on exceptional situation
+        session.close();
+    }
+}