OSDN Git Service

am ab7f3975: am 58cd4204: am c7d19e4d: Adding 64 bit emmc_rand_perf
[android-x86/system-extras.git] / tests / net_test / net_test.py
1 #!/usr/bin/python
2 #
3 # Copyright 2014 The Android Open Source Project
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import fcntl
18 import os
19 import random
20 from socket import *  # pylint: disable=wildcard-import
21 import struct
22 import unittest
23
24 from scapy import all as scapy
25
26 SOL_IPV6 = 41
27 IP_RECVERR = 11
28 IPV6_RECVERR = 25
29 IP_TRANSPARENT = 19
30 IPV6_TRANSPARENT = 75
31 IPV6_TCLASS = 67
32 SO_BINDTODEVICE = 25
33 SO_MARK = 36
34 IPV6_FLOWLABEL_MGR = 32
35 IPV6_FLOWINFO_SEND = 33
36
37 ETH_P_IP = 0x0800
38 ETH_P_IPV6 = 0x86dd
39
40 IPPROTO_GRE = 47
41
42 SIOCSIFHWADDR = 0x8924
43
44 IPV6_FL_A_GET = 0
45 IPV6_FL_A_PUT = 1
46 IPV6_FL_A_RENEW = 1
47
48 IPV6_FL_F_CREATE = 1
49 IPV6_FL_F_EXCL = 2
50
51 IPV6_FL_S_NONE = 0
52 IPV6_FL_S_EXCL = 1
53 IPV6_FL_S_ANY = 255
54
55 IPV4_PING = "\x08\x00\x00\x00\x0a\xce\x00\x03"
56 IPV6_PING = "\x80\x00\x00\x00\x0a\xce\x00\x03"
57
58 IPV4_ADDR = "8.8.8.8"
59 IPV6_ADDR = "2001:4860:4860::8888"
60
61 IPV6_SEQ_DGRAM_HEADER = ("  sl  "
62                          "local_address                         "
63                          "remote_address                        "
64                          "st tx_queue rx_queue tr tm->when retrnsmt"
65                          "   uid  timeout inode ref pointer drops\n")
66
67 # Arbitrary packet payload.
68 UDP_PAYLOAD = str(scapy.DNS(rd=1,
69                             id=random.randint(0, 65535),
70                             qd=scapy.DNSQR(qname="wWW.GoOGle.CoM",
71                                            qtype="AAAA")))
72
73 # Unix group to use if we want to open sockets as non-root.
74 AID_INET = 3003
75
76
77 def LinuxVersion():
78   # Example: "3.4.67-00753-gb7a556f".
79   # Get the part before the dash.
80   version = os.uname()[2].split("-")[0]
81   # Convert it into a tuple such as (3, 4, 67). That allows comparing versions
82   # using < and >, since tuples are compared lexicographically.
83   version = tuple(int(i) for i in version.split("."))
84   return version
85
86
87 LINUX_VERSION = LinuxVersion()
88
89
90 def SetSocketTimeout(sock, ms):
91   s = ms / 1000
92   us = (ms % 1000) * 1000
93   sock.setsockopt(SOL_SOCKET, SO_RCVTIMEO, struct.pack("LL", s, us))
94
95
96 def SetSocketTos(s, tos):
97   level = {AF_INET: SOL_IP, AF_INET6: SOL_IPV6}[s.family]
98   option = {AF_INET: IP_TOS, AF_INET6: IPV6_TCLASS}[s.family]
99   s.setsockopt(level, option, tos)
100
101
102 def SetNonBlocking(fd):
103   flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
104   fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
105
106
107 # Convenience functions to create sockets.
108 def Socket(family, sock_type, protocol):
109   s = socket(family, sock_type, protocol)
110   SetSocketTimeout(s, 1000)
111   return s
112
113
114 def PingSocket(family):
115   proto = {AF_INET: IPPROTO_ICMP, AF_INET6: IPPROTO_ICMPV6}[family]
116   return Socket(family, SOCK_DGRAM, proto)
117
118
119 def IPv4PingSocket():
120   return PingSocket(AF_INET)
121
122
123 def IPv6PingSocket():
124   return PingSocket(AF_INET6)
125
126
127 def TCPSocket(family):
128   s = Socket(family, SOCK_STREAM, IPPROTO_TCP)
129   SetNonBlocking(s.fileno())
130   return s
131
132
133 def IPv4TCPSocket():
134   return TCPSocket(AF_INET)
135
136
137 def IPv6TCPSocket():
138   return TCPSocket(AF_INET6)
139
140
141 def UDPSocket(family):
142   return Socket(family, SOCK_DGRAM, IPPROTO_UDP)
143
144
145 def RawGRESocket(family):
146   s = Socket(family, SOCK_RAW, IPPROTO_GRE)
147   return s
148
149
150 def GetInterfaceIndex(ifname):
151   s = IPv4PingSocket()
152   ifr = struct.pack("16si", ifname, 0)
153   ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
154   return struct.unpack("16si", ifr)[1]
155
156
157 def SetInterfaceHWAddr(ifname, hwaddr):
158   s = IPv4PingSocket()
159   hwaddr = hwaddr.replace(":", "")
160   hwaddr = hwaddr.decode("hex")
161   if len(hwaddr) != 6:
162     raise ValueError("Unknown hardware address length %d" % len(hwaddr))
163   ifr = struct.pack("16sH6s", ifname, scapy.ARPHDR_ETHER, hwaddr)
164   fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
165
166
167 def SetInterfaceState(ifname, up):
168   s = IPv4PingSocket()
169   ifr = struct.pack("16sH", ifname, 0)
170   ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
171   _, flags = struct.unpack("16sH", ifr)
172   if up:
173     flags |= scapy.IFF_UP
174   else:
175     flags &= ~scapy.IFF_UP
176   ifr = struct.pack("16sH", ifname, flags)
177   ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
178
179
180 def SetInterfaceUp(ifname):
181   return SetInterfaceState(ifname, True)
182
183
184 def SetInterfaceDown(ifname):
185   return SetInterfaceState(ifname, False)
186
187
188 def FormatProcAddress(unformatted):
189   groups = []
190   for i in xrange(0, len(unformatted), 4):
191     groups.append(unformatted[i:i+4])
192   formatted = ":".join(groups)
193   # Compress the address.
194   address = inet_ntop(AF_INET6, inet_pton(AF_INET6, formatted))
195   return address
196
197
198 def FormatSockStatAddress(address):
199   if ":" in address:
200     family = AF_INET6
201   else:
202     family = AF_INET
203   binary = inet_pton(family, address)
204   out = ""
205   for i in xrange(0, len(binary), 4):
206     out += "%08X" % struct.unpack("=L", binary[i:i+4])
207   return out
208
209
210 def GetLinkAddress(ifname, linklocal):
211   addresses = open("/proc/net/if_inet6").readlines()
212   for address in addresses:
213     address = [s for s in address.strip().split(" ") if s]
214     if address[5] == ifname:
215       if (linklocal and address[0].startswith("fe80")
216           or not linklocal and not address[0].startswith("fe80")):
217         # Convert the address from raw hex to something with colons in it.
218         return FormatProcAddress(address[0])
219   return None
220
221
222 def GetDefaultRoute(version=6):
223   if version == 6:
224     routes = open("/proc/net/ipv6_route").readlines()
225     for route in routes:
226       route = [s for s in route.strip().split(" ") if s]
227       if (route[0] == "00000000000000000000000000000000" and route[1] == "00"
228           # Routes in non-default tables end up in /proc/net/ipv6_route!!!
229           and route[9] != "lo" and not route[9].startswith("nettest")):
230         return FormatProcAddress(route[4]), route[9]
231     raise ValueError("No IPv6 default route found")
232   elif version == 4:
233     routes = open("/proc/net/route").readlines()
234     for route in routes:
235       route = [s for s in route.strip().split("\t") if s]
236       if route[1] == "00000000" and route[7] == "00000000":
237         gw, iface = route[2], route[0]
238         gw = inet_ntop(AF_INET, gw.decode("hex")[::-1])
239         return gw, iface
240     raise ValueError("No IPv4 default route found")
241   else:
242     raise ValueError("Don't know about IPv%s" % version)
243
244
245 def GetDefaultRouteInterface():
246   unused_gw, iface = GetDefaultRoute()
247   return iface
248
249
250 def MakeFlowLabelOption(addr, label):
251   # struct in6_flowlabel_req {
252   #         struct in6_addr flr_dst;
253   #         __be32  flr_label;
254   #         __u8    flr_action;
255   #         __u8    flr_share;
256   #         __u16   flr_flags;
257   #         __u16   flr_expires;
258   #         __u16   flr_linger;
259   #         __u32   __flr_pad;
260   #         /* Options in format of IPV6_PKTOPTIONS */
261   # };
262   fmt = "16sIBBHHH4s"
263   assert struct.calcsize(fmt) == 32
264   addr = inet_pton(AF_INET6, addr)
265   assert len(addr) == 16
266   label = htonl(label & 0xfffff)
267   action = IPV6_FL_A_GET
268   share = IPV6_FL_S_ANY
269   flags = IPV6_FL_F_CREATE
270   pad = "\x00" * 4
271   return struct.pack(fmt, addr, label, action, share, flags, 0, 0, pad)
272
273
274 def SetFlowLabel(s, addr, label):
275   opt = MakeFlowLabelOption(addr, label)
276   s.setsockopt(SOL_IPV6, IPV6_FLOWLABEL_MGR, opt)
277   # Caller also needs to do s.setsockopt(SOL_IPV6, IPV6_FLOWINFO_SEND, 1).
278
279
280 # Determine network configuration.
281 try:
282   GetDefaultRoute(version=4)
283   HAVE_IPV4 = True
284 except ValueError:
285   HAVE_IPV4 = False
286
287 try:
288   GetDefaultRoute(version=6)
289   HAVE_IPV6 = True
290 except ValueError:
291   HAVE_IPV6 = False
292
293
294 class RunAsUid(object):
295
296   """Context guard to run a code block as a given UID."""
297
298   def __init__(self, uid):
299     self.uid = uid
300
301   def __enter__(self):
302     if self.uid:
303       self.saved_uid = os.geteuid()
304       self.saved_groups = os.getgroups()
305       if self.uid:
306         os.setgroups(self.saved_groups + [AID_INET])
307         os.seteuid(self.uid)
308
309   def __exit__(self, unused_type, unused_value, unused_traceback):
310     if self.uid:
311       os.seteuid(self.saved_uid)
312       os.setgroups(self.saved_groups)
313
314
315 class NetworkTest(unittest.TestCase):
316
317   def assertRaisesErrno(self, err_num, f, *args):
318     msg = os.strerror(err_num)
319     self.assertRaisesRegexp(EnvironmentError, msg, f, *args)
320
321
322 if __name__ == "__main__":
323   unittest.main()