5 from wire import decode_varint, encode
6 from reader import BytesBuffer
7 from msg import RequestDecoder, message_types
9 # hold the asyncronous state of a connection
10 # ie. we may not get enough bytes on one read to decode the message
14 def __init__(self, fd, app):
17 self.recBuf = BytesBuffer(bytearray())
18 self.resBuf = BytesBuffer(bytearray())
20 self.decoder = RequestDecoder(self.recBuf)
21 self.inProgress = False # are we in the middle of a message
24 data = this.fd.recv(1024)
25 if not data: # what about len(data) == 0
26 raise IOError("dead connection")
27 this.recBuf.write(data)
29 # ABCI server responds to messges by calling methods on the app
33 def __init__(self, app, port=5410):
35 # map conn file descriptors to (app, reqBuf, resBuf, msgDecoder)
39 self.listen_backlog = 10
41 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
42 self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
43 self.listener.setblocking(0)
44 self.listener.bind(('', port))
46 self.listener.listen(self.listen_backlog)
50 self.read_list = [self.listener]
53 def handle_new_connection(self, r):
54 new_fd, new_addr = r.accept()
55 new_fd.setblocking(0) # non-blocking
56 self.read_list.append(new_fd)
57 self.write_list.append(new_fd)
58 print 'new connection to', new_addr
60 self.appMap[new_fd] = Connection(new_fd, self.app)
62 def handle_conn_closed(self, r):
63 self.read_list.remove(r)
64 self.write_list.remove(r)
66 print "connection closed"
68 def handle_recv(self, r):
69 # app, recBuf, resBuf, conn
74 # check if we need more data first
76 if (conn.msgLength == 0 or conn.recBuf.size() < conn.msgLength):
79 if conn.recBuf.size() == 0:
82 conn.inProgress = True
84 # see if we have enough to get the message length
85 if conn.msgLength == 0:
86 ll = conn.recBuf.peek()
87 if conn.recBuf.size() < 1 + ll:
88 # we don't have enough bytes to read the length yet
90 print "decoding msg length"
91 conn.msgLength = decode_varint(conn.recBuf)
93 # see if we have enough to decode the message
94 if conn.recBuf.size() < conn.msgLength:
97 # now we can decode the message
99 # first read the request type and get the particular msg
101 typeByte = conn.recBuf.read(1)
102 typeByte = int(typeByte[0])
103 resTypeByte = typeByte + 0x10
104 req_type = message_types[typeByte]
106 if req_type == "flush":
107 # messages are length prefixed
108 conn.resBuf.write(encode(1))
109 conn.resBuf.write([resTypeByte])
110 conn.fd.send(str(conn.resBuf.buf))
112 conn.inProgress = False
113 conn.resBuf = BytesBuffer(bytearray())
116 decoder = getattr(conn.decoder, req_type)
118 print "decoding args"
120 print "got args", req_args
122 # done decoding message
124 conn.inProgress = False
126 req_f = getattr(conn.app, req_type)
129 elif isinstance(req_args, tuple):
130 res = req_f(*req_args)
132 res = req_f(req_args)
134 if isinstance(res, tuple):
140 print "called", req_type, "ret code:", ret_code
142 print "non-zero retcode:", ret_code
144 if req_type in ("echo", "info"): # these dont return a ret code
146 # messages are length prefixed
147 conn.resBuf.write(encode(len(enc) + 1))
148 conn.resBuf.write([resTypeByte])
149 conn.resBuf.write(enc)
151 enc, encRet = encode(res), encode(ret_code)
152 # messages are length prefixed
153 conn.resBuf.write(encode(len(enc) + len(encRet) + 1))
154 conn.resBuf.write([resTypeByte])
155 conn.resBuf.write(encRet)
156 conn.resBuf.write(enc)
157 except TypeError as e:
158 print "TypeError on reading from connection:", e
159 self.handle_conn_closed(r)
161 except ValueError as e:
162 print "ValueError on reading from connection:", e
163 self.handle_conn_closed(r)
166 print "IOError on reading from connection:", e
167 self.handle_conn_closed(r)
169 except Exception as e:
170 # sys.exc_info()[0] # TODO better
171 print "error reading from connection", str(e)
172 self.handle_conn_closed(r)
176 while not self.shutdown:
177 r_list, w_list, _ = select.select(
178 self.read_list, self.write_list, [], 2.5)
181 if (r == self.listener):
183 self.handle_new_connection(r)
184 # undo adding to read list ...
185 except NameError as e:
186 print "Could not connect due to NameError:", e
187 except TypeError as e:
188 print "Could not connect due to TypeError:", e
190 print "Could not connect due to unexpected error:", sys.exc_info()[0]
194 def handle_shutdown(self):
195 for r in self.read_list:
197 for w in self.write_list:
200 except Exception as e:
201 print(e) # TODO: add logging