OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / abci / example / python3 / abci / server.py
1 import socket
2 import select
3 import sys
4 import logging
5
6 from .wire import decode_varint, encode
7 from .reader import BytesBuffer
8 from .msg import RequestDecoder, message_types
9
10 # hold the asynchronous state of a connection
11 # i.e. we may not get enough bytes on one read to decode the message
12
13 logger = logging.getLogger(__name__)
14
class Connection:
    """Per-client state for one accepted socket.

    A single read may not deliver a whole message, so the partially
    received bytes and the decoding progress are kept here between reads.
    """

    def __init__(self, fd, app):
        """Set up empty buffers and a request decoder for socket *fd*.

        fd:  the connected client socket.
        app: the application object whose methods serve the requests.
        """
        self.fd = fd
        self.app = app
        self.recBuf = BytesBuffer(bytearray())  # bytes received, not yet decoded
        self.resBuf = BytesBuffer(bytearray())  # encoded responses awaiting a flush
        self.msgLength = 0  # length of the message being decoded; 0 = not yet known
        self.decoder = RequestDecoder(self.recBuf)
        self.inProgress = False  # are we in the middle of a message

    def recv(self):
        """Read up to 1024 bytes from the socket into ``recBuf``.

        Raises IOError("dead connection") when the socket returns no data,
        which is how a closed peer shows up on a stream socket.  Any error
        raised by the socket itself propagates unchanged.
        """
        data = self.fd.recv(1024)
        if not data:  # also covers len(data) == 0
            raise IOError("dead connection")
        self.recBuf.write(data)
