OSDN Git Service

Replaced mongrel with thin
[redminele/redminele.git] / ruby / lib / ruby / gems / 1.8 / gems / eventmachine-0.12.10-x86-mswin32-60 / java / src / com / rubyeventmachine / EventableSocketChannel.java
diff --git a/ruby/lib/ruby/gems/1.8/gems/eventmachine-0.12.10-x86-mswin32-60/java/src/com/rubyeventmachine/EventableSocketChannel.java b/ruby/lib/ruby/gems/1.8/gems/eventmachine-0.12.10-x86-mswin32-60/java/src/com/rubyeventmachine/EventableSocketChannel.java
new file mode 100644 (file)
index 0000000..18f7504
--- /dev/null
@@ -0,0 +1,364 @@
+/**
+ * $Id$
+ * 
+ * Author:: Francis Cianfrocca (gmail: blackhedd)
+ * Homepage::  http://rubyeventmachine.com
+ * Date:: 15 Jul 2007
+ * 
+ * See EventMachine and EventMachine::Connection for documentation and
+ * usage examples.
+ * 
+ *
+ *----------------------------------------------------------------------------
+ *
+ * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+ * Gmail: blackhedd
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of either: 1) the GNU General Public License
+ * as published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version; or 2) Ruby's License.
+ * 
+ * See the file COPYING for complete licensing information.
+ *
+ *---------------------------------------------------------------------------
+ *
+ * 
+ */
+
+/**
+ * 
+ */
+package com.rubyeventmachine;
+
+/**
+ * @author francis
+ *
+ */
+
+import java.nio.channels.*;
+import java.nio.*;
+import java.util.*;
+import java.io.*;
+import java.net.Socket;
+import javax.net.ssl.*;
+import javax.net.ssl.SSLEngineResult.*;
+import java.lang.reflect.Field;
+
+import java.security.*;
+
+public class EventableSocketChannel implements EventableChannel {
+       Selector selector;
+       SelectionKey channelKey;
+       SocketChannel channel;
+
+       long binding;
+       LinkedList<ByteBuffer> outboundQ;
+
+       boolean bCloseScheduled;
+       boolean bConnectPending;
+       boolean bWatchOnly;
+       boolean bAttached;
+       boolean bNotifyReadable;
+       boolean bNotifyWritable;
+       
+       SSLEngine sslEngine;
+       SSLContext sslContext;
+
+       public EventableSocketChannel (SocketChannel sc, long _binding, Selector sel) {
+               channel = sc;
+               binding = _binding;
+               selector = sel;
+               bCloseScheduled = false;
+               bConnectPending = false;
+               bWatchOnly = false;
+               bAttached = false;
+               bNotifyReadable = false;
+               bNotifyWritable = false;
+               outboundQ = new LinkedList<ByteBuffer>();
+       }
+       
+       public long getBinding() {
+               return binding;
+       }
+
+       public SocketChannel getChannel() {
+               return channel;
+       }
+
+       public void register() throws ClosedChannelException {
+               if (channelKey == null) {
+                       int events = currentEvents();
+                       channelKey = channel.register(selector, events, this);
+               }
+       }
+
+       /**
+        * Terminate with extreme prejudice. Don't assume there will be another pass through
+        * the reactor core.
+        */
+       public void close() {
+               if (channelKey != null) {
+                       channelKey.cancel();
+                       channelKey = null;
+               }
+
+               if (bAttached) {
+                       // attached channels are copies, so reset the file descriptor to prevent java from close()ing it
+                       Field f;
+                       FileDescriptor fd;
+
+                       try {
+                               /* do _NOT_ clobber fdVal here, it will break epoll/kqueue on jdk6!
+                                * channelKey.cancel() above does not occur until the next call to select
+                                * and if fdVal is gone, we will continue to get events for this fd.
+                                *
+                                * instead, remove fdVal in cleanup(), which is processed via DetachedConnections,
+                                * after UnboundConnections but before NewConnections.
+                                */
+
+                               f = channel.getClass().getDeclaredField("fd");
+                               f.setAccessible(true);
+                               fd = (FileDescriptor) f.get(channel);
+
+                               f = fd.getClass().getDeclaredField("fd");
+                               f.setAccessible(true);
+                               f.set(fd, -1);
+                       } catch (java.lang.NoSuchFieldException e) {
+                               e.printStackTrace();
+                       } catch (java.lang.IllegalAccessException e) {
+                               e.printStackTrace();
+                       }
+
+                       return;
+               }
+
+               try {
+                       channel.close();
+               } catch (IOException e) {
+               }
+       }
+
+       public void cleanup() {
+               if (bAttached) {
+                       Field f;
+                       try {
+                               f = channel.getClass().getDeclaredField("fdVal");
+                               f.setAccessible(true);
+                               f.set(channel, -1);
+                       } catch (java.lang.NoSuchFieldException e) {
+                               e.printStackTrace();
+                       } catch (java.lang.IllegalAccessException e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               channel = null;
+       }
+       
+       public void scheduleOutboundData (ByteBuffer bb) {
+               if (!bCloseScheduled && bb.remaining() > 0) {
+                       if (sslEngine != null) {
+                               try {
+                                       ByteBuffer b = ByteBuffer.allocate(32*1024); // TODO, preallocate this buffer.
+                                       sslEngine.wrap(bb, b);
+                                       b.flip();
+                                       outboundQ.addLast(b);
+                               } catch (SSLException e) {
+                                       throw new RuntimeException ("ssl error");
+                               }
+                       }
+                       else {
+                               outboundQ.addLast(bb);
+                       }
+
+                       updateEvents();
+               }
+       }
+       
+       public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort) {
+               throw new RuntimeException ("datagram sends not supported on this channel");
+       }
+       
+       /**
+        * Called by the reactor when we have selected readable.
+        */
+       public void readInboundData (ByteBuffer bb) throws IOException {
+               if (channel.read(bb) == -1)
+                       throw new IOException ("eof");
+       }
+
+       /**
+        * Called by the reactor when we have selected writable.
+        * Return false to indicate an error that should cause the connection to close.
+        * TODO, VERY IMPORTANT: we're here because we selected writable, but it's always
+        * possible to become unwritable between the poll and when we get here. The way
+        * this code is written, we're depending on a nonblocking write NOT TO CONSUME
+        * the whole outbound buffer in this case, rather than firing an exception.
+        * We should somehow verify that this is indeed Java's defined behavior.
+        * Also TODO, see if we can use gather I/O rather than one write at a time.
+        * Ought to be a big performance enhancer.
+        * @return
+        */
+       public boolean writeOutboundData() throws IOException {
+               while (!outboundQ.isEmpty()) {
+                       ByteBuffer b = outboundQ.getFirst();
+                       if (b.remaining() > 0)
+                               channel.write(b);
+
+                       // Did we consume the whole outbound buffer? If yes,
+                       // pop it off and keep looping. If no, the outbound network
+                       // buffers are full, so break out of here.
+                       if (b.remaining() == 0)
+                               outboundQ.removeFirst();
+                       else
+                               break;
+               }
+
+               if (outboundQ.isEmpty() && !bCloseScheduled) {
+                       updateEvents();
+               }
+
+               // ALWAYS drain the outbound queue before triggering a connection close.
+               // If anyone wants to close immediately, they're responsible for clearing
+               // the outbound queue.
+               return (bCloseScheduled && outboundQ.isEmpty()) ? false : true;
+       }
+       
+       public void setConnectPending() {
+               bConnectPending = true;
+               updateEvents();
+       }
+       
+       /**
+        * Called by the reactor when we have selected connectable.
+        * Return false to indicate an error that should cause the connection to close.
+        */
+       public boolean finishConnecting() throws IOException {
+               channel.finishConnect();
+
+               bConnectPending = false;
+               updateEvents();
+               return true;
+       }
+       
+       public boolean scheduleClose (boolean afterWriting) {
+               // TODO: What the hell happens here if bConnectPending is set?
+               if (!afterWriting)
+                       outboundQ.clear();
+
+               if (outboundQ.isEmpty())
+                       return true;
+               else {
+                       updateEvents();
+                       bCloseScheduled = true;
+                       return false;
+               }
+       }
+
+       public void startTls() {
+               if (sslEngine == null) {
+                       try {
+                               sslContext = SSLContext.getInstance("TLS");
+                               sslContext.init(null, null, null); // TODO, fill in the parameters.
+                               sslEngine = sslContext.createSSLEngine(); // TODO, should use the parameterized version, to get Kerb stuff and session re-use.
+                               sslEngine.setUseClientMode(false);
+                       } catch (NoSuchAlgorithmException e) {
+                               throw new RuntimeException ("unable to start TLS"); // TODO, get rid of this.                           
+                       } catch (KeyManagementException e) {
+                               throw new RuntimeException ("unable to start TLS"); // TODO, get rid of this.                           
+                       }
+               }
+               System.out.println ("Starting TLS");
+       }
+       
+       public ByteBuffer dispatchInboundData (ByteBuffer bb) throws SSLException {
+               if (sslEngine != null) {
+                       if (true) throw new RuntimeException ("TLS currently unimplemented");
+                       System.setProperty("javax.net.debug", "all");
+                       ByteBuffer w = ByteBuffer.allocate(32*1024); // TODO, WRONG, preallocate this buffer.
+                       SSLEngineResult res = sslEngine.unwrap(bb, w);
+                       if (res.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                               Runnable r;
+                               while ((r = sslEngine.getDelegatedTask()) != null) {
+                                       r.run();
+                               }
+                       }
+                       System.out.println (bb);
+                       w.flip();
+                       return w;
+               }
+               else
+                       return bb;
+       }
+
+       public void setCommInactivityTimeout (long seconds) {
+               // TODO
+               System.out.println ("SOCKET: SET COMM INACTIVITY UNIMPLEMENTED " + seconds);
+       }
+
+       public Object[] getPeerName () {
+               Socket sock = channel.socket();
+               return new Object[]{ sock.getPort(), sock.getInetAddress().getHostAddress() };
+       }
+
+       public void setWatchOnly() {
+               bWatchOnly = true;
+               updateEvents();
+       }
+       public boolean isWatchOnly() { return bWatchOnly; }
+
+       public void setAttached() {
+               bAttached = true;
+       }
+       public boolean isAttached() { return bAttached; }
+
+       public void setNotifyReadable (boolean mode) {
+               bNotifyReadable = mode;
+               updateEvents();
+       }
+       public boolean isNotifyReadable() { return bNotifyReadable; }
+
+       public void setNotifyWritable (boolean mode) {
+               bNotifyWritable = mode;
+               updateEvents();
+       }
+       public boolean isNotifyWritable() { return bNotifyWritable; }
+
+       private void updateEvents() {
+               if (channelKey == null)
+                       return;
+
+               int events = currentEvents();
+
+               if (channelKey.interestOps() != events) {
+                       channelKey.interestOps(events);
+               }
+       }
+
+       private int currentEvents() {
+               int events = 0;
+
+               if (bWatchOnly)
+               {
+                       if (bNotifyReadable)
+                               events |= SelectionKey.OP_READ;
+
+                       if (bNotifyWritable)
+                               events |= SelectionKey.OP_WRITE;
+               }
+               else
+               {
+                       if (bConnectPending)
+                               events |= SelectionKey.OP_CONNECT;
+                       else {
+                               events |= SelectionKey.OP_READ;
+
+                               if (!outboundQ.isEmpty())
+                                       events |= SelectionKey.OP_WRITE;
+                       }
+               }
+
+               return events;
+       }
+}