OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / abci / example / python / abci / server.py
1 import socket
2 import select
3 import sys
4
5 from wire import decode_varint, encode
6 from reader import BytesBuffer
7 from msg import RequestDecoder, message_types
8
9 # hold the asynchronous state of a connection
10 # i.e. we may not get enough bytes on one read to decode the message
11
class Connection(object):
    """Per-socket state for one client connection.

    A single read may not deliver a whole message, so the receive buffer,
    the length of the message currently being decoded and an "in progress"
    flag are kept here between reads.
    """

    def __init__(self, fd, app):
        """Set up empty buffers and a request decoder for socket ``fd``.

        Args:
            fd: the connected socket object (``recv`` is called on it).
            app: the application object whose methods serve the requests.
        """
        self.fd = fd
        self.app = app
        # bytes received but not yet decoded
        self.recBuf = BytesBuffer(bytearray())
        # encoded responses waiting to be sent
        self.resBuf = BytesBuffer(bytearray())
        # length of the message being read; 0 means "not known yet"
        self.msgLength = 0
        # the decoder reads straight out of the receive buffer
        self.decoder = RequestDecoder(self.recBuf)
        self.inProgress = False  # are we in the middle of a message

    def recv(self):
        """Read up to 1024 bytes from the socket into the receive buffer.

        Raises:
            IOError: if the read returns no data, i.e. the peer closed
                the connection.
        """
        data = self.fd.recv(1024)
        # an empty result (length 0) is how a closed peer shows up
        if not data:
            raise IOError("dead connection")
        self.recBuf.write(data)
