3 # Copyright 2014 The Android Open Source Project
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
20 from socket import * # pylint: disable=wildcard-import
24 from scapy import all as scapy
# IPv6 socket option numbers, passed to setsockopt() at level SOL_IPV6 to
# manage flow labels and to enable sending them.
IPV6_FLOWLABEL_MGR = 32
IPV6_FLOWINFO_SEND = 33

# ioctl request number used to set an interface's hardware address.
SIOCSIFHWADDR = 0x8924

# Eight-byte ICMP / ICMPv6 headers; the first byte is the message type
# (8 and 0x80 are the echo request types of ICMP and ICMPv6 respectively).
IPV4_PING = "\x08\x00\x00\x00\x0a\xce\x00\x03"
IPV6_PING = "\x80\x00\x00\x00\x0a\xce\x00\x03"

# NOTE(review): presumably the IPv6 destination used by tests - confirm
# against callers.
IPV6_ADDR = "2001:4860:4860::8888"
# Column header line of the kernel's IPv6 datagram socket tables under
# /proc/net. The two address columns are each 38 characters wide.
# NOTE(review): spacing restored to match the kernel macro of the same name -
# confirm against a live /proc/net/udp6 before relying on exact comparison.
IPV6_SEQ_DGRAM_HEADER = ("  sl  " +
                         "local_address".ljust(38) +
                         "remote_address".ljust(38) +
                         "st tx_queue rx_queue tr tm->when retrnsmt"
                         "   uid  timeout inode ref pointer drops\n")
# Arbitrary packet payload: a serialized DNS query with a random ID.
# NOTE(review): only the qname argument of the question record is visible
# here; if the query was meant to carry further fields (e.g. a query type),
# add them back.
UDP_PAYLOAD = str(scapy.DNS(rd=1,
                            id=random.randint(0, 65535),
                            qd=scapy.DNSQR(qname="wWW.GoOGle.CoM")))
73 # Unix group to use if we want to open sockets as non-root.
def LinuxVersion():
  """Returns the running kernel's version as a tuple of ints.

  Returns:
    A tuple such as (3, 4, 67). Tuples compare lexicographically, so versions
    can be compared with < and >.
  """
  # Example: "3.4.67-00753-gb7a556f".
  # Get the part before the dash.
  release = os.uname()[2].split("-")[0]
  version = []
  for component in release.split("."):
    # Keep only the leading digits so that vendor suffixes (for example the
    # "+" in "5.15.133+") do not make int() fail.
    end = 0
    while end < len(component) and component[end].isdigit():
      end += 1
    if not end:
      break
    version.append(int(component[:end]))
  return tuple(version)


LINUX_VERSION = LinuxVersion()
def SetSocketTimeout(sock, ms):
  """Sets the receive timeout (SO_RCVTIMEO) of a socket.

  Args:
    sock: the socket to configure.
    ms: timeout in milliseconds, as an integer.
  """
  # Split into whole seconds and leftover milliseconds with integer
  # arithmetic; struct.pack("LL", ...) requires integers.
  seconds, remainder_ms = divmod(ms, 1000)
  timeval = struct.pack("LL", seconds, remainder_ms * 1000)
  sock.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval)
def SetSocketTos(s, tos):
  """Sets the IPv4 TOS or IPv6 traffic class of a socket.

  Args:
    s: the socket; its family selects the option that is set.
    tos: the value to set.

  Raises:
    KeyError: if the socket's family is neither AF_INET nor AF_INET6.
  """
  level, option = {
      AF_INET: (SOL_IP, IP_TOS),
      AF_INET6: (SOL_IPV6, IPV6_TCLASS),
  }[s.family]
  s.setsockopt(level, option, tos)
def SetNonBlocking(fd):
  """Puts a file descriptor into non-blocking mode.

  The descriptor's other status flags are preserved.

  Args:
    fd: the file descriptor to modify.
  """
  current_flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
  fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
