3 # Copyright (C) 2010 The Android Open Source Project
5 # Licensed under the Apache License, Version 2.0 (the "License")
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
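20 Command-line test client that connects to mock_ril over TCP, sends control messages (a MsgHeader plus an optional protobuf) and prints the responses.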
23 __author__ = 'wink@google.com (Wink Saville)'
def recvall(s, count):
    """Receive all of the data, otherwise return None.

    Keeps calling s.recv() until count bytes have been collected, because a
    single recv() may return fewer bytes than requested.

    Args:
        s: socket
        count: number of bytes

    Returns:
        data received
        None if the socket stops delivering data before count bytes arrive
    """
    all_data = []
    remaining = count
    while remaining > 0:
        data = s.recv(remaining)
        if not data:
            # An empty read means the peer closed the connection.
            return None
        all_data.append(data)
        remaining -= len(data)
    # b'' is the same object type as '' on Python 2 and keeps the join valid
    # for the bytes chunks that sockets return on Python 3.
    return b''.join(all_data)
56 """Send all of the data
60 count: number of bytes
68 """A length-prefixed message header: cmd, token, status and length of protobuf."""
73 self.length_protobuf = 0
def sendHeader(self, s):
    """Send the header to the socket

    The header fields are copied into a msgheader_pb2.MsgHeader, which is
    serialized and written as a 4-byte little-endian length followed by the
    serialized bytes. This is the framing recvHeader reads back.

    Args:
        s: socket

    Returns:
        Nothing
    """
    mh = msgheader_pb2.MsgHeader()
    mh.cmd = self.cmd
    mh.token = self.token
    mh.status = self.status
    mh.length_data = self.length_protobuf
    mhser = mh.SerializeToString()
    # Length prefix goes first so the receiver knows how many header bytes
    # to read before the optional protobuf payload.
    len_msg_header_raw = struct.pack('<i', len(mhser))
    sendall(s, len_msg_header_raw)
    sendall(s, mhser)
def recvHeader(self, s):
    """Receive the header from the socket

    Reads a 4-byte little-endian length, then that many bytes of serialized
    msgheader_pb2.MsgHeader, and copies cmd, token, status and length_data
    onto this object.

    Args:
        s: socket

    Raises:
        IOError: recvall returned None before the whole header was read
    """
    len_msg_header_raw = recvall(s, 4)
    if len_msg_header_raw is None:
        # Fail with a clear message instead of passing None to struct.unpack.
        raise IOError('connection closed while reading message header length')
    (len_msg_hdr,) = struct.unpack('<i', len_msg_header_raw)
    mh_raw = recvall(s, len_msg_hdr)
    if mh_raw is None:
        raise IOError('connection closed while reading message header')
    mh = msgheader_pb2.MsgHeader()
    mh.ParseFromString(mh_raw)
    self.cmd = mh.cmd
    self.token = mh.token
    self.status = mh.status
    self.length_protobuf = mh.length_data
107 """A message consists of a length-prefixed MsgHeader followed by an optional protobuf.
109 This class sends and receives messages. When sending, the status
110 will be zero; when receiving, the protobuf field will be an
111 empty string if there was no protobuf.
114 """Initialize the protobuf to None and the header to an empty MsgHeader."""
116 self.header = MsgHeader()
118 # Keep a local copy of header for convenience
def sendMsg(self, s, cmd, token, protobuf=''):
    """Send a message to a socket

    Records the outgoing values on this object, mirrors them into the
    header, sends the header and then the protobuf if there is one.

    Args:
        s: socket
        cmd: command to send
        token: token to send, will be returned unmodified
        protobuf: optional protobuf to send

    Returns:
        Nothing
    """
    self.cmd = cmd
    self.token = token
    # The status is always zero on messages we send.
    self.status = 0
    self.protobuf = protobuf

    self.header.cmd = self.cmd
    self.header.token = self.token
    self.header.status = self.status
    # len() is already 0 for an empty protobuf, so no branch is needed.
    self.header.length_protobuf = len(protobuf)

    self.header.sendHeader(s)
    if self.header.length_protobuf > 0:
        sendall(s, self.protobuf)
def recvMsg(self, s):
    """Receive a message from a socket

    Reads the header, copies cmd, token and status from it, then reads the
    protobuf if the header announces one.

    Args:
        s: socket

    Returns:
        Nothing; the result is left in self.cmd, self.token, self.status
        and self.protobuf ('' when the message carried no protobuf)

    Raises:
        IOError: recvall returned None before the whole protobuf was read
    """
    self.header.recvHeader(s)
    self.cmd = self.header.cmd
    self.token = self.header.token
    self.status = self.header.status
    if self.header.length_protobuf > 0:
        data = recvall(s, self.header.length_protobuf)
        if data is None:
            # A truncated payload is an error, not an empty protobuf.
            raise IOError('connection closed while reading message protobuf')
        self.protobuf = data
    else:
        # Reset so a previous message's payload is never reported again.
        self.protobuf = ''
171 """Create a socket and connect.
173 Before using you'll need to forward the port
174 used by mock_ril, 54312 to a port on the PC.
175 The following worked for me:
177 adb forward tcp:11111 tcp:54312.
179 Then you can execute this test using:
181 tms.py 127.0.0.1 11111
183 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
184 host = sys.argv[1] # server address
185 print "host=%s" % host
186 port = int(sys.argv[2]) # server port
187 print "port=%d" % port
188 s.connect((host, port))
190 # Create an object which is serializable to a protobuf
191 rrs = ctrl_pb2.CtrlRspRadioState()
192 rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE
193 print "rrs.state=%d" % (rrs.state)
196 rrs_ser = rrs.SerializeToString()
197 print "len(rrs_ser)=%d" % (len(rrs_ser))
200 rrs_new = ctrl_pb2.CtrlRspRadioState()
201 rrs_new.ParseFromString(rrs_ser)
202 print "rrs_new.state=%d" % (rrs_new.state)
207 req.sendMsg(s, 0, 1234567890123, rrs_ser)
210 response = ctrl_pb2.CtrlRspRadioState()
211 response.ParseFromString(resp.protobuf)
213 print "cmd=%d" % (resp.cmd)
214 print "token=%d" % (resp.token)
215 print "status=%d" % (resp.status)
216 print "len(protobuf)=%d" % (len(resp.protobuf))
217 print "response.state=%d" % (response.state)
218 if ((resp.cmd == 0) & (resp.token == 1234567890123) &
219 (resp.status == 0) & (response.state == 1)):
220 print "SUCCESS: echo ok"
222 print "ERROR: expecting cmd=0 token=1234567890123 status=0 state=1"
224 # Test CTRL_GET_RADIO_STATE
225 req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4)
229 print "cmd=%d" % (resp.cmd)
230 print "token=%d" % (resp.token)
231 print "status=%d" % (resp.status)
232 print "length_protobuf=%d" % (len(resp.protobuf))
234 if (resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE):
235 response = ctrl_pb2.CtrlRspRadioState()
236 response.ParseFromString(resp.protobuf)
237 print "SUCCESS: response.state=%d" % (response.state)
239 print "ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE"
242 print "closing socket"
246 if __name__ == '__main__':