28
29 # ABCI server responds to messages by calling methods on the app
30
class ABCIServer(object):
    """Select-based TCP server that answers ABCI requests via an app object.

    Each accepted socket gets a ``Connection`` holding its buffers. Requests
    are length-prefixed; the request type names the method that is looked up
    on the app and called with the decoded arguments. Responses accumulate
    in the connection's response buffer and are sent when a "flush" request
    arrives.
    """

    def __init__(self, app, port=5410):
        """Create the non-blocking listening socket.

        Args:
            app: object whose methods are called for decoded requests.
            port: TCP port to listen on, on all interfaces.
        """
        self.app = app
        # map each connection socket to its Connection state object
        self.appMap = {}

        self.port = port
        self.listen_backlog = 10

        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listener.setblocking(0)
        self.listener.bind(('', port))

        self.listener.listen(self.listen_backlog)

        self.shutdown = False

        self.read_list = [self.listener]
        self.write_list = []

    def handle_new_connection(self, r):
        """Accept a pending connection on listener ``r`` and register it.

        The socket is only added to the bookkeeping lists once its
        ``Connection`` has been built, so a failure cannot leave a
        half-registered socket behind; the socket is closed and the error
        re-raised instead.
        """
        new_fd, new_addr = r.accept()
        try:
            new_fd.setblocking(0)  # non-blocking
            conn = Connection(new_fd, self.app)
        except Exception:
            new_fd.close()
            raise

        self.appMap[new_fd] = conn
        self.read_list.append(new_fd)
        self.write_list.append(new_fd)
        print("new connection to %s" % (new_addr,))

    def handle_conn_closed(self, r):
        """Forget socket ``r`` everywhere and close it."""
        if r in self.read_list:
            self.read_list.remove(r)
        if r in self.write_list:
            self.write_list.remove(r)
        # drop the per-connection state as well, otherwise it leaks
        self.appMap.pop(r, None)
        r.close()
        print("connection closed")

    @staticmethod
    def _recv_more(conn):
        """Read more bytes into ``conn``; return False if none are ready.

        The sockets are non-blocking, so "nothing to read right now" is
        reported as EAGAIN/EWOULDBLOCK. That is not a failure: the caller
        should simply wait for the next select() wake-up. Any other error
        (including the "dead connection" IOError) is re-raised.
        """
        import errno

        try:
            conn.recv()
        except IOError as e:
            if getattr(e, "errno", None) in (errno.EAGAIN, errno.EWOULDBLOCK):
                return False
            raise
        return True

    def handle_recv(self, r):
        """Read from socket ``r`` and process as many requests as possible.

        Returns when more bytes are needed, after a "flush" request has been
        answered, or after an error (in which case the connection is
        closed).
        """
        conn = self.appMap[r]
        while True:
            try:
                print("recv loop")
                # check if we need more data first
                if conn.inProgress:
                    need_more = (conn.msgLength == 0 or
                                 conn.recBuf.size() < conn.msgLength)
                else:
                    need_more = conn.recBuf.size() == 0
                if need_more and not self._recv_more(conn):
                    # nothing available yet; keep the connection open
                    return

                conn.inProgress = True

                # see if we have enough to get the message length
                if conn.msgLength == 0:
                    ll = conn.recBuf.peek()
                    if conn.recBuf.size() < 1 + ll:
                        # we don't have enough bytes to read the length yet
                        return
                    print("decoding msg length")
                    conn.msgLength = decode_varint(conn.recBuf)

                # see if we have enough to decode the message
                if conn.recBuf.size() < conn.msgLength:
                    return

                # now we can decode the message

                # first read the request type and get the particular msg
                # decoder
                typeByte = conn.recBuf.read(1)
                typeByte = int(typeByte[0])
                resTypeByte = typeByte + 0x10
                req_type = message_types[typeByte]

                if req_type == "flush":
                    # messages are length prefixed
                    conn.resBuf.write(encode(1))
                    conn.resBuf.write([resTypeByte])
                    # sendall: a plain send() may write only part of the
                    # buffer and silently drop the rest of the responses
                    conn.fd.sendall(bytes(conn.resBuf.buf))
                    conn.msgLength = 0
                    conn.inProgress = False
                    conn.resBuf = BytesBuffer(bytearray())
                    return

                decoder = getattr(conn.decoder, req_type)

                print("decoding args")
                req_args = decoder()
                print("got args %s" % (req_args,))

                # done decoding message
                conn.msgLength = 0
                conn.inProgress = False

                req_f = getattr(conn.app, req_type)
                if req_args is None:
                    res = req_f()
                elif isinstance(req_args, tuple):
                    res = req_f(*req_args)
                else:
                    res = req_f(req_args)

                # the app returns either (result, ret_code) or just ret_code
                if isinstance(res, tuple):
                    res, ret_code = res
                else:
                    ret_code = res
                    res = None

                print("called %s ret code: %s" % (req_type, ret_code))
                if ret_code != 0:
                    print("non-zero retcode: %s" % (ret_code,))

                if req_type in ("echo", "info"):  # these dont return a ret code
                    enc = encode(res)
                    # messages are length prefixed
                    conn.resBuf.write(encode(len(enc) + 1))
                    conn.resBuf.write([resTypeByte])
                    conn.resBuf.write(enc)
                else:
                    enc, encRet = encode(res), encode(ret_code)
                    # messages are length prefixed
                    conn.resBuf.write(encode(len(enc) + len(encRet) + 1))
                    conn.resBuf.write([resTypeByte])
                    conn.resBuf.write(encRet)
                    conn.resBuf.write(enc)
            except TypeError as e:
                print("TypeError on reading from connection: %s" % (e,))
                self.handle_conn_closed(r)
                return
            except ValueError as e:
                print("ValueError on reading from connection: %s" % (e,))
                self.handle_conn_closed(r)
                return
            except IOError as e:
                print("IOError on reading from connection: %s" % (e,))
                self.handle_conn_closed(r)
                return
            except Exception as e:
                # sys.exc_info()[0] # TODO better
                print("error reading from connection %s" % (str(e),))
                self.handle_conn_closed(r)
                return

    def main_loop(self):
        """Serve connections until ``self.shutdown`` becomes true."""
        while not self.shutdown:
            # Only wait for readability. Nothing here reacts to writable
            # sockets, and a connected socket is almost always writable, so
            # selecting on write_list would make select() return at once
            # and turn this loop into a busy spin.
            r_list, _, _ = select.select(self.read_list, [], [], 2.5)

            for r in r_list:
                if r == self.listener:
                    try:
                        self.handle_new_connection(r)
                    except NameError as e:
                        print("Could not connect due to NameError: %s" % (e,))
                    except TypeError as e:
                        print("Could not connect due to TypeError: %s" % (e,))
                    except Exception:
                        # not a bare except: KeyboardInterrupt and
                        # SystemExit must still be able to stop the server
                        print("Could not connect due to unexpected error: %s"
                              % (sys.exc_info()[0],))
                else:
                    self.handle_recv(r)

    def handle_shutdown(self):
        """Stop the main loop and close every socket the server holds."""
        # set the flag first so the loop stops even if a close() fails
        self.shutdown = True
        for r in self.read_list:
            r.close()
        for w in self.write_list:
            try:
                w.close()
            except Exception as e:
                print(e)  # TODO: add logging