# Convenience functions to create sockets.
def Socket(family, sock_type, protocol):
  """Creates a socket with a one-second receive timeout.

  Args:
    family: address family, e.g. AF_INET or AF_INET6.
    sock_type: socket type, e.g. SOCK_DGRAM.
    protocol: protocol number.

  Returns:
    The newly created socket.
  """
  s = socket(family, sock_type, protocol)
  SetSocketTimeout(s, 1000)
  # Callers such as PingSocket and TCPSocket use the returned socket.
  return s
def PingSocket(family):
  """Creates an ICMP (IPv4) or ICMPv6 (IPv6) datagram socket.

  Args:
    family: AF_INET or AF_INET6.

  Returns:
    The result of Socket() for that family.

  Raises:
    KeyError: if family is neither AF_INET nor AF_INET6.
  """
  proto = {AF_INET: IPPROTO_ICMP, AF_INET6: IPPROTO_ICMPV6}[family]
  return Socket(family, SOCK_DGRAM, proto)
def IPv4PingSocket():
  """Returns PingSocket() for the IPv4 address family."""
  return PingSocket(AF_INET)
def IPv6PingSocket():
  """Returns PingSocket() for the IPv6 address family."""
  return PingSocket(AF_INET6)
def TCPSocket(family):
  """Creates a non-blocking TCP socket.

  Args:
    family: address family, e.g. AF_INET or AF_INET6.

  Returns:
    The newly created socket.
  """
  s = Socket(family, SOCK_STREAM, IPPROTO_TCP)
  SetNonBlocking(s.fileno())
  return s
# NOTE(review): the def line for this return statement is not visible; the
# name follows the IPv4PingSocket/IPv6PingSocket pattern - confirm.
def IPv4TCPSocket():
  """Returns TCPSocket() for the IPv4 address family."""
  return TCPSocket(AF_INET)
# NOTE(review): the def line for this return statement is not visible; the
# name follows the IPv4PingSocket/IPv6PingSocket pattern - confirm.
def IPv6TCPSocket():
  """Returns TCPSocket() for the IPv6 address family."""
  return TCPSocket(AF_INET6)
def UDPSocket(family):
  """Creates a UDP socket.

  Args:
    family: address family, e.g. AF_INET or AF_INET6.

  Returns:
    The result of Socket() for a UDP datagram socket.
  """
  return Socket(family, SOCK_DGRAM, IPPROTO_UDP)
def RawGRESocket(family):
  """Creates a raw socket for the GRE protocol.

  Args:
    family: address family, e.g. AF_INET or AF_INET6.

  Returns:
    The newly created socket.
  """
  s = Socket(family, SOCK_RAW, IPPROTO_GRE)
  # Return the socket, as the other socket-creating helpers do.
  return s
def GetInterfaceIndex(ifname):
  """Returns the index of a network interface, via the SIOCGIFINDEX ioctl.

  Args:
    ifname: the interface name.

  Returns:
    The integer unpacked from the second field of the ioctl result.
  """
  # The ioctl needs a socket file descriptor to operate on.
  s = UDPSocket(AF_INET)
  try:
    ifr = struct.pack("16si", ifname, 0)
    ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
  finally:
    s.close()
  return struct.unpack("16si", ifr)[1]
def SetInterfaceHWAddr(ifname, hwaddr):
  """Sets the hardware address of an interface, via the SIOCSIFHWADDR ioctl.

  Args:
    ifname: the interface name.
    hwaddr: the address as hex digits, optionally colon-separated.

  Raises:
    ValueError: if hwaddr does not decode to exactly six bytes.
  """
  hwaddr = hwaddr.replace(":", "")
  hwaddr = hwaddr.decode("hex")
  # The request below packs the address with "6s"; reject any other length
  # instead of letting struct silently truncate or pad it.
  if len(hwaddr) != 6:
    raise ValueError("Unknown hardware address length %d" % len(hwaddr))
  ifr = struct.pack("16sH6s", ifname, scapy.ARPHDR_ETHER, hwaddr)
  # The ioctl needs a socket file descriptor to operate on.
  s = UDPSocket(AF_INET)
  try:
    fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
  finally:
    s.close()
