comparison src/org/dancres/blitz/remote/transport/ServerSessionHandler.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 package org.dancres.blitz.remote.transport;
2
3 import java.rmi.RemoteException;
4
5 import org.dancres.blitz.SpaceImpl;
6 import org.dancres.blitz.remote.transport.task.ReadTask;
7 import org.dancres.blitz.remote.transport.task.TakeTask;
8 import org.dancres.blitz.remote.transport.task.WriteTask;
9 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
10 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
11 import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
12 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
13 import org.apache.mina.common.IdleStatus;
14 import org.apache.mina.common.IoHandlerAdapter;
15 import org.apache.mina.common.IoSession;
16 import org.apache.mina.util.SessionLog;
17
18 /**
19 */
20 public class ServerSessionHandler extends IoHandlerAdapter {
21 private static final boolean STRAIGHT_THROUGH = true;
22
23 private Executor _executor =
24 new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS,
25 new LinkedBlockingQueue());
26
27 private SpaceImpl _space;
28
29 ServerSessionHandler(SpaceImpl aSpace) {
30 _space = aSpace;
31 }
32
33 public void sessionOpened(IoSession session) {
34 // set idle time to 60 seconds
35 session.setIdleTime(IdleStatus.BOTH_IDLE, 60);
36 }
37
38 public void messageReceived(IoSession session, Object message) {
39 Message myMessage = (Message) message;
40
41 int myConversationId = myMessage.getConversationId();
42
43 // System.err.println("Received request with: " + myConversationId);
44
45 try {
46 Object myRequest = MarshallUtil.unmarshall(myMessage);
47
48 // System.err.println("Request type: " + myRequest.getClass());
49
50 if (STRAIGHT_THROUGH) {
51 if (myRequest instanceof Write) {
52 new WriteTask(myConversationId, _space,
53 (Write) myRequest, session).run();
54 } else if (myRequest instanceof Take) {
55 new TakeTask(myConversationId, _space,
56 (Take) myRequest, session).run();
57 } else if (myRequest instanceof Read) {
58 new ReadTask(myConversationId, _space,
59 (Read) myRequest, session).run();
60 } else if (myRequest instanceof Ping) {
61 // System.err.println("ping");
62 }
63 } else {
64 if (myRequest instanceof Write) {
65 _executor.execute(
66 new WriteTask(myConversationId, _space,
67 (Write) myRequest, session));
68 } else if (myRequest instanceof Take) {
69 _executor.execute(
70 new TakeTask(myConversationId, _space,
71 (Take) myRequest, session));
72 } else if (myRequest instanceof Read) {
73 _executor.execute(
74 new ReadTask(myConversationId, _space,
75 (Read) myRequest, session));
76 } else if (myRequest instanceof Ping) {
77 // System.err.println("ping");
78 }
79 }
80
81 } catch (RemoteException anRE) {
82 try {
83 Message myResponse =
84 MarshallUtil.marshall(
85 new RemoteException("Server problem", anRE),
86 myConversationId);
87 session.write(myResponse);
88 } catch (RemoteException anREE) {
89 System.err.println("Couldn't post error response to client");
90 anREE.printStackTrace(System.err);
91 }
92 }
93 }
94
95 public void sessionIdle(IoSession session, IdleStatus status) {
96 SessionLog.info(session, "Disconnecting the idle.");
97 // disconnect an idle client
98 session.close();
99 }
100
101 public void exceptionCaught(IoSession session, Throwable cause) {
102 // close the connection on exceptional situation
103 session.close();
104 }
105 }