OSDN Git Service

WebSocket support 1t phase
authorISHIKAWA Mutsumi <ishikawa@hanzubon.jp>
Thu, 25 Nov 2010 13:32:22 +0000 (22:32 +0900)
committerISHIKAWA Mutsumi <ishikawa@hanzubon.jp>
Thu, 25 Nov 2010 13:32:22 +0000 (22:32 +0900)
data/public/webkit.js
data/templates/webkit/root_home.html
keitairc
lib/Keitairc/IrcBuffer.pm
lib/POE/Filter/HTTPD/Keitairc.pm [new file with mode: 0644]

index ebe9c93..f4c1407 100644 (file)
@@ -50,6 +50,7 @@ jQuery(document).ready(function($) {
 
     var map_load = true;
     var myScroll;
+    var ws = false;
 
     $(window).bind('orientationchange', function(){
        adjust_height();
@@ -72,6 +73,26 @@ jQuery(document).ready(function($) {
        scrollTo(0, 0);
        $('#loading').css('display', 'block').height($(document).height());
        $('body').append($('<div id="'+url.replace('/','_')+'"></div>').attr('animation', anim).load(web_root + session_id + '/' + url, param, reinit));
+
+        if (url.match(/all\/[0-9]+/) && !ws) {
+           ws = new WebSocket('ws://' + location.host + '/' + session_id + '/push');
+           ws.addEventListener("open",
+                               function () {
+                                   alert('open');
+                               },false);
+           ws.addEventListener("close",
+                               function () {
+                                   alert('close');
+                               },false);
+           ws.addEventListener("message",
+                               function (e) {
+                                   alert(e.data);
+                               },false);
+       } else {
+           ws.close();
+           ws = false;
+       }
+
        return false;
     }
 
index 24c3f28..9ea3072 100644 (file)
@@ -14,6 +14,7 @@
     var session_id = '<!-- tmpl_var sid -->';
   </script>
   <script src="<!-- tmpl_var root -->jquery-1.4.2.min.js" type="application/javascript" charset="utf-8"></script>
+  <script src="<!-- tmpl_var root -->jquery.websocket-0.0.1.js" type="application/javascript" charset="utf-8"></script>
   <script src="<!-- tmpl_var root -->iscroll/iscroll.js" type="application/javascript" charset="utf-8"></script>
   <script src="<!-- tmpl_var root -->webkit.js" type="application/javascript" charset="utf-8"></script>
   <script src="http://maps.google.com/maps/api/js?sensor=true&v=2" type="application/javascript" charset="utf-8"></script>
index fc7afd4..f6adbe1 100755 (executable)
--- a/keitairc
+++ b/keitairc
 
 use Encode;
 use POE;
-use POE::Filter::HTTPD;
+use POE::Filter::HTTPD::Keitairc;
 use POE::Component::IRC;
 use POE::Component::Server::TCP;
 use URI::Escape;
 use HTML::Template;
 use HTTP::Response;
 use HTTP::Status;
+use Digest::MD5 qw(md5);
 
 use FindBin;
 use lib ("$FindBin::Bin/lib", '/usr/share/keitairc/lib');
@@ -105,7 +106,7 @@ POE::Session->create(
 POE::Component::Server::TCP->new(
        Alias => 'keitairc',
        Port => $cf->web_listen_port(),
-       ClientFilter => 'POE::Filter::HTTPD',
+       ClientFilter => 'POE::Filter::HTTPD::Keitairc',
        ClientInput => \&http_request);
 
 # fire up main loop
@@ -123,9 +124,11 @@ sub http_request{
        if($request->isa('HTTP::Response')){
                $heap->{client}->put($request);
                $log->log_error($request->as_string());
-       }elsif(my $response = dispatch($request)){
-               $heap->{client}->put($response);
-               $log->log_access($heap->{'remote_ip'}, $request, $response);
+       }elsif(my $response = dispatch($request, $heap)){
+               if ($response ne '__STREAMING__') {
+                       $heap->{client}->put($response);
+                       $log->log_access($heap->{'remote_ip'}, $request, $response);
+               }
        }
 
        $kernel->yield('shutdown');
@@ -133,7 +136,7 @@ sub http_request{
 
 ################################################################
 sub dispatch{
-       my $request = shift;
+       my ($request, $heap) = @_;
        my $uri = $request->uri();
        my $ci = new Keitairc::ClientInfo($request);
 
@@ -161,6 +164,17 @@ sub dispatch{
                return action_login_imodeid($request);
        }
 
+       # FIXME:: ishikawa
+       # plugin にすべきだけど とりあえず
+       if($uri =~ m|^/(S[a-zA-Z]{10})/push$|) {
+               warn '__STREAMING__';
+               if (1 || $sm->verify({session_id => $1, user_agent => $ci->user_agent()})) {
+                       return action_streaming($request, $2, $heap);
+               } else {
+                       return action_401($request);
+               }
+       }
+
        for my $name ($pl->list_action_plugins()){
                if($uri =~ m|^/(S[a-zA-Z]{10})/$name/(.*)| ||
                   $uri =~ m|^/(S[a-zA-Z]{10})/$name$|){
@@ -404,6 +418,45 @@ sub action_root{
                        });
 }
 
+sub parse_websocket_key {
+       my ($key) = @_;
+
+       my $digits = join('', $key =~ m/\d/g);
+       my $spaces = scalar @{[$key =~ m/ /g]};
+
+       return $digits / $spaces;
+}
+
+sub action_streaming {
+       my $request = shift;
+       my $cid = shift;
+       my $heap = shift;
+       my $ci = new Keitairc::ClientInfo($request);
+       warn $request->dump;
+       #warn $request->content;
+       #warn $request->header('Sec-WebSocket-Key1');
+       #warn $request->header('Sec-WebSocket-Key2');
+
+       my $base_string = pack("NN", parse_websocket_key($request->header('Sec-WebSocket-Key1')), parse_websocket_key($request->header('Sec-WebSocket-Key2'))) . $request->content;
+       my $sig = md5($base_string);
+
+       my $response = HTTP::Response->new(101, 'WebSocket Protocol Handshake');
+       $response->push_header('Upgrade', 'WebSocket');
+       $response->push_header('Connection', 'Upgrade');
+       $response->push_header('Sec-WebSocket-Origin', 'http://' . $cf->web_host . ':' . $cf->web_port);
+       $response->push_header('Sec-WebSocket-Location', 'ws://' . $cf->web_host . ':' . $cf->web_port . $cf->web_root . $ci->{cookie}->{sid} . '/push');
+       $response->content($sig);
+       warn $response->as_string;
+       $heap->{client}->put($response);
+       $heap->{client}->flush();
+
+       $heap->{client}->set_output_filter(POE::Filter::Stream->new());
+       $ib->add_stream($heap->{client});
+
+       return '__STREAMING__';
+}
+
+
 ################################################################
 sub action_redirect_root{
        my $request = shift;
index 41594d6..98e1930 100644 (file)
@@ -5,6 +5,7 @@
 # This program is covered by the GNU General Public License 2
 
 package Keitairc::IrcBuffer;
+use JSON;
 use strict;
 use warnings;
 
@@ -195,6 +196,7 @@ sub buffer_ptr{
 # 引数の $msg の charset は perl internal
 sub add_message{
        my($me, $cid, $message, $who) = @_;
+       my $now = time;
 
        unless(defined $me->{tbuffer}->{$cid}){
                $me->{tbuffer}->{$cid} = [];
@@ -209,7 +211,7 @@ sub add_message{
                $me->{rbuffer}->{$cid} = [];
        }
 
-       push @{$me->{tbuffer}->{$cid}}, time;
+       push @{$me->{tbuffer}->{$cid}}, $now;
        push @{$me->{nbuffer}->{$cid}}, $who;
        push @{$me->{mbuffer}->{$cid}}, $message;
        push @{$me->{rbuffer}->{$cid}}, 0;
@@ -230,8 +232,10 @@ sub add_message{
        if($me->{cid2name}->{$cid} eq '*console*') {
                $me->{mtime}->{$cid} = -1;
        } else {
-               $me->{mtime}->{$cid} = time;
+               $me->{mtime}->{$cid} = $now;
        }
+
+       $me->update_stream($cid, $now, $message, $who);
 }
 
 ################################################################
@@ -337,4 +341,55 @@ sub update_timestamp{
        return 0;
 }
 
+################################################################
+sub add_stream {
+       my($me, $client) = @_;
+       warn 'add stream ' . $client->ID();
+       $me->{stream}->{$client->ID()} = {'client' => $client};
+}
+
+sub remove_stream {
+       my($me) = @_;
+       warn 'remove stream';
+       delete $me->{stream};
+}
+
+sub update_stream {
+       my($me, $cid, $time, $message, $who) = @_;
+       $message = $me->colorize($me->simple_escape($message));
+       for my $name ($::pl->list_replace_plugins()) {
+               last if $message =~ s/$::pl->{plugins}->{$name}->{message_replace_regexp}/$::pl->{plugins}->{$name}->{message_replace_imprementation}(undef, undef, $1, $2, $3, $4, $5, $6, $7, $8, $9)/eg;
+       }
+       $message =~ s/\s+$//;
+       $message =~ s/\s+/ /g;
+
+       my $value = encode_json({cid => $cid,
+                                time => $time,
+                                message => $message,
+                                who => $who});
+       $me->send_stream($value);
+}
+
+sub send_stream {
+       my($me, $value) = @_;
+
+       my $stream = $me->{stream};
+       return if(keys %$stream < 1);
+
+       warn 'send stream value = ' . $value;
+
+       foreach my $k (sort keys %$stream) {
+               my $st = $me->{stream}->{$k};
+               my $client = $st->{client};
+               warn $client;
+               warn $value;
+               if ($value) {
+                       $client->put("\x00");
+                       $client->put($value);
+                       $client->put("\xFF");
+               }
+               $client->flush();
+       }
+}
+
 1;
diff --git a/lib/POE/Filter/HTTPD/Keitairc.pm b/lib/POE/Filter/HTTPD/Keitairc.pm
new file mode 100644 (file)
index 0000000..449a06d
--- /dev/null
@@ -0,0 +1,51 @@
+# -*- mode: perl; coding: utf-8 -*-
+# POE::Filter::HTTPD::Keitairc
+#
+# Copyright (c) 2010 ISHIKAWA Mutsumi <ishikawa@hanzubon.jp>
+# This program is covered by the GNU General Public License 2
+
+package POE::Filter::HTTPD::Keitairc;
+use POE::Filter::HTTPD;
+use HTTP::Response;
+
+use strict;
+use warnings;
+
+use base qw(POE::Filter::HTTPD);
+
+sub STRM() { 6 } # raw data buffer to build requests
+
+sub new {
+       my $class = shift;
+       my $me = POE::Filter::HTTPD->new;
+       $me->[STRM] = '';
+
+       return bless $me, $class;
+}
+
+sub get_one_start {
+       my ($me, $stream) = @_;
+       $me->SUPER::get_one_start($stream);
+       $me->[STRM] .= join('', @$stream);
+}
+
+sub get_one {
+       my ($me) = @_;
+
+       my $req = $me->SUPER::get_one();
+
+       if(defined $req->[0]
+          && $req->[0]->header('Upgrade')
+          && $req->[0]->header('Upgrade') eq 'WebSocket'
+          && $req->[0]->header('Connection')
+          && $req->[0]->header('Connection') eq 'Upgrade') {
+               warn $me->[STRM];
+               if ($me->[STRM] =~ /\r\n\r\n(.{8})$/) {
+                       warn $1;
+                       $req->[0]->content($1);
+               }
+       }
+       return $req;
+}
+
+1;