From: ISHIKAWA Mutsumi Date: Thu, 25 Nov 2010 13:32:22 +0000 (+0900) Subject: WebSocket support 1t phase X-Git-Url: http://git.osdn.net/view?p=keitairc%2Fkeitairc.git;a=commitdiff_plain;h=f11accb5cc0f02fba786aecaf40fd7fc9030486f WebSocket support 1t phase --- diff --git a/data/public/webkit.js b/data/public/webkit.js index ebe9c93..f4c1407 100644 --- a/data/public/webkit.js +++ b/data/public/webkit.js @@ -50,6 +50,7 @@ jQuery(document).ready(function($) { var map_load = true; var myScroll; + var ws = false; $(window).bind('orientationchange', function(){ adjust_height(); @@ -72,6 +73,26 @@ jQuery(document).ready(function($) { scrollTo(0, 0); $('#loading').css('display', 'block').height($(document).height()); $('body').append($('
').attr('animation', anim).load(web_root + session_id + '/' + url, param, reinit)); + + if (url.match(/all\/[0-9]+/) && !ws) { + ws = new WebSocket('ws://' + location.host + '/' + session_id + '/push'); + ws.addEventListener("open", + function () { + alert('open'); + },false); + ws.addEventListener("close", + function () { + alert('close'); + },false); + ws.addEventListener("message", + function (e) { + alert(e.data); + },false); + } else { + ws.close(); + ws = false; + } + return false; } diff --git a/data/templates/webkit/root_home.html b/data/templates/webkit/root_home.html index 24c3f28..9ea3072 100644 --- a/data/templates/webkit/root_home.html +++ b/data/templates/webkit/root_home.html @@ -14,6 +14,7 @@ var session_id = ''; + diff --git a/keitairc b/keitairc index fc7afd4..f6adbe1 100755 --- a/keitairc +++ b/keitairc @@ -14,13 +14,14 @@ use Encode; use POE; -use POE::Filter::HTTPD; +use POE::Filter::HTTPD::Keitairc; use POE::Component::IRC; use POE::Component::Server::TCP; use URI::Escape; use HTML::Template; use HTTP::Response; use HTTP::Status; +use Digest::MD5 qw(md5); use FindBin; use lib ("$FindBin::Bin/lib", '/usr/share/keitairc/lib'); @@ -105,7 +106,7 @@ POE::Session->create( POE::Component::Server::TCP->new( Alias => 'keitairc', Port => $cf->web_listen_port(), - ClientFilter => 'POE::Filter::HTTPD', + ClientFilter => 'POE::Filter::HTTPD::Keitairc', ClientInput => \&http_request); # fire up main loop @@ -123,9 +124,11 @@ sub http_request{ if($request->isa('HTTP::Response')){ $heap->{client}->put($request); $log->log_error($request->as_string()); - }elsif(my $response = dispatch($request)){ - $heap->{client}->put($response); - $log->log_access($heap->{'remote_ip'}, $request, $response); + }elsif(my $response = dispatch($request, $heap)){ + if ($response ne '__STREAMING__') { + $heap->{client}->put($response); + $log->log_access($heap->{'remote_ip'}, $request, $response); + } } $kernel->yield('shutdown'); @@ -133,7 +136,7 @@ sub http_request{ ################################################################ sub dispatch{ - my $request = shift; + my ($request, $heap) = @_; my $uri = $request->uri(); my $ci = new Keitairc::ClientInfo($request); @@ -161,6 +164,17 @@ sub dispatch{ return action_login_imodeid($request); } + # FIXME:: ishikawa + # plugin にすべきだけど とりあえず + if($uri =~ m|^/(S[a-zA-Z]{10})/push$|) { + warn '__STREAMING__'; + if (1 || $sm->verify({session_id => $1, user_agent => $ci->user_agent()})) { + return action_streaming($request, $2, $heap); + } else { + return action_401($request); + } + } + for my $name ($pl->list_action_plugins()){ if($uri =~ m|^/(S[a-zA-Z]{10})/$name/(.*)| || $uri =~ m|^/(S[a-zA-Z]{10})/$name$|){ @@ -404,6 +418,45 @@ sub action_root{ }); } +sub parse_websocket_key { + my ($key) = @_; + + my $digits = join('', $key =~ m/\d/g); + my $spaces = scalar @{[$key =~ m/ /g]}; + + return $digits / $spaces; +} + +sub action_streaming { + my $request = shift; + my $cid = shift; + my $heap = shift; + my $ci = new Keitairc::ClientInfo($request); + warn $request->dump; + #warn $request->content; + #warn $request->header('Sec-WebSocket-Key1'); + #warn $request->header('Sec-WebSocket-Key2'); + + my $base_string = pack("NN", parse_websocket_key($request->header('Sec-WebSocket-Key1')), parse_websocket_key($request->header('Sec-WebSocket-Key2'))) . $request->content; + my $sig = md5($base_string); + + my $response = HTTP::Response->new(101, 'WebSocket Protocol Handshake'); + $response->push_header('Upgrade', 'WebSocket'); + $response->push_header('Connection', 'Upgrade'); + $response->push_header('Sec-WebSocket-Origin', 'http://' . $cf->web_host . ':' . $cf->web_port); + $response->push_header('Sec-WebSocket-Location', 'ws://' . $cf->web_host . ':' . $cf->web_port . $cf->web_root . $ci->{cookie}->{sid} . '/push'); + $response->content($sig); + warn $response->as_string; + $heap->{client}->put($response); + $heap->{client}->flush(); + + $heap->{client}->set_output_filter(POE::Filter::Stream->new()); + $ib->add_stream($heap->{client}); + + return '__STREAMING__'; +} + + ################################################################ sub action_redirect_root{ my $request = shift; diff --git a/lib/Keitairc/IrcBuffer.pm b/lib/Keitairc/IrcBuffer.pm index 41594d6..98e1930 100644 --- a/lib/Keitairc/IrcBuffer.pm +++ b/lib/Keitairc/IrcBuffer.pm @@ -5,6 +5,7 @@ # This program is covered by the GNU General Public License 2 package Keitairc::IrcBuffer; +use JSON; use strict; use warnings; @@ -195,6 +196,7 @@ sub buffer_ptr{ # 引数の $msg の charset は perl internal sub add_message{ my($me, $cid, $message, $who) = @_; + my $now = time; unless(defined $me->{tbuffer}->{$cid}){ $me->{tbuffer}->{$cid} = []; @@ -209,7 +211,7 @@ sub add_message{ $me->{rbuffer}->{$cid} = []; } - push @{$me->{tbuffer}->{$cid}}, time; + push @{$me->{tbuffer}->{$cid}}, $now; push @{$me->{nbuffer}->{$cid}}, $who; push @{$me->{mbuffer}->{$cid}}, $message; push @{$me->{rbuffer}->{$cid}}, 0; @@ -230,8 +232,10 @@ sub add_message{ if($me->{cid2name}->{$cid} eq '*console*') { $me->{mtime}->{$cid} = -1; } else { - $me->{mtime}->{$cid} = time; + $me->{mtime}->{$cid} = $now; } + + $me->update_stream($cid, $now, $message, $who); } ################################################################ @@ -337,4 +341,55 @@ sub update_timestamp{ return 0; } +################################################################ +sub add_stream { + my($me, $client) = @_; + warn 'add stream ' . $client->ID(); + $me->{stream}->{$client->ID()} = {'client' => $client}; +} + +sub remove_stream { + my($me) = @_; + warn 'remove stream'; + delete $me->{stream}; +} + +sub update_stream { + my($me, $cid, $time, $message, $who) = @_; + $message = $me->colorize($me->simple_escape($message)); + for my $name ($::pl->list_replace_plugins()) { + last if $message =~ s/$::pl->{plugins}->{$name}->{message_replace_regexp}/$::pl->{plugins}->{$name}->{message_replace_imprementation}(undef, undef, $1, $2, $3, $4, $5, $6, $7, $8, $9)/eg; + } + $message =~ s/\s+$//; + $message =~ s/\s+/ /g; + + my $value = encode_json({cid => $cid, + time => $time, + message => $message, + who => $who}); + $me->send_stream($value); +} + +sub send_stream { + my($me, $value) = @_; + + my $stream = $me->{stream}; + return if(keys %$stream < 1); + + warn 'send stream value = ' . $value; + + foreach my $k (sort keys %$stream) { + my $st = $me->{stream}->{$k}; + my $client = $st->{client}; + warn $client; + warn $value; + if ($value) { + $client->put("\x00"); + $client->put($value); + $client->put("\xFF"); + } + $client->flush(); + } +} + 1; diff --git a/lib/POE/Filter/HTTPD/Keitairc.pm b/lib/POE/Filter/HTTPD/Keitairc.pm new file mode 100644 index 0000000..449a06d --- /dev/null +++ b/lib/POE/Filter/HTTPD/Keitairc.pm @@ -0,0 +1,51 @@ +# -*- mode: perl; coding: utf-8 -*- +# POE::Filter::HTTPD::Keitairc +# +# Copyright (c) 2010 ISHIKAWA Mutsumi +# This program is covered by the GNU General Public License 2 + +package POE::Filter::HTTPD::Keitairc; +use POE::Filter::HTTPD; +use HTTP::Response; + +use strict; +use warnings; + +use base qw(POE::Filter::HTTPD); + +sub STRM() { 6 } # raw data buffer to build requests + +sub new { + my $class = shift; + my $me = POE::Filter::HTTPD->new; + $me->[STRM] = ''; + + return bless $me, $class; +} + +sub get_one_start { + my ($me, $stream) = @_; + $me->SUPER::get_one_start($stream); + $me->[STRM] .= join('', @$stream); +} + +sub get_one { + my ($me) = @_; + + my $req = $me->SUPER::get_one(); + + if(defined $req->[0] + && $req->[0]->header('Upgrade') + && $req->[0]->header('Upgrade') eq 'WebSocket' + && $req->[0]->header('Connection') + && $req->[0]->header('Connection') eq 'Upgrade') { + warn $me->[STRM]; + if ($me->[STRM] =~ /\r\n\r\n(.{8})$/) { + warn $1; + $req->[0]->content($1); + } + } + return $req; +} + +1;