def SetInterfaceState(ifname, up):
  """Brings an interface up or down by toggling its IFF_UP flag.

  Reads the current flags with SIOCGIFFLAGS, changes only IFF_UP, and writes
  them back with SIOCSIFFLAGS.

  Args:
    ifname: the interface name.
    up: if true, set IFF_UP; otherwise clear it.
  """
  # The ioctls need a socket file descriptor to operate on.
  s = UDPSocket(AF_INET)
  try:
    ifr = struct.pack("16sH", ifname, 0)
    ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
    _, flags = struct.unpack("16sH", ifr)
    if up:
      flags |= scapy.IFF_UP
    else:
      flags &= ~scapy.IFF_UP
    ifr = struct.pack("16sH", ifname, flags)
    fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
  finally:
    s.close()
def SetInterfaceUp(ifname):
  """Brings the named interface up; see SetInterfaceState."""
  return SetInterfaceState(ifname, True)
def SetInterfaceDown(ifname):
  """Brings the named interface down; see SetInterfaceState."""
  return SetInterfaceState(ifname, False)
def FormatProcAddress(unformatted):
  """Converts a raw-hex IPv6 address into compressed colon notation.

  Args:
    unformatted: the address as 32 hex digits with no separators, e.g.
      "20014860486000000000000000008888".

  Returns:
    The compressed textual form, e.g. "2001:4860:4860::8888".
  """
  # Split into groups of four hex digits and join them with colons.
  groups = [unformatted[i:i + 4] for i in range(0, len(unformatted), 4)]
  formatted = ":".join(groups)
  # Compress the address.
  return inet_ntop(AF_INET6, inet_pton(AF_INET6, formatted))
def FormatSockStatAddress(address):
  """Converts a textual IP address into upper-case hex words.

  Each four-byte chunk of the binary address is read as a native-byte-order
  32-bit integer and printed as eight hex digits.

  Args:
    address: an IPv4 or IPv6 address in textual form.

  Returns:
    A string of 8 (IPv4) or 32 (IPv6) hex digits.
  """
  # Only IPv6 textual addresses contain colons.
  family = AF_INET6 if ":" in address else AF_INET
  binary = inet_pton(family, address)
  return "".join("%08X" % struct.unpack("=L", binary[i:i + 4])
                 for i in range(0, len(binary), 4))
def GetLinkAddress(ifname, linklocal):
  """Finds an IPv6 address of an interface in /proc/net/if_inet6.

  Args:
    ifname: the interface name to look for.
    linklocal: if true, return an address starting with "fe80"; otherwise
      return an address that does not.

  Returns:
    The first matching address, formatted by FormatProcAddress, or None if
    there is no match.
  """
  # Read everything up front so the file is closed before parsing.
  with open("/proc/net/if_inet6") as f:
    addresses = f.readlines()
  for address in addresses:
    address = [s for s in address.strip().split(" ") if s]
    if address[5] != ifname:
      continue
    if address[0].startswith("fe80") == bool(linklocal):
      # Convert the address from raw hex to something with colons in it.
      return FormatProcAddress(address[0])
  return None
def GetDefaultRoute(version=6):
  """Looks up the default route in the kernel's /proc routing tables.

  Args:
    version: IP version, 4 or 6.

  Returns:
    A (gateway address, interface name) tuple.

  Raises:
    ValueError: if no default route is found, or version is not 4 or 6.
  """
  if version == 6:
    with open("/proc/net/ipv6_route") as f:
      routes = f.readlines()
    for route in routes:
      route = [s for s in route.strip().split(" ") if s]
      if (route[0] == "00000000000000000000000000000000" and route[1] == "00"
          # Routes in non-default tables end up in /proc/net/ipv6_route!!!
          and route[9] != "lo" and not route[9].startswith("nettest")):
        return FormatProcAddress(route[4]), route[9]
    raise ValueError("No IPv6 default route found")
  elif version == 4:
    with open("/proc/net/route") as f:
      routes = f.readlines()
    for route in routes:
      route = [s for s in route.strip().split("\t") if s]
      if route[1] == "00000000" and route[7] == "00000000":
        gw, iface = route[2], route[0]
        # The hex gateway is byte-reversed before being formatted.
        gw = inet_ntop(AF_INET, gw.decode("hex")[::-1])
        return gw, iface
    raise ValueError("No IPv4 default route found")
  else:
    raise ValueError("Don't know about IPv%s" % version)
