3 # Author:: Francis Cianfrocca (gmail: blackhedd)
\r
4 # Homepage:: http://rubyeventmachine.com
\r
5 # Date:: 15 November 2006
\r
7 # See EventMachine and EventMachine::Connection for documentation and
\r
10 #----------------------------------------------------------------------------
\r
12 # Copyright (C) 2006-08 by Francis Cianfrocca. All Rights Reserved.
\r
15 # This program is free software; you can redistribute it and/or modify
\r
16 # it under the terms of either: 1) the GNU General Public License
\r
17 # as published by the Free Software Foundation; either version 2 of the
\r
18 # License, or (at your option) any later version; or 2) Ruby's License.
\r
20 # See the file COPYING for complete licensing information.
\r
22 #---------------------------------------------------------------------------
\r
28 require 'postgres-pr/message'
\r
29 require 'postgres-pr/connection'
\r
32 class StringIO # :nodoc:
\r
33 # Reads exactly +n+ bytes.
\r
35 # If the data read is nil an EOFError is raised.
\r
37 # If the data read is too short a TruncatedDataError is raised and the read
\r
38 # data is obtainable via its #data method.
\r
42 raise EOFError, "End of file reached"
\r
45 raise TruncatedDataError.new("data truncated", str)
\r
49 alias read_exactly_n_bytes readbytes
\r
55 # PROVISIONAL IMPLEMENTATION of an evented Postgres client.
\r
56 # This implements version 3 of the Postgres wire protocol, which will work
\r
57 # with any Postgres version from roughly 7.4 onward.
\r
59 # Objective: we want to access Postgres databases without requiring threads.
\r
60 # Until now this has been a problem because the Postgres client implementations
\r
61 # have all made use of blocking I/O calls, which is incompatible with a
\r
62 # thread-free evented model.
\r
64 # But rather than re-implement the Postgres Wire3 protocol, we're taking advantage
\r
65 # of the existing postgres-pr library, which was originally written by Michael
\r
66 # Neumann but (at this writing) appears to be no longer maintained. Still, it's
\r
67 # in basically a production-ready state, and the wire protocol isn't that complicated
\r
70 # We need to monkeypatch StringIO because it lacks the #readbytes method needed
\r
73 # We're tucking in a bunch of require statements that may not be present in garden-variety
\r
74 # EM installations. Until we find a good way to only require these if a program
\r
75 # requires postgres, this file will need to be required explicitly.
\r
77 # The StringIO monkeypatch is lifted verbatim from the standard library readbytes.rb,
\r
78 # which adds method #readbytes directly to class IO. But StringIO is not a subclass of IO.
\r
80 # We cloned the handling of postgres messages from lib/postgres-pr/connection.rb
\r
81 # in the postgres-pr library, and modified it for event-handling.
\r
83 # TODO: The password handling in dispatch_conn_message is totally incomplete.
\r
86 # We return Deferrables from the user-level operations surfaced by this interface.
\r
87 # Experimentally, we're using the pattern of always returning a boolean value as the
\r
88 # first argument of a deferrable callback to indicate success or failure. This is
\r
89 # instead of the traditional pattern of calling Deferrable#succeed or #fail, and
\r
90 # requiring the user to define both a callback and an errback function.
\r
94 # db = EM.connect_unix_domain( "/tmp/.s.PGSQL.5432", EM::P::Postgres3 )
\r
95 # db.connect( dbname, username, psw ).callback do |status|
\r
97 # db.query( "select * from some_table" ).callback do |status, result, errors|
\r
99 # result.rows.each do |row|
\r
107 class Postgres3 < EventMachine::Connection
\r
115 def connect db, user, psw=nil
\r
116 d = EM::DefaultDeferrable.new
\r
119 if @pending_query || @pending_conn
\r
120 d.succeed false, "Operation already in progress"
\r
123 prms = {"user"=>user, "database"=>db}
\r
127 #prms["password"] = psw
\r
129 send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump
\r
136 d = EM::DefaultDeferrable.new
\r
139 if @pending_query || @pending_conn
\r
140 d.succeed false, "Operation already in progress"
\r
142 @r = PostgresPR::Connection::Result.new
\r
145 send_data PostgresPR::Query.dump(sql)
\r
152 def receive_data data
\r
154 while @data.length >= 5
\r
155 pktlen = @data[1...5].unpack("N").first
\r
156 if @data.length >= (1 + pktlen)
\r
157 pkt = @data.slice!(0...(1+pktlen))
\r
158 m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) }
\r
160 dispatch_conn_message m
\r
161 elsif @pending_query
\r
162 dispatch_query_message m
\r
164 raise "Unexpected message from database"
\r
167 break # very important, break out of the while
\r
174 if o = (@pending_query || @pending_conn)
\r
175 o.succeed false, "lost connection"
\r
179 # Cloned and modified from the postgres-pr.
\r
180 def dispatch_conn_message msg
\r
182 when AuthentificationClearTextPassword
\r
183 raise ArgumentError, "no password specified" if @password.nil?
\r
184 send_data PasswordMessage.new(@password).dump
\r
186 when AuthentificationCryptPassword
\r
187 raise ArgumentError, "no password specified" if @password.nil?
\r
188 send_data PasswordMessage.new(@password.crypt(msg.salt)).dump
\r
190 when AuthentificationMD5Password
\r
191 raise ArgumentError, "no password specified" if @password.nil?
\r
192 require 'digest/md5'
\r
194 m = Digest::MD5.hexdigest(@password + @user)
\r
195 m = Digest::MD5.hexdigest(m + msg.salt)
\r
197 send_data PasswordMessage.new(m).dump
\r
199 when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential
\r
200 raise "unsupported authentification"
\r
202 when AuthentificationOk
\r
204 raise msg.field_values.join("\t")
\r
205 when NoticeResponse
\r
206 @notice_processor.call(msg) if @notice_processor
\r
207 when ParameterStatus
\r
208 @params[msg.key] = msg.value
\r
209 when BackendKeyData
\r
213 # TODO: use transaction status
\r
214 pc,@pending_conn = @pending_conn,nil
\r
217 raise "unhandled message type"
\r
221 # Cloned and modified from the postgres-pr.
\r
222 def dispatch_query_message msg
\r
225 @r.rows << msg.columns
\r
226 when CommandComplete
\r
227 @r.cmd_tag = msg.cmd_tag
\r
229 pq,@pending_query = @pending_query,nil
\r
230 pq.succeed true, @r, @e
\r
231 when RowDescription
\r
232 @r.fields = msg.fields
\r
233 when CopyInResponse
\r
234 when CopyOutResponse
\r
235 when EmptyQueryResponse
\r
239 when NoticeResponse
\r
240 @notice_processor.call(msg) if @notice_processor
\r