OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / abci / example / python3 / abci / wire.py
1
2 # the decoder works off a reader
3 # the encoder returns bytearray
4
5
6 def hex2bytes(h):
7     return bytearray(h.decode('hex'))
8
9
10 def bytes2hex(b):
11     if type(b) in (str, str):
12         return "".join([hex(ord(c))[2:].zfill(2) for c in b])
13     else:
14         return bytes2hex(b.decode())
15
16
17 # expects uvarint64 (no crazy big nums!)
18 def uvarint_size(i):
19     if i == 0:
20         return 0
21     for j in range(1, 8):
22         if i < 1 << j * 8:
23             return j
24     return 8
25
26 # expects i < 2**size
27
28
29 def encode_big_endian(i, size):
30     if size == 0:
31         return bytearray()
32     return encode_big_endian(i // 256, size - 1) + bytearray([i % 256])
33
34
35 def decode_big_endian(reader, size):
36     if size == 0:
37         return 0
38     firstByte = reader.read(1)[0]
39     return firstByte * (256 ** (size - 1)) + decode_big_endian(reader, size - 1)
40
41 # ints are max 16 bytes long
42
43
44 def encode_varint(i):
45     negate = False
46     if i < 0:
47         negate = True
48         i = -i
49     size = uvarint_size(i)
50     if size == 0:
51         return bytearray([0])
52     big_end = encode_big_endian(i, size)
53     if negate:
54         size += 0xF0
55     return bytearray([size]) + big_end
56
57 # returns the int and whats left of the byte array
58
59
60 def decode_varint(reader):
61     size = reader.read(1)[0]
62     if size == 0:
63         return 0
64
65     negate = True if size > int(0xF0) else False
66     if negate:
67         size = size - 0xF0
68     i = decode_big_endian(reader, size)
69     if negate:
70         i = i * (-1)
71     return i
72
73
74 def encode_string(s):
75     size = encode_varint(len(s))
76     return size + bytearray(s, 'utf8')
77
78
79 def decode_string(reader):
80     length = decode_varint(reader)
81     raw_data = reader.read(length)
82     return raw_data.decode()
83
84
85 def encode_list(s):
86     b = bytearray()
87     list(map(b.extend, list(map(encode, s))))
88     return encode_varint(len(s)) + b
89
90
91 def encode(s):
92     print('encoding', repr(s))
93     if s is None:
94         return bytearray()
95     if isinstance(s, int):
96         return encode_varint(s)
97     elif isinstance(s, str):
98         return encode_string(s)
99     elif isinstance(s, list):
100         return encode_list(s)
101     elif isinstance(s, bytearray):
102         return encode_string(s)
103     else:
104         print("UNSUPPORTED TYPE!", type(s), s)
105
106
107 if __name__ == '__main__':
108     ns = [100, 100, 1000, 256]
109     ss = [2, 5, 5, 2]
110     bs = list(map(encode_big_endian, ns, ss))
111     ds = list(map(decode_big_endian, bs, ss))
112     print(ns)
113     print([i[0] for i in ds])
114
115     ss = ["abc", "hi there jim", "ok now what"]
116     e = list(map(encode_string, ss))
117     d = list(map(decode_string, e))
118     print(ss)
119     print([i[0] for i in d])