comparison src/com/go/trove/net/DistributedSocketFactory.java @ 0:3dc0c5604566

Initial checkin of blitz 2.0 fcs - no installer yet.
author Dan Creswell <dan.creswell@gmail.com>
date Sat, 21 Mar 2009 11:00:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3dc0c5604566
1 /* ====================================================================
2 * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3 * ====================================================================
4 * The Tea Software License, Version 1.1
5 *
6 * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * 3. The end-user documentation included with the redistribution,
21 * if any, must include the following acknowledgment:
22 * "This product includes software developed by the
23 * Walt Disney Internet Group (http://opensource.go.com/)."
24 * Alternately, this acknowledgment may appear in the software itself,
25 * if and wherever such third-party acknowledgments normally appear.
26 *
27 * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28 * not be used to endorse or promote products derived from this
29 * software without prior written permission. For written
30 * permission, please contact opensource@dig.com.
31 *
32 * 5. Products derived from this software may not be called "Tea",
33 * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34 * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35 * written permission of the Walt Disney Internet Group.
36 *
37 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40 * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48 * ====================================================================
49 *
50 * For more information about Tea, please see http://opensource.go.com/.
51 */
52
53 package com.go.trove.net;
54
55 import java.io.*;
56 import java.net.*;
57 import java.util.*;
58 import java.lang.ref.*;
59 import com.go.trove.util.IdentityMap;
60
61 /******************************************************************************
62 * A SocketFactory implementation for distributing load among several
63 * SocketFactories. If an exception occurs on a socket, its pool is put into
64 * the "dead" list. A special thread will run in the background, trying to
65 * resurrect the dead SocketSocket. As soon as its able to create sockets
66 * again, its added back into the "live" list.
67 * <p>
68 * Consider wrapping with a {@link LazySocketFactory} for automatic checking
69 * against socket factories that may be dead.
70 *
71 * @author Brian S O'Neill
72 * @version
73 * <!--$$Revision: 1.1 $-->, <!--$$JustDate:--> 01/01/22 <!-- $-->
74 */
75 public class DistributedSocketFactory implements SocketFactory {
76 private final long mTimeout;
77
78 private int mFactoryIndex;
79
80 // Contains only the live SocketFactories.
81 private List mFactories;
82
83 // Maps SocketPools to resurrector threads.
84 private Map mResurrectors;
85
86 // Maps CheckedSockets to the SocketPools that they came from.
87 private Map mSocketSources;
88
89 private CheckedSocket.ExceptionListener mListener;
90
91 /**
92 * @param timeout Maximum time to wait (in milliseconds) for new
93 * connections to be established before throwing an exception
94 */
95 public DistributedSocketFactory(long timeout) {
96 mTimeout = timeout;
97 mFactories = Collections.synchronizedList(new ArrayList());
98 mResurrectors = Collections.synchronizedMap(new HashMap());
99 mSocketSources = Collections.synchronizedMap(new IdentityMap());
100
101 mListener = new CheckedSocket.ExceptionListener() {
102 public void exceptionOccurred(CheckedSocket s, Exception e, int count) {
103 if (count == 1) {
104 deadFactory((SocketFactory)mSocketSources.get(s));
105 }
106 }
107 };
108 }
109
110 public void addSocketFactory(SocketFactory factory) {
111 mFactories.add(factory);
112 }
113
114 public void removeSocketFactory(SocketFactory factory) {
115 mFactories.remove(factory);
116 Thread t = (Thread)mResurrectors.remove(factory);
117 if (t != null) {
118 t.interrupt();
119 }
120 }
121
122 public InetAddressAndPort getInetAddressAndPort() {
123 try {
124 return getFactory(selectFactory(null)).getInetAddressAndPort();
125 }
126 catch (ConnectException e) {
127 return InetAddressAndPort.UNKNOWN;
128 }
129 }
130
131 public InetAddressAndPort getInetAddressAndPort(Object session) {
132 try {
133 return getFactory(selectFactory(session))
134 .getInetAddressAndPort(session);
135 }
136 catch (ConnectException e) {
137 return InetAddressAndPort.UNKNOWN;
138 }
139 }
140
141 public long getDefaultTimeout() {
142 return mTimeout;
143 }
144
145 public CheckedSocket createSocket()
146 throws ConnectException, SocketException
147 {
148 return createSocket(null, mTimeout);
149 }
150
151 public CheckedSocket createSocket(Object session)
152 throws ConnectException, SocketException
153 {
154 return createSocket(session, mTimeout);
155 }
156
157 public CheckedSocket createSocket(long timeout)
158 throws ConnectException, SocketException
159 {
160 return createSocket(null, timeout);
161 }
162
163 public CheckedSocket createSocket(Object session, long timeout)
164 throws ConnectException, SocketException
165 {
166 long startTime = timeout > 0 ? System.currentTimeMillis() : 0;
167 int index = selectFactory(session);
168 int count = mFactories.size();
169
170 for (int i=0; i<count; i++) {
171 SocketFactory factory = null;
172 try {
173 factory = getFactory(index++);
174 CheckedSocket socket = factory.createSocket(session, timeout);
175 socket.addExceptionListener(mListener);
176 mSocketSources.put(socket, factory);
177 return socket;
178 }
179 catch (SocketException e) {
180 deadFactory(factory);
181
182 if (timeout == 0) {
183 throw e;
184 }
185
186 if (timeout > 0) {
187 timeout -= (System.currentTimeMillis() - startTime);
188 if (timeout < 0) {
189 throw e;
190 }
191 }
192 }
193 }
194
195 throw new ConnectException("Unable to create socket");
196 }
197
198 public CheckedSocket getSocket() throws ConnectException, SocketException {
199 return getSocket(null, mTimeout);
200 }
201
202 public CheckedSocket getSocket(Object session)
203 throws ConnectException, SocketException
204 {
205 return getSocket(session, mTimeout);
206 }
207
208 public CheckedSocket getSocket(long timeout)
209 throws ConnectException, SocketException
210 {
211 return getSocket(null, timeout);
212 }
213
214 public CheckedSocket getSocket(Object session, long timeout)
215 throws ConnectException, SocketException
216 {
217 long startTime = timeout > 0 ? System.currentTimeMillis() : 0;
218 int index = selectFactory(session);
219 int count = mFactories.size();
220
221 for (int i=0; i<count; i++) {
222 SocketFactory factory = null;
223 try {
224 factory = getFactory(index++);
225 CheckedSocket socket = factory.getSocket(session, timeout);
226 socket.addExceptionListener(mListener);
227 mSocketSources.put(socket, factory);
228 return socket;
229 }
230 catch (SocketException e) {
231 deadFactory(factory);
232
233 if (timeout == 0) {
234 throw e;
235 }
236
237 if (timeout > 0) {
238 timeout -= (System.currentTimeMillis() - startTime);
239 if (timeout < 0) {
240 throw e;
241 }
242 }
243 }
244 }
245
246 throw new ConnectException("Unable to get socket");
247 }
248
249 public void recycleSocket(CheckedSocket socket)
250 throws SocketException, IllegalArgumentException
251 {
252 if (socket == null) {
253 return;
254 }
255
256 SocketFactory source = (SocketFactory)mSocketSources.remove(socket);
257
258 if (source == null) {
259 throw new IllegalArgumentException
260 ("Socket did not originate from this pool");
261 }
262
263 socket.removeExceptionListener(mListener);
264 source.recycleSocket(socket);
265 }
266
267 public void clear() {
268 synchronized (mFactories) {
269 for (int i = mFactories.size(); --i >= 0; ) {
270 ((SocketFactory)mFactories.get(i)).clear();
271 }
272 }
273 }
274
275 public int getAvailableCount() {
276 int count = 0;
277 synchronized (mFactories) {
278 for (int i = mFactories.size(); --i >= 0; ) {
279 count += ((SocketFactory)mFactories.get(i))
280 .getAvailableCount();
281 }
282 }
283 return count;
284 }
285
286 /**
287 * The provided index must be positive, but it can be out of the factory
288 * list bounds.
289 */
290 private SocketFactory getFactory(int index) throws ConnectException {
291 synchronized (mFactories) {
292 int size = mFactories.size();
293 if (size <= 0) {
294 throw new ConnectException("No SocketFactories available");
295 }
296 return (SocketFactory)mFactories.get(index % size);
297 }
298 }
299
300 /**
301 * Returns an index which is positive, but may be out of the factory list
302 * bounds.
303 */
304 private int selectFactory(Object session) throws ConnectException {
305 if (session != null) {
306 return session.hashCode() & 0x7fffffff;
307 }
308 else {
309 synchronized (mFactories) {
310 return mFactoryIndex++ & 0x7fffffff;
311 }
312 }
313 }
314
315 private void deadFactory(SocketFactory factory) {
316 if (factory == null) {
317 return;
318 }
319
320 synchronized (mFactories) {
321 // Only remove factory if its not the last one left.
322 if (mFactories.contains(factory) && mFactories.size() > 1) {
323 mFactories.remove(factory);
324
325 Resurrector r = new Resurrector(this, factory);
326 Thread t = new Thread(null, r, "Resurrector " +
327 factory.getInetAddressAndPort());
328 t.setDaemon(true);
329 t.start();
330 mResurrectors.put(factory, t);
331 }
332 }
333 }
334
335 private static class Resurrector implements Runnable {
336 // Weakly references owner so that this thread won't prevent it from
337 // being garbage collected.
338 private final Reference mOwner;
339 private final SocketFactory mFactory;
340
341 public Resurrector(DistributedSocketFactory owner,
342 SocketFactory factory) {
343 mOwner = new WeakReference(owner);
344 mFactory = factory;
345 }
346
347 public void run() {
348 DistributedSocketFactory owner = null;
349 try {
350 while (!Thread.interrupted()) {
351 owner = (DistributedSocketFactory)mOwner.get();
352 if (owner == null) {
353 break;
354 }
355
356 try {
357 mFactory.recycleSocket(mFactory.createSocket());
358 owner.mFactories.add(mFactory);
359 break;
360 }
361 catch (IOException e) {
362 }
363
364 owner = null;
365
366 // Wait at 5 seconds before trying again.
367 try {
368 Thread.sleep(5000);
369 }
370 catch (InterruptedException e) {
371 break;
372 }
373 }
374 }
375 finally {
376 owner = (DistributedSocketFactory)mOwner.get();
377 if (owner != null) {
378 owner.mResurrectors.remove(mFactory);
379 }
380 }
381 }
382 }
383 }