6 from .wire import decode_varint, encode
7 from .reader import BytesBuffer
8 from .msg import RequestDecoder, message_types
10 # hold the asynchronous state of a connection
11 # i.e. we may not get enough bytes on one read to decode the message
13 logger = logging.getLogger(__name__)
# Initialise the per-connection state: a receive buffer, a response buffer,
# a request decoder bound to the receive buffer, and the in-progress flag.
# NOTE(review): other code in this file reads `fd`, `app` and `msgLength`
# from connection objects; their initialisation is not visible in this
# excerpt -- confirm they are set here.
17 def __init__(self, fd, app):
# Both buffers start out wrapping an empty bytearray.
20 self.recBuf = BytesBuffer(bytearray())
21 self.resBuf = BytesBuffer(bytearray())
# The decoder is handed recBuf, so it consumes from the receive buffer.
23 self.decoder = RequestDecoder(self.recBuf)
24 self.inProgress = False # are we in the middle of a message
27 data = this.fd.recv(1024)
28 if not data: # what about len(data) == 0
29 raise IOError("dead connection")
30 this.recBuf.write(data)
32 # ABCI server responds to messages by calling methods on the app
# Create the listening socket for the server and seed the read list with it.
#   app  -- application object (its use is not visible in this excerpt)
#   port -- TCP port to listen on (default 5410)
# NOTE(review): `self.app`, `self.appMap`, `self.write_list` and
# `self.shutdown` are used by other methods; their initialisation is not
# visible here -- confirm.
36 def __init__(self, app, port=5410):
38 # map conn file descriptors to (app, reqBuf, resBuf, msgDecoder)
# Backlog passed to listen() below.
42 self.listen_backlog = 10
# IPv4 TCP socket; SO_REUSEADDR is enabled and the socket is non-blocking.
44 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45 self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
46 self.listener.setblocking(0)
# An empty host string binds to all local interfaces.
47 self.listener.bind(('', port))
49 self.listener.listen(self.listen_backlog)
# The listener itself is the first socket watched for readability.
53 self.read_list = [self.listener]
# Accept a pending connection on socket `r` and register it with the server.
#   r -- a socket on which accept() can be called
56 def handle_new_connection(self, r):
57 new_fd, new_addr = r.accept()
58 new_fd.setblocking(0) # non-blocking
# The accepted socket is tracked in both the read list and the write list.
59 self.read_list.append(new_fd)
60 self.write_list.append(new_fd)
61 print('new connection to', new_addr)
# Per-connection state is stored in appMap, keyed by the accepted socket.
63 self.appMap[new_fd] = Connection(new_fd, self.app)
# Stop tracking socket `r`: remove it from the read and write lists.
# list.remove raises ValueError if `r` is not present in a list.
# NOTE(review): closing the socket and dropping its appMap entry are not
# visible in this excerpt -- confirm whether they happen here.
65 def handle_conn_closed(self, r):
66 self.read_list.remove(r)
67 self.write_list.remove(r)
69 print("connection closed")
# Handle readable data on connection socket `r`: work out the length of the
# next message, decode one request, call the same-named method on the app and
# write the length-prefixed response into the connection's response buffer.
# Errors while handling end with handle_conn_closed(r).
# NOTE(review): several control-flow lines (`try:`, `return`, `else:`, the
# lookup of `conn`, the call that produces `req_args`, the unpacking of
# `res`) are not visible in this excerpt; `conn` is presumably the
# per-connection state stored for `r` -- confirm against the full file.
71 def handle_recv(self, r):
72 # app, recBuf, resBuf, conn
77 # check if we need more data first
79 if (conn.msgLength == 0 or conn.recBuf.size() < conn.msgLength):
82 if conn.recBuf.size() == 0:
85 conn.inProgress = True
87 # see if we have enough to get the message length
88 if conn.msgLength == 0:
# peek() yields the first byte without consuming it; it is used below as the
# number of bytes that must follow it before the length can be decoded.
89 ll = conn.recBuf.peek()
90 if conn.recBuf.size() < 1 + ll:
91 # we don't have enough bytes to read the length yet
93 print("decoding msg length")
94 conn.msgLength = decode_varint(conn.recBuf)
96 # see if we have enough to decode the message
97 if conn.recBuf.size() < conn.msgLength:
100 # now we can decode the message
102 # first read the request type and get the particular msg
104 typeByte = conn.recBuf.read(1)
105 typeByte = int(typeByte[0])
# The response type byte is the request type byte plus 0x10.
106 resTypeByte = typeByte + 0x10
# message_types maps the request type byte to the request's name.
107 req_type = message_types[typeByte]
# "flush": write a response made of just the length prefix and type byte,
# send the response buffer on the socket, then start a fresh buffer.
109 if req_type == "flush":
110 # messages are length prefixed
111 conn.resBuf.write(encode(1))
112 conn.resBuf.write([resTypeByte])
113 conn.fd.send(conn.resBuf.buf)
115 conn.inProgress = False
116 conn.resBuf = BytesBuffer(bytearray())
# The argument decoder is the decoder attribute named after the request type.
119 decoder = getattr(conn.decoder, req_type)
121 print("decoding args")
123 print("got args", req_args)
125 # done decoding message
127 conn.inProgress = False
# The handler is the app attribute named after the request type.
129 req_f = getattr(conn.app, req_type)
# Tuple arguments are splatted; anything else is passed as a single argument.
132 elif isinstance(req_args, tuple):
133 res = req_f(*req_args)
135 res = req_f(req_args)
137 if isinstance(res, tuple):
143 print("called", req_type, "ret code:", ret_code, 'res:', res)
145 print("non-zero retcode:", ret_code)
147 if req_type in ("echo", "info"): # these don't return a ret code
149 # messages are length prefixed
150 conn.resBuf.write(encode(len(enc) + 1))
151 conn.resBuf.write([resTypeByte])
152 conn.resBuf.write(enc)
# Layout written here: length prefix, type byte, encoded return code, then
# the encoded result.
154 enc, encRet = encode(res), encode(ret_code)
155 # messages are length prefixed
156 conn.resBuf.write(encode(len(enc) + len(encRet) + 1))
157 conn.resBuf.write([resTypeByte])
158 conn.resBuf.write(encRet)
159 conn.resBuf.write(enc)
# Both error paths below stop tracking the connection.
161 print("IOError on reading from connection:", e)
162 self.handle_conn_closed(r)
164 except Exception as e:
165 logger.exception("error reading from connection")
166 self.handle_conn_closed(r)
170 while not self.shutdown:
171 r_list, w_list, _ = select.select(
172 self.read_list, self.write_list, [], 2.5)
175 if (r == self.listener):
177 self.handle_new_connection(r)
178 # undo adding to read list ...
179 except NameError as e:
180 print("Could not connect due to NameError:", e)
181 except TypeError as e:
182 print("Could not connect due to TypeError:", e)
184 print("Could not connect due to unexpected error:", sys.exc_info()[0])
# Shutdown hook: iterates over every socket in read_list and write_list.
# NOTE(review): the loop bodies and the matching `try:` are not visible in
# this excerpt -- presumably each socket is closed; confirm.
188 def handle_shutdown(self):
189 for r in self.read_list:
191 for w in self.write_list:
# Any exception caught here is only printed, not re-raised.
194 except Exception as e:
195 print(e) # TODO: add logging