3 # pylint: disable=g-bad-todo
9 from socket import * # pylint: disable=wildcard-import
15 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
18 class Ping6Test(net_test.NetworkTest):
21 if net_test.HAVE_IPV6:
22 self.ifname = net_test.GetDefaultRouteInterface()
23 self.ifindex = net_test.GetInterfaceIndex(self.ifname)
24 self.lladdr = net_test.GetLinkAddress(self.ifname, True)
25 self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
27 def assertValidPingResponse(self, s, data):
31 rcvd, src = s.recvfrom(32768)
32 self.assertNotEqual(0, len(rcvd), "No data received")
34 # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
36 if src[0].startswith("::ffff:"):
38 src = (src[0].replace("::ffff:", ""), src[1:])
40 # Check the data being sent is valid.
41 self.assertGreater(len(data), 7, "Not enough data for ping packet")
43 self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
44 elif family == AF_INET6:
45 self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
47 self.fail("Unknown socket address family %d" * s.family)
49 # Check address, ICMP type, and ICMP code.
51 addr, unused_port = src
52 self.assertGreaterEqual(len(addr), len("1.1.1.1"))
53 self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
55 addr, unused_port, flowlabel, scope_id = src
56 self.assertGreaterEqual(len(addr), len("::"))
57 self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
58 # Check that the flow label is zero and that the scope ID is sane.
59 self.assertEqual(flowlabel, 0)
60 self.assertLess(scope_id, 100)
62 # TODO: check the checksum. We can't do this easily now for ICMPv6 because
63 # we don't have the IP addresses so we can't construct the pseudoheader.
65 # Check the sequence number and the data.
66 self.assertEqual(len(data), len(rcvd))
67 self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
69 def ReadProcNetSocket(self, protocol):
71 lines = open("/proc/net/%s" % protocol).readlines()
73 # Possibly check, and strip, header.
74 if protocol in ["icmp6", "raw6", "udp6"]:
75 self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
79 if protocol.endswith("6"):
83 regexp = re.compile(r" *(\d+): " # bucket
84 "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
85 "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
86 "([0-9A-F][0-9A-F]) " # state
87 "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
88 "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
95 "([0-9]+) *$" # drops, icmp has spaces
97 # Return a list of lists with only source / dest addresses for now.
100 (_, src, dst, state, mem,
101 _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
102 out.append([src, dst, state, mem, uid, refcnt, drops])
105 def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
107 expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
108 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
110 "%08X:%08X" % (txmem, rxmem),
111 str(os.getuid()), "2", "0"]
112 actual = self.ReadProcNetSocket(name)[-1]
113 self.assertListEqual(expected, actual)
115 def testIPv4SendWithNoConnection(self):
116 s = net_test.IPv4PingSocket()
117 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
119 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
120 def testIPv6SendWithNoConnection(self):
121 s = net_test.IPv6PingSocket()
122 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
124 def testIPv4LoopbackPingWithConnect(self):
125 s = net_test.IPv4PingSocket()
126 s.connect(("127.0.0.1", 55))
127 data = net_test.IPV4_PING + "foobarbaz"
129 self.assertValidPingResponse(s, data)
131 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
132 def testIPv6LoopbackPingWithConnect(self):
133 s = net_test.IPv6PingSocket()
134 s.connect(("::1", 55))
135 s.send(net_test.IPV6_PING)
136 self.assertValidPingResponse(s, net_test.IPV6_PING)
138 def testIPv4PingUsingSendto(self):
139 s = net_test.IPv4PingSocket()
140 written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
141 self.assertEquals(len(net_test.IPV4_PING), written)
142 self.assertValidPingResponse(s, net_test.IPV4_PING)
144 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
145 def testIPv6PingUsingSendto(self):
146 s = net_test.IPv6PingSocket()
147 written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
148 self.assertEquals(len(net_test.IPV6_PING), written)
149 self.assertValidPingResponse(s, net_test.IPV6_PING)
151 def testIPv4NoCrash(self):
152 # Python 2.x does not provide either read() or recvmsg.
153 s = net_test.IPv4PingSocket()
154 written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
155 self.assertEquals(len(net_test.IPV4_PING), written)
157 reply = posix.read(fd, 4096)
158 self.assertEquals(written, len(reply))
160 def testIPv6NoCrash(self):
161 # Python 2.x does not provide either read() or recvmsg.
162 s = net_test.IPv6PingSocket()
163 written = s.sendto(net_test.IPV6_PING, ("::1", 55))
164 self.assertEquals(len(net_test.IPV6_PING), written)
166 reply = posix.read(fd, 4096)
167 self.assertEquals(written, len(reply))
169 def testIPv4Bind(self):
170 # Bind to unspecified address.
171 s = net_test.IPv4PingSocket()
172 s.bind(("0.0.0.0", 544))
173 self.assertEquals(("0.0.0.0", 544), s.getsockname())
176 s = net_test.IPv4PingSocket()
177 s.bind(("127.0.0.1", 99))
178 self.assertEquals(("127.0.0.1", 99), s.getsockname())
180 # Binding twice is not allowed.
181 self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
183 # But binding two different sockets to the same ID is allowed.
184 s2 = net_test.IPv4PingSocket()
185 s2.bind(("127.0.0.1", 99))
186 self.assertEquals(("127.0.0.1", 99), s2.getsockname())
187 s3 = net_test.IPv4PingSocket()
188 s3.bind(("127.0.0.1", 99))
189 self.assertEquals(("127.0.0.1", 99), s3.getsockname())
191 # If two sockets bind to the same port, the first one to call read() gets
193 s4 = net_test.IPv4PingSocket()
194 s5 = net_test.IPv4PingSocket()
195 s4.bind(("0.0.0.0", 167))
196 s5.bind(("0.0.0.0", 167))
197 s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
198 self.assertValidPingResponse(s5, net_test.IPV4_PING)
199 net_test.SetSocketTimeout(s4, 100)
200 self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
202 # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
203 s6 = net_test.IPv4PingSocket()
204 s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
205 self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
207 # Can't bind after sendto.
208 s = net_test.IPv4PingSocket()
209 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
210 self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
212 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
213 def testIPv6Bind(self):
214 # Bind to unspecified address.
215 s = net_test.IPv6PingSocket()
217 self.assertEquals(("::", 769, 0, 0), s.getsockname())
220 s = net_test.IPv6PingSocket()
222 self.assertEquals(("::1", 99, 0, 0), s.getsockname())
224 # Binding twice is not allowed.
225 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
227 # But binding two different sockets to the same ID is allowed.
228 s2 = net_test.IPv6PingSocket()
230 self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
231 s3 = net_test.IPv6PingSocket()
233 self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
235 # Binding both IPv4 and IPv6 to the same socket works.
236 s4 = net_test.IPv4PingSocket()
237 s6 = net_test.IPv6PingSocket()
238 s4.bind(("0.0.0.0", 444))
239 s6.bind(("::", 666, 0, 0))
241 # Can't bind after sendto.
242 s = net_test.IPv6PingSocket()
243 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
244 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
246 def testIPv4InvalidBind(self):
247 s = net_test.IPv4PingSocket()
248 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
249 s.bind, ("255.255.255.255", 1026))
250 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
251 s.bind, ("224.0.0.1", 651))
252 # Binding to an address we don't have only works with IP_TRANSPARENT.
253 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
254 s.bind, (net_test.IPV4_ADDR, 651))
256 s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
257 s.bind((net_test.IPV4_ADDR, 651))
259 if e.errno == errno.EACCES:
260 pass # We're not root. let it go for now.
262 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
263 def testIPv6InvalidBind(self):
264 s = net_test.IPv6PingSocket()
265 self.assertRaisesErrno(errno.EINVAL,
266 s.bind, ("ff02::2", 1026))
268 # Binding to an address we don't have only works with IPV6_TRANSPARENT.
269 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
270 s.bind, (net_test.IPV6_ADDR, 651))
272 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
273 s.bind((net_test.IPV6_ADDR, 651))
275 if e.errno == errno.EACCES:
276 pass # We're not root. let it go for now.
278 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
279 def testIPv6ScopedBind(self):
280 # Can't bind to a link-local address without a scope ID.
281 s = net_test.IPv6PingSocket()
282 self.assertRaisesErrno(errno.EINVAL,
283 s.bind, (self.lladdr, 1026, 0, 0))
285 # Binding to a link-local address with a scope ID works, and the scope ID is
286 # returned by a subsequent getsockname. Interestingly, Python's getsockname
287 # returns "fe80:1%foo", even though it does not understand it.
288 expected = self.lladdr + "%" + self.ifname
289 s.bind((self.lladdr, 4646, 0, self.ifindex))
290 self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
292 # Of course, for the above to work the address actually has to be configured
294 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
295 s.bind, ("fe80::f00", 1026, 0, 1))
297 # Scope IDs on non-link-local addresses are silently ignored.
298 s = net_test.IPv6PingSocket()
299 s.bind(("::1", 1234, 0, 1))
300 self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
302 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
303 def testBindAffectsIdentifier(self):
304 s = net_test.IPv6PingSocket()
305 s.bind((self.globaladdr, 0xf976))
306 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
307 self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
309 s = net_test.IPv6PingSocket()
310 s.bind((self.globaladdr, 0xace))
311 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
312 self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
314 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
315 def testLinkLocalAddress(self):
316 s = net_test.IPv6PingSocket()
317 # Sending to a link-local address with no scope fails with EINVAL.
318 self.assertRaisesErrno(errno.EINVAL,
319 s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
320 # Sending to link-local address with a scope succeeds. Note that Python
321 # doesn't understand the "fe80::1%lo" format, even though it returns it.
322 s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
323 # No exceptions? Good.
325 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
326 def testMappedAddressFails(self):
327 s = net_test.IPv6PingSocket()
328 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
329 self.assertValidPingResponse(s, net_test.IPV6_PING)
330 s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
331 self.assertValidPingResponse(s, net_test.IPV6_PING)
332 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
333 ("::ffff:192.0.2.1", 55))
335 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
336 def testFlowLabel(self):
337 s = net_test.IPv6PingSocket()
339 # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
340 # the flow label in the packet is not set.
341 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
342 self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0.
344 # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
345 # that is not registered with the flow manager should return EINVAL...
346 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
347 # ... but this doesn't work yet.
349 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
350 (net_test.IPV6_ADDR, 93, 0xdead, 0))
352 # After registering the flow label, it gets sent properly, appears in the
353 # output packet, and is returned in the response.
354 net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
355 self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
356 net_test.IPV6_FLOWINFO_SEND))
357 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
358 _, src = s.recvfrom(32768)
359 _, _, flowlabel, _ = src
360 self.assertEqual(0xdead, flowlabel & 0xfffff)
362 def testIPv4Error(self):
363 s = net_test.IPv4PingSocket()
364 s.setsockopt(SOL_IP, IP_TTL, 2)
365 s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
366 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
367 # We can't check the actual error because Python 2.7 doesn't implement
368 # recvmsg, but we can at least check that the socket returns an error.
369 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
371 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
372 def testIPv6Error(self):
373 s = net_test.IPv6PingSocket()
374 s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
375 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
376 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
377 # We can't check the actual error because Python 2.7 doesn't implement
378 # recvmsg, but we can at least check that the socket returns an error.
379 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
381 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
382 def testIPv6MulticastPing(self):
383 s = net_test.IPv6PingSocket()
384 # Send a multicast ping and check we get at least one duplicate.
385 s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
386 self.assertValidPingResponse(s, net_test.IPV6_PING)
387 self.assertValidPingResponse(s, net_test.IPV6_PING)
389 def testIPv4LargePacket(self):
390 s = net_test.IPv4PingSocket()
391 data = net_test.IPV4_PING + 20000 * "a"
392 s.sendto(data, ("127.0.0.1", 987))
393 self.assertValidPingResponse(s, data)
395 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
396 def testIPv6LargePacket(self):
397 s = net_test.IPv6PingSocket()
398 s.bind(("::", 0xace))
399 data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
400 s.sendto(data, ("::1", 953))
402 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
403 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
404 def testIcmpSocketsNotInIcmp6(self):
405 numrows = len(self.ReadProcNetSocket("icmp"))
406 numrows6 = len(self.ReadProcNetSocket("icmp6"))
407 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
408 s.bind(("127.0.0.1", 0xace))
409 s.connect(("127.0.0.1", 0xbeef))
410 self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
411 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
413 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
414 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
415 def testIcmp6SocketsNotInIcmp(self):
416 numrows = len(self.ReadProcNetSocket("icmp"))
417 numrows6 = len(self.ReadProcNetSocket("icmp6"))
418 s = net_test.IPv6PingSocket()
419 s.bind(("::1", 0xace))
420 s.connect(("::1", 0xbeef))
421 self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
422 self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
424 def testProcNetIcmp(self):
425 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
426 s.bind(("127.0.0.1", 0xace))
427 s.connect(("127.0.0.1", 0xbeef))
428 self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
430 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
431 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
432 def testProcNetIcmp6(self):
433 numrows6 = len(self.ReadProcNetSocket("icmp6"))
434 s = net_test.IPv6PingSocket()
435 s.bind(("::1", 0xace))
436 s.connect(("::1", 0xbeef))
437 self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
439 # Check the row goes away when the socket is closed.
441 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
443 # Try send, bind and connect to check the addresses and the state.
444 s = net_test.IPv6PingSocket()
445 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
446 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
447 self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
449 # Can't bind after sendto, apparently.
450 s = net_test.IPv6PingSocket()
451 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
452 s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
453 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
455 # Check receive bytes.
456 s.connect(("ff02::1", 0xdead))
457 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
458 s.send(net_test.IPV6_PING)
459 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
460 txmem=0, rxmem=0x880)
461 self.assertValidPingResponse(s, net_test.IPV6_PING)
462 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
465 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
466 def testProcNetUdp6(self):
467 s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
468 s.bind(("::1", 0xace))
469 s.connect(("::1", 0xbeef))
470 self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
472 @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
473 def testProcNetRaw6(self):
474 s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
475 s.bind(("::1", 0xace))
476 s.connect(("::1", 0xbeef))
477 self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
480 if __name__ == "__main__":