OSDN Git Service

Add LTE location information to DATA_REGISTRATION_STATE
[android-x86/hardware-ril.git] / mock-ril / src / python / tcs.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2010 The Android Open Source Project
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License")
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #      http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 """Test server
19
20 A detailed description of tms.
21 """
22
23 __author__ = 'wink@google.com (Wink Saville)'
24
25 import socket
26 import struct
27 import sys
28 import time
29
30 import ctrl_pb2
31 import ril_pb2
32 import msgheader_pb2
33
34 def recvall(s, count):
35   """Receive all of the data otherwise return none.
36
37   Args:
38     s: socket
39     count: number of bytes
40
41   Returns:
42     data received
43     None if no data is received
44   """
45   all_data = []
46   while (count > 0):
47     data = s.recv(count)
48     if (len(data) == 0):
49       return None
50     count -= len(data)
51     all_data.append(data)
52   result = ''.join(all_data);
53   return result
54
55 def sendall(s, data):
56   """Send all of the data
57
58   Args:
59     s: socket
60     count: number of bytes
61
62   Returns:
63     Nothing
64   """
65   s.sendall(data)
66
67 class MsgHeader:
68   """A fixed length message header; cmd, token, status and length of protobuf."""
69   def __init__(self):
70     self.cmd = 0
71     self.token = 0
72     self.status = 0
73     self.length_protobuf = 0
74
75   def sendHeader(self, s):
76     """Send the header to the socket
77
78     Args:
79       s: socket
80
81     Returns
82       nothing
83     """
84     mh = msgheader_pb2.MsgHeader()
85     mh.cmd = self.cmd
86     mh.token = self.token
87     mh.status = self.status
88     mh.length_data = self.length_protobuf
89     mhser = mh.SerializeToString()
90     len_msg_header_raw = struct.pack('<i', len(mhser))
91     sendall(s, len_msg_header_raw)
92     sendall(s, mhser)
93
94   def recvHeader(self, s):
95     """Receive the header from the socket"""
96     len_msg_header_raw = recvall(s, 4)
97     len_msg_hdr = struct.unpack('<i', len_msg_header_raw)
98     mh = msgheader_pb2.MsgHeader()
99     mh_raw = recvall(s, len_msg_hdr[0])
100     mh.ParseFromString(mh_raw)
101     self.cmd = mh.cmd
102     self.token = mh.token
103     self.status = mh.status
104     self.length_protobuf = mh.length_data;
105
106 class Msg:
107   """A message consists of a fixed length MsgHeader followed by a protobuf.
108
109   This class sends and receives messages, when sending the status
110   will be zero and when receiving the protobuf field will be an
111   empty string if there was no protobuf.
112   """
113   def __init__(self):
114     """Initialize the protobuf to None and header to an empty"""
115     self.protobuf = None
116     self.header = MsgHeader()
117
118     # Keep a local copy of header for convenience
119     self.cmd = 0
120     self.token = 0
121     self.status = 0
122
123   def sendMsg(self, s, cmd, token, protobuf=''):
124     """Send a message to a socket
125
126     Args:
127       s: socket
128       cmd: command to send
129       token: token to send, will be returned unmodified
130       protobuf: optional protobuf to send
131
132     Returns
133       nothing
134     """
135     self.cmd = cmd
136     self.token = token
137     self.status = 0
138     self.protobuf = protobuf
139
140     self.header.cmd = self.cmd
141     self.header.token = self.token
142     self.header.status = self.status
143     if (len(protobuf) > 0):
144       self.header.length_protobuf = len(protobuf)
145     else:
146       self.header.length_protobuf = 0
147       self.protobuf = ''
148     self.header.sendHeader(s)
149     if (self.header.length_protobuf > 0):
150       sendall(s, self.protobuf)
151
152   def recvMsg(self, s):
153     """Receive a message from a socket
154
155     Args:
156       s: socket
157
158     Returns:
159       nothing
160     """
161     self.header.recvHeader(s)
162     self.cmd = self.header.cmd
163     self.token = self.header.token
164     self.status = self.header.status
165     if (self.header.length_protobuf > 0):
166       self.protobuf = recvall(s, self.header.length_protobuf)
167     else:
168       self.protobuf = ''
169
170 def main(argv):
171   """Create a socket and connect.
172
173   Before using you'll need to forward the port
174   used by mock_ril, 54312 to a port on the PC.
175   The following worked for me:
176
177     adb forward tcp:11111 tcp:54312.
178
179   Then you can execute this test using:
180
181     tms.py 127.0.0.1 11111
182   """
183   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
184   host = sys.argv[1]        # server address
185   print "host=%s" % host
186   port = int(sys.argv[2])   # server port
187   print "port=%d" % port
188   s.connect((host, port))
189
190   # Create an object which is serializable to a protobuf
191   rrs = ctrl_pb2.CtrlRspRadioState()
192   rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE
193   print "rrs.state=%d" % (rrs.state)
194
195   # Serialize
196   rrs_ser = rrs.SerializeToString()
197   print "len(rrs_ser)=%d" % (len(rrs_ser))
198
199   # Deserialize
200   rrs_new = ctrl_pb2.CtrlRspRadioState()
201   rrs_new.ParseFromString(rrs_ser)
202   print "rrs_new.state=%d" % (rrs_new.state)
203
204
205   # Do an echo test
206   req = Msg()
207   req.sendMsg(s, 0, 1234567890123, rrs_ser)
208   resp = Msg()
209   resp.recvMsg(s)
210   response = ctrl_pb2.CtrlRspRadioState()
211   response.ParseFromString(resp.protobuf)
212
213   print "cmd=%d" % (resp.cmd)
214   print "token=%d" % (resp.token)
215   print "status=%d" % (resp.status)
216   print "len(protobuf)=%d" % (len(resp.protobuf))
217   print "response.state=%d" % (response.state)
218   if ((resp.cmd == 0) & (resp.token == 1234567890123) &
219         (resp.status == 0) & (response.state == 1)):
220     print "SUCCESS: echo ok"
221   else:
222     print "ERROR: expecting cmd=0 token=1234567890123 status=0 state=1"
223
224   # Test CTRL_GET_RADIO_STATE
225   req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4)
226   resp = Msg()
227   resp.recvMsg(s)
228
229   print "cmd=%d" % (resp.cmd)
230   print "token=%d" % (resp.token)
231   print "status=%d" % (resp.status)
232   print "length_protobuf=%d" % (len(resp.protobuf))
233
234   if (resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE):
235       response = ctrl_pb2.CtrlRspRadioState()
236       response.ParseFromString(resp.protobuf)
237       print "SUCCESS: response.state=%d" % (response.state)
238   else:
239       print "ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE"
240
241   # Close socket
242   print "closing socket"
243   s.close()
244
245
246 if __name__ == '__main__':
247   main(sys.argv)