31
32 # ABCI server responds to messages by calling methods on the app
33
class ABCIServer:
    """Single-threaded, select()-based socket server for an ABCI app.

    Each request is a length-prefixed message whose first byte is the
    request type.  The matching method on ``app`` is called with the decoded
    arguments and the encoded response is appended to the connection's
    response buffer; the buffer is written to the socket when a "flush"
    request arrives.
    """

    def __init__(self, app, port=5410):
        """Bind a non-blocking listening socket on *port* (all interfaces).

        app:  object exposing one method per request type.
        port: TCP port to listen on.
        """
        self.app = app
        # map each accepted client socket to its Connection state
        self.appMap = {}

        self.port = port
        self.listen_backlog = 10

        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listener.setblocking(0)
        self.listener.bind(('', port))

        self.listener.listen(self.listen_backlog)

        self.shutdown = False

        self.read_list = [self.listener]
        self.write_list = []

    def handle_new_connection(self, r):
        """Accept a client on listening socket *r* and start tracking it.

        The Connection is built before the socket is registered anywhere, so
        a failure cannot leave a socket in the select lists without an
        ``appMap`` entry; on failure the socket is closed and the error is
        re-raised for the caller to report.
        """
        new_fd, new_addr = r.accept()
        try:
            new_fd.setblocking(0)  # non-blocking
            conn = Connection(new_fd, self.app)
        except Exception:
            new_fd.close()
            raise

        self.read_list.append(new_fd)
        self.write_list.append(new_fd)
        self.appMap[new_fd] = conn
        print('new connection to', new_addr)

    def handle_conn_closed(self, r):
        """Stop tracking socket *r*, drop its Connection state and close it."""
        if r in self.read_list:
            self.read_list.remove(r)
        if r in self.write_list:
            self.write_list.remove(r)
        # forget the per-connection state too, otherwise it is never released
        self.appMap.pop(r, None)
        r.close()
        print("connection closed")

    def handle_recv(self, r):
        """Read from socket *r* and process every complete request available.

        Returns when more bytes are needed, when the socket has nothing more
        to read right now, or after a flush has been answered.  Any other
        error closes the connection.
        """
        conn = self.appMap[r]
        while True:
            try:
                print("recv loop")
                # check if we need more data first
                if conn.inProgress:
                    need_data = (conn.msgLength == 0
                                 or conn.recBuf.size() < conn.msgLength)
                else:
                    need_data = conn.recBuf.size() == 0

                if need_data:
                    try:
                        conn.recv()
                    except BlockingIOError:
                        # The socket is non-blocking and simply has nothing
                        # pending; that is not a dead peer.  Keep the state
                        # as it is and wait for select() to report more data.
                        return

                conn.inProgress = True

                # see if we have enough to get the message length
                if conn.msgLength == 0:
                    ll = conn.recBuf.peek()
                    if conn.recBuf.size() < 1 + ll:
                        # we don't have enough bytes to read the length yet
                        return
                    print("decoding msg length")
                    conn.msgLength = decode_varint(conn.recBuf)

                # see if we have enough to decode the message
                if conn.recBuf.size() < conn.msgLength:
                    return

                # now we can decode the message

                # first read the request type and get the particular msg
                # decoder
                typeByte = conn.recBuf.read(1)
                typeByte = int(typeByte[0])
                # the response type is the request type offset by 0x10
                resTypeByte = typeByte + 0x10
                req_type = message_types[typeByte]

                if req_type == "flush":
                    # messages are length prefixed
                    conn.resBuf.write(encode(1))
                    conn.resBuf.write([resTypeByte])
                    # sendall: a short write must not silently drop the tail
                    # of the buffered responses; a failure raises and the
                    # connection is closed by the handlers below
                    conn.fd.sendall(conn.resBuf.buf)
                    conn.msgLength = 0
                    conn.inProgress = False
                    conn.resBuf = BytesBuffer(bytearray())
                    return

                decoder = getattr(conn.decoder, req_type)

                print("decoding args")
                req_args = decoder()
                print("got args", req_args)

                # done decoding message
                conn.msgLength = 0
                conn.inProgress = False

                # call the app method named after the request type
                req_f = getattr(conn.app, req_type)
                if req_args is None:
                    res = req_f()
                elif isinstance(req_args, tuple):
                    res = req_f(*req_args)
                else:
                    res = req_f(req_args)

                # the app returns either (result, ret_code) or a bare ret_code
                if isinstance(res, tuple):
                    res, ret_code = res
                else:
                    ret_code = res
                    res = None

                print("called", req_type, "ret code:", ret_code, 'res:', res)
                if ret_code != 0:
                    print("non-zero retcode:", ret_code)

                if req_type in ("echo", "info"):  # these dont return a ret code
                    enc = encode(res)
                    # messages are length prefixed
                    conn.resBuf.write(encode(len(enc) + 1))
                    conn.resBuf.write([resTypeByte])
                    conn.resBuf.write(enc)
                else:
                    enc, encRet = encode(res), encode(ret_code)
                    # messages are length prefixed
                    conn.resBuf.write(encode(len(enc) + len(encRet) + 1))
                    conn.resBuf.write([resTypeByte])
                    conn.resBuf.write(encRet)
                    conn.resBuf.write(enc)
            except IOError as e:
                print("IOError on reading from connection:", e)
                self.handle_conn_closed(r)
                return
            except Exception:
                logger.exception("error reading from connection")
                self.handle_conn_closed(r)
                return

    def main_loop(self):
        """Serve connections until ``shutdown`` is set.

        Only readability is waited on: responses are written synchronously
        from ``handle_recv``, and connected sockets are almost always
        writable, so also selecting on ``write_list`` would make select()
        return immediately and turn this loop into a busy spin.
        """
        while not self.shutdown:
            r_list, _, _ = select.select(self.read_list, [], [], 2.5)

            for r in r_list:
                if r is self.listener:
                    try:
                        self.handle_new_connection(r)
                    except NameError as e:
                        print("Could not connect due to NameError:", e)
                    except TypeError as e:
                        print("Could not connect due to TypeError:", e)
                    except Exception:
                        # Exception, not a bare except, so KeyboardInterrupt
                        # and SystemExit can still stop the server
                        print("Could not connect due to unexpected error:", sys.exc_info()[0])
                else:
                    self.handle_recv(r)

    def handle_shutdown(self):
        """Ask ``main_loop`` to stop and close every tracked socket.

        The flag is set first so the loop exits instead of selecting on
        sockets that have just been closed, and so that a failing close
        cannot prevent the shutdown from being recorded.
        """
        self.shutdown = True
        for sock in self.read_list + self.write_list:
            try:
                sock.close()
            except Exception as e:
                print(e)  # TODO: add logging