def GetDefaultRouteInterface():
  """Returns the interface name of the default route from GetDefaultRoute().

  Raises:
    ValueError: propagated from GetDefaultRoute if there is no default route.
  """
  unused_gw, iface = GetDefaultRoute()
  return iface
def MakeFlowLabelOption(addr, label):
  """Builds a packed flow label request for IPV6_FLOWLABEL_MGR.

  Args:
    addr: destination IPv6 address in textual form.
    label: flow label; only the low 20 bits are used.

  Returns:
    The 32-byte packed request.
  """
  # struct in6_flowlabel_req {
  #         struct in6_addr flr_dst;
  #         __be32  flr_label;
  #         __u8    flr_action;
  #         __u8    flr_share;
  #         __u16   flr_flags;
  #         __u16   flr_expires;
  #         __u16   flr_linger;
  #         __u32   __flr_pad;
  #         /* Options in format of IPV6_PKTOPTIONS */
  # };
  fmt = "16sIBBHHH4s"
  assert struct.calcsize(fmt) == 32
  addr = inet_pton(AF_INET6, addr)
  assert len(addr) == 16
  label = htonl(label & 0xfffff)
  action = IPV6_FL_A_GET
  share = IPV6_FL_S_ANY
  flags = IPV6_FL_F_CREATE
  pad = b"\x00" * 4
  # flr_expires and flr_linger are left at zero.
  return struct.pack(fmt, addr, label, action, share, flags, 0, 0, pad)
def SetFlowLabel(s, addr, label):
  """Applies a flow label request to a socket via IPV6_FLOWLABEL_MGR.

  Args:
    s: the socket to configure.
    addr: destination IPv6 address in textual form.
    label: the flow label.
  """
  opt = MakeFlowLabelOption(addr, label)
  s.setsockopt(SOL_IPV6, IPV6_FLOWLABEL_MGR, opt)
  # Caller also needs to do s.setsockopt(SOL_IPV6, IPV6_FLOWINFO_SEND, 1).
# Determine network configuration.
# NOTE(review): both results are discarded, and GetDefaultRoute raises
# ValueError when no default route exists, so as written a missing route
# aborts module import. Confirm whether the outcome should instead be caught
# and recorded.
GetDefaultRoute(version=4)
GetDefaultRoute(version=6)
class RunAsUid(object):
  """Context guard to run a code block as a given UID.

  A false uid (0 or None) makes the guard a no-op.
  """

  def __init__(self, uid):
    """Records the UID to switch to when the guard is entered."""
    self.uid = uid

  def __enter__(self):
    """Saves the current identity, adds group AID_INET and switches EUID."""
    if self.uid:
      self.saved_uid = os.geteuid()
      self.saved_groups = os.getgroups()
      # Change groups before the effective UID is switched to self.uid.
      os.setgroups(self.saved_groups + [AID_INET])
      os.seteuid(self.uid)

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Restores the effective UID and groups saved by __enter__."""
    if self.uid:
      os.seteuid(self.saved_uid)
      os.setgroups(self.saved_groups)
class NetworkTest(unittest.TestCase):
  """TestCase base class adding an errno-aware assertion."""

  def assertRaisesErrno(self, err_num, f, *args):
    """Asserts that f(*args) raises an EnvironmentError for a given errno.

    The check matches os.strerror(err_num), used as a regular expression,
    against the text of the raised exception.

    Args:
      err_num: the expected errno value.
      f: the callable to invoke.
      *args: positional arguments passed to f.
    """
    msg = os.strerror(err_num)
    # assertRaisesRegexp is the Python 2 spelling and is absent from newer
    # unittest versions; prefer assertRaisesRegex where it exists.
    check = getattr(self, "assertRaisesRegex", None) or self.assertRaisesRegexp
    check(EnvironmentError, msg, f, *args)
322 if __name__ == "__main__":