3 # Copyright 2014 The Android Open Source Project
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # pylint: disable=g-bad-todo
23 from socket import * # pylint: disable=wildcard-import
28 from scapy import all as scapy
31 import multinetwork_base
35 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
39 ICMPV6_ECHO_REQUEST = 128
40 ICMPV6_ECHO_REPLY = 129
43 class PingReplyThread(threading.Thread):
46 INTERMEDIATE_IPV4 = "192.0.2.2"
47 INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
48 NEIGHBOURS = ["fe80::1"]
50 def __init__(self, tun, mymac, routermac):
51 super(PingReplyThread, self).__init__()
55 self._routermac = routermac
60 def ChecksumValid(self, packet):
61 # Get and clear the checksums.
62 def GetAndClearChecksum(layer):
66 checksum = layer.chksum
68 except AttributeError:
69 checksum = layer.cksum
73 def GetChecksum(layer):
76 except AttributeError:
79 layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
82 sums[name] = GetAndClearChecksum(packet.getlayer(name))
84 # Serialize the packet, so scapy recalculates the checksums, and compare
85 # them with the ones in the packet.
86 packet = packet.__class__(str(packet))
88 layer = packet.getlayer(name)
89 if layer and GetChecksum(layer) != sums[name]:
94 def SendTimeExceeded(self, version, packet):
96 src = packet.getlayer(scapy.IP).src
98 scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
99 scapy.ICMP(type=11, code=0) /
102 src = packet.getlayer(scapy.IPv6).src
104 scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
105 scapy.ICMPv6TimeExceeded(code=0) /
108 def IPv4Packet(self, ip):
109 icmp = ip.getlayer(scapy.ICMP)
111 # We only support ping for now.
112 if (ip.proto != IPPROTO_ICMP or
113 icmp.type != ICMP_ECHO or
117 # Check the checksums.
118 if not self.ChecksumValid(ip):
121 if ip.ttl < self.MIN_TTL:
122 self.SendTimeExceeded(4, ip)
125 icmp.type = ICMP_ECHOREPLY
126 self.SwapAddresses(ip)
129 def IPv6Packet(self, ipv6):
130 icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
132 # We only support ping for now.
133 if (ipv6.nh != IPPROTO_ICMPV6 or
135 icmpv6.type != ICMPV6_ECHO_REQUEST or
139 # Check the checksums.
140 if not self.ChecksumValid(ipv6):
143 if ipv6.dst.startswith("ff02::"):
145 for src in self.NEIGHBOURS:
147 icmpv6.type = ICMPV6_ECHO_REPLY
148 self.SendPacket(ipv6)
149 elif ipv6.hlim < self.MIN_TTL:
150 self.SendTimeExceeded(6, ipv6)
152 icmpv6.type = ICMPV6_ECHO_REPLY
153 self.SwapAddresses(ipv6)
154 self.SendPacket(ipv6)
156 def SwapAddresses(self, packet):
158 packet.src = packet.dst
161 def SendPacket(self, packet):
162 packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
164 posix.write(self._tun.fileno(), str(packet))
169 while not self._stopped:
172 packet = posix.read(self._tun.fileno(), 4096)
174 if e.errno == errno.EAGAIN:
179 ether = scapy.Ether(packet)
180 if ether.type == net_test.ETH_P_IPV6:
181 self.IPv6Packet(ether.payload)
182 elif ether.type == net_test.ETH_P_IP:
183 self.IPv4Packet(ether.payload)
186 class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
190 super(Ping6Test, cls).setUpClass()
191 cls.netid = random.choice(cls.NETIDS)
192 cls.reply_thread = PingReplyThread(
194 cls.MyMacAddress(cls.netid),
195 cls.RouterMacAddress(cls.netid))
196 cls.SetDefaultNetwork(cls.netid)
197 cls.reply_thread.start()
200 def tearDownClass(cls):
201 cls.reply_thread.Stop()
202 cls.ClearDefaultNetwork()
203 super(Ping6Test, cls).tearDownClass()
206 self.ifname = self.GetInterfaceName(self.netid)
207 self.ifindex = self.ifindices[self.netid]
208 self.lladdr = net_test.GetLinkAddress(self.ifname, True)
209 self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
211 def assertValidPingResponse(self, s, data):
215 rcvd, src = s.recvfrom(32768)
216 self.assertNotEqual(0, len(rcvd), "No data received")
218 # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
220 if src[0].startswith("::ffff:"):
222 src = (src[0].replace("::ffff:", ""), src[1:])
224 # Check the data being sent is valid.
225 self.assertGreater(len(data), 7, "Not enough data for ping packet")
226 if family == AF_INET:
227 self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
228 elif family == AF_INET6:
229 self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
231 self.fail("Unknown socket address family %d" * s.family)
233 # Check address, ICMP type, and ICMP code.
234 if family == AF_INET:
235 addr, unused_port = src
236 self.assertGreaterEqual(len(addr), len("1.1.1.1"))
237 self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
239 addr, unused_port, flowlabel, scope_id = src # pylint: disable=unbalanced-tuple-unpacking
240 self.assertGreaterEqual(len(addr), len("::"))
241 self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
242 # Check that the flow label is zero and that the scope ID is sane.
243 self.assertEqual(flowlabel, 0)
244 if addr.startswith("fe80::"):
245 self.assertTrue(scope_id in self.ifindices.values())
247 self.assertEquals(0, scope_id)
249 # TODO: check the checksum. We can't do this easily now for ICMPv6 because
250 # we don't have the IP addresses so we can't construct the pseudoheader.
252 # Check the sequence number and the data.
253 self.assertEqual(len(data), len(rcvd))
254 self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
256 def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
258 expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
259 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
261 "%08X:%08X" % (txmem, rxmem),
262 str(os.getuid()), "2", "0"]
263 actual = self.ReadProcNetSocket(name)[-1]
264 self.assertListEqual(expected, actual)
266 def testIPv4SendWithNoConnection(self):
267 s = net_test.IPv4PingSocket()
268 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
270 def testIPv6SendWithNoConnection(self):
271 s = net_test.IPv6PingSocket()
272 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
274 def testIPv4LoopbackPingWithConnect(self):
275 s = net_test.IPv4PingSocket()
276 s.connect(("127.0.0.1", 55))
277 data = net_test.IPV4_PING + "foobarbaz"
279 self.assertValidPingResponse(s, data)
281 def testIPv6LoopbackPingWithConnect(self):
282 s = net_test.IPv6PingSocket()
283 s.connect(("::1", 55))
284 s.send(net_test.IPV6_PING)
285 self.assertValidPingResponse(s, net_test.IPV6_PING)
287 def testIPv4PingUsingSendto(self):
288 s = net_test.IPv4PingSocket()
289 written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
290 self.assertEquals(len(net_test.IPV4_PING), written)
291 self.assertValidPingResponse(s, net_test.IPV4_PING)
293 def testIPv6PingUsingSendto(self):
294 s = net_test.IPv6PingSocket()
295 written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
296 self.assertEquals(len(net_test.IPV6_PING), written)
297 self.assertValidPingResponse(s, net_test.IPV6_PING)
299 def testIPv4NoCrash(self):
300 # Python 2.x does not provide either read() or recvmsg.
301 s = net_test.IPv4PingSocket()
302 written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
303 self.assertEquals(len(net_test.IPV4_PING), written)
305 reply = posix.read(fd, 4096)
306 self.assertEquals(written, len(reply))
308 def testIPv6NoCrash(self):
309 # Python 2.x does not provide either read() or recvmsg.
310 s = net_test.IPv6PingSocket()
311 written = s.sendto(net_test.IPV6_PING, ("::1", 55))
312 self.assertEquals(len(net_test.IPV6_PING), written)
314 reply = posix.read(fd, 4096)
315 self.assertEquals(written, len(reply))
317 def testCrossProtocolCrash(self):
318 # Checks that an ICMP error containing a ping packet that matches the ID
319 # of a socket of the wrong protocol (which can happen when using 464xlat)
320 # doesn't crash the kernel.
322 # We can only test this using IPv6 unreachables and IPv4 ping sockets,
323 # because IPv4 packets sent by scapy.send() on loopback are not received by
324 # the kernel. So we don't actually use this function yet.
325 def GetIPv4Unreachable(port): # pylint: disable=unused-variable
326 return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
327 scapy.ICMP(type=3, code=0) /
328 scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
329 scapy.ICMP(type=8, id=port, seq=1))
331 def GetIPv6Unreachable(port):
332 return (scapy.IPv6(src="::1", dst="::1") /
333 scapy.ICMPv6DestUnreach() /
334 scapy.IPv6(src="::1", dst="::1") /
335 scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
337 # An unreachable matching the ID of a socket of the wrong protocol
339 s = net_test.IPv4PingSocket()
340 s.connect(("127.0.0.1", 12345))
341 _, port = s.getsockname()
342 scapy.send(GetIPv6Unreachable(port))
345 def testCrossProtocolCalls(self):
346 """Tests that passing in the wrong family returns EAFNOSUPPORT.
348 Relevant kernel commits:
350 91a0b60 net/ping: handle protocol mismatching scenario
351 9145736d net: ping: Return EAFNOSUPPORT when appropriate.
354 78a6809 net/ping: handle protocol mismatching scenario
355 428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
358 def CheckEAFNoSupport(function, *args):
359 self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
361 ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
363 # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
364 # IPv4 socket address structures, we need to pass down a socket address
365 # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
366 # will fail immediately with EINVAL because the passed-in socket length is
367 # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
368 ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
369 ipv4sockaddr = csocket.SockaddrIn6(
370 ipv4sockaddr.Pack() +
371 "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
373 s4 = net_test.IPv4PingSocket()
374 s6 = net_test.IPv6PingSocket()
376 # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
377 # address family, because the Python implementation will just pass garbage
378 # down to the kernel. So call the C functions directly.
379 CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
380 CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
381 CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
382 CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
383 CheckEAFNoSupport(csocket.Sendmsg,
384 s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
385 CheckEAFNoSupport(csocket.Sendmsg,
386 s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
388 def testIPv4Bind(self):
389 # Bind to unspecified address.
390 s = net_test.IPv4PingSocket()
391 s.bind(("0.0.0.0", 544))
392 self.assertEquals(("0.0.0.0", 544), s.getsockname())
395 s = net_test.IPv4PingSocket()
396 s.bind(("127.0.0.1", 99))
397 self.assertEquals(("127.0.0.1", 99), s.getsockname())
399 # Binding twice is not allowed.
400 self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
402 # But binding two different sockets to the same ID is allowed.
403 s2 = net_test.IPv4PingSocket()
404 s2.bind(("127.0.0.1", 99))
405 self.assertEquals(("127.0.0.1", 99), s2.getsockname())
406 s3 = net_test.IPv4PingSocket()
407 s3.bind(("127.0.0.1", 99))
408 self.assertEquals(("127.0.0.1", 99), s3.getsockname())
410 # If two sockets bind to the same port, the first one to call read() gets
412 s4 = net_test.IPv4PingSocket()
413 s5 = net_test.IPv4PingSocket()
414 s4.bind(("0.0.0.0", 167))
415 s5.bind(("0.0.0.0", 167))
416 s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
417 self.assertValidPingResponse(s5, net_test.IPV4_PING)
418 net_test.SetSocketTimeout(s4, 100)
419 self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
421 # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
422 s6 = net_test.IPv4PingSocket()
423 s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
424 self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
426 # Can't bind after sendto.
427 s = net_test.IPv4PingSocket()
428 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
429 self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
431 def testIPv6Bind(self):
432 # Bind to unspecified address.
433 s = net_test.IPv6PingSocket()
435 self.assertEquals(("::", 769, 0, 0), s.getsockname())
438 s = net_test.IPv6PingSocket()
440 self.assertEquals(("::1", 99, 0, 0), s.getsockname())
442 # Binding twice is not allowed.
443 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
445 # But binding two different sockets to the same ID is allowed.
446 s2 = net_test.IPv6PingSocket()
448 self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
449 s3 = net_test.IPv6PingSocket()
451 self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
453 # Binding both IPv4 and IPv6 to the same socket works.
454 s4 = net_test.IPv4PingSocket()
455 s6 = net_test.IPv6PingSocket()
456 s4.bind(("0.0.0.0", 444))
457 s6.bind(("::", 666, 0, 0))
459 # Can't bind after sendto.
460 s = net_test.IPv6PingSocket()
461 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
462 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
464 def testIPv4InvalidBind(self):
465 s = net_test.IPv4PingSocket()
466 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
467 s.bind, ("255.255.255.255", 1026))
468 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
469 s.bind, ("224.0.0.1", 651))
470 # Binding to an address we don't have only works with IP_TRANSPARENT.
471 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
472 s.bind, (net_test.IPV4_ADDR, 651))
474 s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
475 s.bind((net_test.IPV4_ADDR, 651))
477 if e.errno == errno.EACCES:
478 pass # We're not root. let it go for now.
480 def testIPv6InvalidBind(self):
481 s = net_test.IPv6PingSocket()
482 self.assertRaisesErrno(errno.EINVAL,
483 s.bind, ("ff02::2", 1026))
485 # Binding to an address we don't have only works with IPV6_TRANSPARENT.
486 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
487 s.bind, (net_test.IPV6_ADDR, 651))
489 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
490 s.bind((net_test.IPV6_ADDR, 651))
492 if e.errno == errno.EACCES:
493 pass # We're not root. let it go for now.
495 def testAfUnspecBind(self):
496 # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
497 s4 = net_test.IPv4PingSocket()
498 sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
499 sockaddr.family = AF_UNSPEC
500 csocket.Bind(s4, sockaddr)
501 self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
503 # But not if the address is anything else.
504 sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
505 sockaddr.family = AF_UNSPEC
506 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
508 # This doesn't work for IPv6.
509 s6 = net_test.IPv6PingSocket()
510 sockaddr = csocket.Sockaddr(("::1", 58997))
511 sockaddr.family = AF_UNSPEC
512 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
514 def testIPv6ScopedBind(self):
515 # Can't bind to a link-local address without a scope ID.
516 s = net_test.IPv6PingSocket()
517 self.assertRaisesErrno(errno.EINVAL,
518 s.bind, (self.lladdr, 1026, 0, 0))
520 # Binding to a link-local address with a scope ID works, and the scope ID is
521 # returned by a subsequent getsockname. Interestingly, Python's getsockname
522 # returns "fe80:1%foo", even though it does not understand it.
523 expected = self.lladdr + "%" + self.ifname
524 s.bind((self.lladdr, 4646, 0, self.ifindex))
525 self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
527 # Of course, for the above to work the address actually has to be configured
529 self.assertRaisesErrno(errno.EADDRNOTAVAIL,
530 s.bind, ("fe80::f00", 1026, 0, 1))
532 # Scope IDs on non-link-local addresses are silently ignored.
533 s = net_test.IPv6PingSocket()
534 s.bind(("::1", 1234, 0, 1))
535 self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
537 def testBindAffectsIdentifier(self):
538 s = net_test.IPv6PingSocket()
539 s.bind((self.globaladdr, 0xf976))
540 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
541 self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
543 s = net_test.IPv6PingSocket()
544 s.bind((self.globaladdr, 0xace))
545 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
546 self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
548 def testLinkLocalAddress(self):
549 s = net_test.IPv6PingSocket()
550 # Sending to a link-local address with no scope fails with EINVAL.
551 self.assertRaisesErrno(errno.EINVAL,
552 s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
553 # Sending to link-local address with a scope succeeds. Note that Python
554 # doesn't understand the "fe80::1%lo" format, even though it returns it.
555 s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
556 # No exceptions? Good.
558 def testMappedAddressFails(self):
559 s = net_test.IPv6PingSocket()
560 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
561 self.assertValidPingResponse(s, net_test.IPV6_PING)
562 s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
563 self.assertValidPingResponse(s, net_test.IPV6_PING)
564 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
565 ("::ffff:192.0.2.1", 55))
567 @unittest.skipUnless(False, "skipping: does not work yet")
568 def testFlowLabel(self):
569 s = net_test.IPv6PingSocket()
571 # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
572 # the flow label in the packet is not set.
573 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
574 self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0.
576 # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
577 # that is not registered with the flow manager should return EINVAL...
578 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
579 # ... but this doesn't work yet.
581 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
582 (net_test.IPV6_ADDR, 93, 0xdead, 0))
584 # After registering the flow label, it gets sent properly, appears in the
585 # output packet, and is returned in the response.
586 net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
587 self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
588 net_test.IPV6_FLOWINFO_SEND))
589 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
590 _, src = s.recvfrom(32768)
591 _, _, flowlabel, _ = src
592 self.assertEqual(0xdead, flowlabel & 0xfffff)
594 def testIPv4Error(self):
595 s = net_test.IPv4PingSocket()
596 s.setsockopt(SOL_IP, IP_TTL, 2)
597 s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
598 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
599 # We can't check the actual error because Python 2.7 doesn't implement
600 # recvmsg, but we can at least check that the socket returns an error.
601 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
603 def testIPv6Error(self):
604 s = net_test.IPv6PingSocket()
605 s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
606 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
607 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
608 # We can't check the actual error because Python 2.7 doesn't implement
609 # recvmsg, but we can at least check that the socket returns an error.
610 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
612 def testIPv6MulticastPing(self):
613 s = net_test.IPv6PingSocket()
614 # Send a multicast ping and check we get at least one duplicate.
615 # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
616 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
617 s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
618 self.assertValidPingResponse(s, net_test.IPV6_PING)
619 self.assertValidPingResponse(s, net_test.IPV6_PING)
621 def testIPv4LargePacket(self):
622 s = net_test.IPv4PingSocket()
623 data = net_test.IPV4_PING + 20000 * "a"
624 s.sendto(data, ("127.0.0.1", 987))
625 self.assertValidPingResponse(s, data)
627 def testIPv6LargePacket(self):
628 s = net_test.IPv6PingSocket()
629 s.bind(("::", 0xace))
630 data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
631 s.sendto(data, ("::1", 953))
633 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
634 def testIcmpSocketsNotInIcmp6(self):
635 numrows = len(self.ReadProcNetSocket("icmp"))
636 numrows6 = len(self.ReadProcNetSocket("icmp6"))
637 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
638 s.bind(("127.0.0.1", 0xace))
639 s.connect(("127.0.0.1", 0xbeef))
640 self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
641 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
643 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
644 def testIcmp6SocketsNotInIcmp(self):
645 numrows = len(self.ReadProcNetSocket("icmp"))
646 numrows6 = len(self.ReadProcNetSocket("icmp6"))
647 s = net_test.IPv6PingSocket()
648 s.bind(("::1", 0xace))
649 s.connect(("::1", 0xbeef))
650 self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
651 self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
653 def testProcNetIcmp(self):
654 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
655 s.bind(("127.0.0.1", 0xace))
656 s.connect(("127.0.0.1", 0xbeef))
657 self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
659 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
660 def testProcNetIcmp6(self):
661 numrows6 = len(self.ReadProcNetSocket("icmp6"))
662 s = net_test.IPv6PingSocket()
663 s.bind(("::1", 0xace))
664 s.connect(("::1", 0xbeef))
665 self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
667 # Check the row goes away when the socket is closed.
669 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
671 # Try send, bind and connect to check the addresses and the state.
672 s = net_test.IPv6PingSocket()
673 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
674 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
675 self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
677 # Can't bind after sendto, apparently.
678 s = net_test.IPv6PingSocket()
679 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
680 s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
681 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
683 # Check receive bytes.
684 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
685 s.connect(("ff02::1", 0xdead))
686 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
687 s.send(net_test.IPV6_PING)
688 time.sleep(0.01) # Give the other thread time to reply.
689 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
690 txmem=0, rxmem=0x300)
691 self.assertValidPingResponse(s, net_test.IPV6_PING)
692 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
695 def testProcNetUdp6(self):
696 s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
697 s.bind(("::1", 0xace))
698 s.connect(("::1", 0xbeef))
699 self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
701 def testProcNetRaw6(self):
702 s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
703 s.bind(("::1", 0xace))
704 s.connect(("::1", 0xbeef))
705 self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
708 if __name__ == "__main__":