OSDN Git Service

Update binder benchmark to use google-benchmark
[android-x86/system-extras.git] / tests / net_test / ping6_test.py
1 #!/usr/bin/python
2 #
3 # Copyright 2014 The Android Open Source Project
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # pylint: disable=g-bad-todo
18
19 import errno
20 import os
21 import posix
22 import random
23 from socket import *  # pylint: disable=wildcard-import
24 import threading
25 import time
26 import unittest
27
28 from scapy import all as scapy
29
30 import csocket
31 import multinetwork_base
32 import net_test
33
34
35 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
36
37 ICMP_ECHO = 8
38 ICMP_ECHOREPLY = 0
39 ICMPV6_ECHO_REQUEST = 128
40 ICMPV6_ECHO_REPLY = 129
41
42
43 class PingReplyThread(threading.Thread):
44
45   MIN_TTL = 10
46   INTERMEDIATE_IPV4 = "192.0.2.2"
47   INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
48   NEIGHBOURS = ["fe80::1"]
49
50   def __init__(self, tun, mymac, routermac):
51     super(PingReplyThread, self).__init__()
52     self._tun = tun
53     self._stopped = False
54     self._mymac = mymac
55     self._routermac = routermac
56
57   def Stop(self):
58     self._stopped = True
59
60   def ChecksumValid(self, packet):
61     # Get and clear the checksums.
62     def GetAndClearChecksum(layer):
63       if not layer:
64         return
65       try:
66         checksum = layer.chksum
67         del layer.chksum
68       except AttributeError:
69         checksum = layer.cksum
70         del layer.cksum
71       return checksum
72
73     def GetChecksum(layer):
74       try:
75         return layer.chksum
76       except AttributeError:
77         return layer.cksum
78
79     layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
80     sums = {}
81     for name in layers:
82       sums[name] = GetAndClearChecksum(packet.getlayer(name))
83
84     # Serialize the packet, so scapy recalculates the checksums, and compare
85     # them with the ones in the packet.
86     packet = packet.__class__(str(packet))
87     for name in layers:
88       layer = packet.getlayer(name)
89       if layer and GetChecksum(layer) != sums[name]:
90         return False
91
92     return True
93
94   def SendTimeExceeded(self, version, packet):
95     if version == 4:
96       src = packet.getlayer(scapy.IP).src
97       self.SendPacket(
98           scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
99           scapy.ICMP(type=11, code=0) /
100           packet)
101     elif version == 6:
102       src = packet.getlayer(scapy.IPv6).src
103       self.SendPacket(
104           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
105           scapy.ICMPv6TimeExceeded(code=0) /
106           packet)
107
108   def IPv4Packet(self, ip):
109     icmp = ip.getlayer(scapy.ICMP)
110
111     # We only support ping for now.
112     if (ip.proto != IPPROTO_ICMP or
113         icmp.type != ICMP_ECHO or
114         icmp.code != 0):
115       return
116
117     # Check the checksums.
118     if not self.ChecksumValid(ip):
119       return
120
121     if ip.ttl < self.MIN_TTL:
122       self.SendTimeExceeded(4, ip)
123       return
124
125     icmp.type = ICMP_ECHOREPLY
126     self.SwapAddresses(ip)
127     self.SendPacket(ip)
128
129   def IPv6Packet(self, ipv6):
130     icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
131
132     # We only support ping for now.
133     if (ipv6.nh != IPPROTO_ICMPV6 or
134         not icmpv6 or
135         icmpv6.type != ICMPV6_ECHO_REQUEST or
136         icmpv6.code != 0):
137       return
138
139     # Check the checksums.
140     if not self.ChecksumValid(ipv6):
141       return
142
143     if ipv6.dst.startswith("ff02::"):
144       ipv6.dst = ipv6.src
145       for src in self.NEIGHBOURS:
146         ipv6.src = src
147         icmpv6.type = ICMPV6_ECHO_REPLY
148         self.SendPacket(ipv6)
149     elif ipv6.hlim < self.MIN_TTL:
150       self.SendTimeExceeded(6, ipv6)
151     else:
152       icmpv6.type = ICMPV6_ECHO_REPLY
153       self.SwapAddresses(ipv6)
154       self.SendPacket(ipv6)
155
156   def SwapAddresses(self, packet):
157     src = packet.src
158     packet.src = packet.dst
159     packet.dst = src
160
161   def SendPacket(self, packet):
162     packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
163     try:
164       posix.write(self._tun.fileno(), str(packet))
165     except ValueError:
166       pass
167
168   def run(self):
169     while not self._stopped:
170
171       try:
172         packet = posix.read(self._tun.fileno(), 4096)
173       except OSError, e:
174         if e.errno == errno.EAGAIN:
175           continue
176         else:
177           break
178
179       ether = scapy.Ether(packet)
180       if ether.type == net_test.ETH_P_IPV6:
181         self.IPv6Packet(ether.payload)
182       elif ether.type == net_test.ETH_P_IP:
183         self.IPv4Packet(ether.payload)
184
185
186 class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
187
188   @classmethod
189   def setUpClass(cls):
190     super(Ping6Test, cls).setUpClass()
191     cls.netid = random.choice(cls.NETIDS)
192     cls.reply_thread = PingReplyThread(
193         cls.tuns[cls.netid],
194         cls.MyMacAddress(cls.netid),
195         cls.RouterMacAddress(cls.netid))
196     cls.SetDefaultNetwork(cls.netid)
197     cls.reply_thread.start()
198
199   @classmethod
200   def tearDownClass(cls):
201     cls.reply_thread.Stop()
202     cls.ClearDefaultNetwork()
203     super(Ping6Test, cls).tearDownClass()
204
205   def setUp(self):
206     self.ifname = self.GetInterfaceName(self.netid)
207     self.ifindex = self.ifindices[self.netid]
208     self.lladdr = net_test.GetLinkAddress(self.ifname, True)
209     self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
210
211   def assertValidPingResponse(self, s, data):
212     family = s.family
213
214     # Receive the reply.
215     rcvd, src = s.recvfrom(32768)
216     self.assertNotEqual(0, len(rcvd), "No data received")
217
218     # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
219     # as IPv4.
220     if src[0].startswith("::ffff:"):
221       family = AF_INET
222       src = (src[0].replace("::ffff:", ""), src[1:])
223
224     # Check the data being sent is valid.
225     self.assertGreater(len(data), 7, "Not enough data for ping packet")
226     if family == AF_INET:
227       self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
228     elif family == AF_INET6:
229       self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
230     else:
231       self.fail("Unknown socket address family %d" * s.family)
232
233     # Check address, ICMP type, and ICMP code.
234     if family == AF_INET:
235       addr, unused_port = src
236       self.assertGreaterEqual(len(addr), len("1.1.1.1"))
237       self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
238     else:
239       addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
240       self.assertGreaterEqual(len(addr), len("::"))
241       self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
242       # Check that the flow label is zero and that the scope ID is sane.
243       self.assertEqual(flowlabel, 0)
244       if addr.startswith("fe80::"):
245         self.assertTrue(scope_id in self.ifindices.values())
246       else:
247         self.assertEquals(0, scope_id)
248
249     # TODO: check the checksum. We can't do this easily now for ICMPv6 because
250     # we don't have the IP addresses so we can't construct the pseudoheader.
251
252     # Check the sequence number and the data.
253     self.assertEqual(len(data), len(rcvd))
254     self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
255
256   def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
257                         txmem=0, rxmem=0):
258     expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
259                 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
260                 "%02X" % state,
261                 "%08X:%08X" % (txmem, rxmem),
262                 str(os.getuid()), "2", "0"]
263     actual = self.ReadProcNetSocket(name)[-1]
264     self.assertListEqual(expected, actual)
265
266   def testIPv4SendWithNoConnection(self):
267     s = net_test.IPv4PingSocket()
268     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
269
270   def testIPv6SendWithNoConnection(self):
271     s = net_test.IPv6PingSocket()
272     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
273
274   def testIPv4LoopbackPingWithConnect(self):
275     s = net_test.IPv4PingSocket()
276     s.connect(("127.0.0.1", 55))
277     data = net_test.IPV4_PING + "foobarbaz"
278     s.send(data)
279     self.assertValidPingResponse(s, data)
280
281   def testIPv6LoopbackPingWithConnect(self):
282     s = net_test.IPv6PingSocket()
283     s.connect(("::1", 55))
284     s.send(net_test.IPV6_PING)
285     self.assertValidPingResponse(s, net_test.IPV6_PING)
286
287   def testIPv4PingUsingSendto(self):
288     s = net_test.IPv4PingSocket()
289     written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
290     self.assertEquals(len(net_test.IPV4_PING), written)
291     self.assertValidPingResponse(s, net_test.IPV4_PING)
292
293   def testIPv6PingUsingSendto(self):
294     s = net_test.IPv6PingSocket()
295     written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
296     self.assertEquals(len(net_test.IPV6_PING), written)
297     self.assertValidPingResponse(s, net_test.IPV6_PING)
298
299   def testIPv4NoCrash(self):
300     # Python 2.x does not provide either read() or recvmsg.
301     s = net_test.IPv4PingSocket()
302     written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
303     self.assertEquals(len(net_test.IPV4_PING), written)
304     fd = s.fileno()
305     reply = posix.read(fd, 4096)
306     self.assertEquals(written, len(reply))
307
308   def testIPv6NoCrash(self):
309     # Python 2.x does not provide either read() or recvmsg.
310     s = net_test.IPv6PingSocket()
311     written = s.sendto(net_test.IPV6_PING, ("::1", 55))
312     self.assertEquals(len(net_test.IPV6_PING), written)
313     fd = s.fileno()
314     reply = posix.read(fd, 4096)
315     self.assertEquals(written, len(reply))
316
317   def testCrossProtocolCrash(self):
318     # Checks that an ICMP error containing a ping packet that matches the ID
319     # of a socket of the wrong protocol (which can happen when using 464xlat)
320     # doesn't crash the kernel.
321
322     # We can only test this using IPv6 unreachables and IPv4 ping sockets,
323     # because IPv4 packets sent by scapy.send() on loopback are not received by
324     # the kernel. So we don't actually use this function yet.
325     def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
326       return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
327               scapy.ICMP(type=3, code=0) /
328               scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
329               scapy.ICMP(type=8, id=port, seq=1))
330
331     def GetIPv6Unreachable(port):
332       return (scapy.IPv6(src="::1", dst="::1") /
333               scapy.ICMPv6DestUnreach() /
334               scapy.IPv6(src="::1", dst="::1") /
335               scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
336
337     # An unreachable matching the ID of a socket of the wrong protocol
338     # shouldn't crash.
339     s = net_test.IPv4PingSocket()
340     s.connect(("127.0.0.1", 12345))
341     _, port = s.getsockname()
342     scapy.send(GetIPv6Unreachable(port))
343     # No crash? Good.
344
345   def testCrossProtocolCalls(self):
346     """Tests that passing in the wrong family returns EAFNOSUPPORT.
347
348     Relevant kernel commits:
349       upstream net:
350         91a0b60 net/ping: handle protocol mismatching scenario
351         9145736d net: ping: Return EAFNOSUPPORT when appropriate.
352
353       android-3.10:
354         78a6809 net/ping: handle protocol mismatching scenario
355         428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
356     """
357
358     def CheckEAFNoSupport(function, *args):
359       self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
360
361     ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
362
363     # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
364     # IPv4 socket address structures, we need to pass down a socket address
365     # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
366     # will fail immediately with EINVAL because the passed-in socket length is
367     # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
368     ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
369     ipv4sockaddr = csocket.SockaddrIn6(
370         ipv4sockaddr.Pack() +
371         "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
372
373     s4 = net_test.IPv4PingSocket()
374     s6 = net_test.IPv6PingSocket()
375
376     # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
377     # address family, because the Python implementation will just pass garbage
378     # down to the kernel. So call the C functions directly.
379     CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
380     CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
381     CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
382     CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
383     CheckEAFNoSupport(csocket.Sendmsg,
384                       s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
385     CheckEAFNoSupport(csocket.Sendmsg,
386                       s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
387
388   def testIPv4Bind(self):
389     # Bind to unspecified address.
390     s = net_test.IPv4PingSocket()
391     s.bind(("0.0.0.0", 544))
392     self.assertEquals(("0.0.0.0", 544), s.getsockname())
393
394     # Bind to loopback.
395     s = net_test.IPv4PingSocket()
396     s.bind(("127.0.0.1", 99))
397     self.assertEquals(("127.0.0.1", 99), s.getsockname())
398
399     # Binding twice is not allowed.
400     self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
401
402     # But binding two different sockets to the same ID is allowed.
403     s2 = net_test.IPv4PingSocket()
404     s2.bind(("127.0.0.1", 99))
405     self.assertEquals(("127.0.0.1", 99), s2.getsockname())
406     s3 = net_test.IPv4PingSocket()
407     s3.bind(("127.0.0.1", 99))
408     self.assertEquals(("127.0.0.1", 99), s3.getsockname())
409
410     # If two sockets bind to the same port, the first one to call read() gets
411     # the response.
412     s4 = net_test.IPv4PingSocket()
413     s5 = net_test.IPv4PingSocket()
414     s4.bind(("0.0.0.0", 167))
415     s5.bind(("0.0.0.0", 167))
416     s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
417     self.assertValidPingResponse(s5, net_test.IPV4_PING)
418     net_test.SetSocketTimeout(s4, 100)
419     self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
420
421     # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
422     s6 = net_test.IPv4PingSocket()
423     s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
424     self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
425
426     # Can't bind after sendto.
427     s = net_test.IPv4PingSocket()
428     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
429     self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
430
431   def testIPv6Bind(self):
432     # Bind to unspecified address.
433     s = net_test.IPv6PingSocket()
434     s.bind(("::", 769))
435     self.assertEquals(("::", 769, 0, 0), s.getsockname())
436
437     # Bind to loopback.
438     s = net_test.IPv6PingSocket()
439     s.bind(("::1", 99))
440     self.assertEquals(("::1", 99, 0, 0), s.getsockname())
441
442     # Binding twice is not allowed.
443     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
444
445     # But binding two different sockets to the same ID is allowed.
446     s2 = net_test.IPv6PingSocket()
447     s2.bind(("::1", 99))
448     self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
449     s3 = net_test.IPv6PingSocket()
450     s3.bind(("::1", 99))
451     self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
452
453     # Binding both IPv4 and IPv6 to the same socket works.
454     s4 = net_test.IPv4PingSocket()
455     s6 = net_test.IPv6PingSocket()
456     s4.bind(("0.0.0.0", 444))
457     s6.bind(("::", 666, 0, 0))
458
459     # Can't bind after sendto.
460     s = net_test.IPv6PingSocket()
461     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
462     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
463
464   def testIPv4InvalidBind(self):
465     s = net_test.IPv4PingSocket()
466     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
467                            s.bind, ("255.255.255.255", 1026))
468     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
469                            s.bind, ("224.0.0.1", 651))
470     # Binding to an address we don't have only works with IP_TRANSPARENT.
471     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
472                            s.bind, (net_test.IPV4_ADDR, 651))
473     try:
474       s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
475       s.bind((net_test.IPV4_ADDR, 651))
476     except IOError, e:
477       if e.errno == errno.EACCES:
478         pass  # We're not root. let it go for now.
479
480   def testIPv6InvalidBind(self):
481     s = net_test.IPv6PingSocket()
482     self.assertRaisesErrno(errno.EINVAL,
483                            s.bind, ("ff02::2", 1026))
484
485     # Binding to an address we don't have only works with IPV6_TRANSPARENT.
486     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
487                            s.bind, (net_test.IPV6_ADDR, 651))
488     try:
489       s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
490       s.bind((net_test.IPV6_ADDR, 651))
491     except IOError, e:
492       if e.errno == errno.EACCES:
493         pass  # We're not root. let it go for now.
494
495   def testAfUnspecBind(self):
496     # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
497     s4 = net_test.IPv4PingSocket()
498     sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
499     sockaddr.family = AF_UNSPEC
500     csocket.Bind(s4, sockaddr)
501     self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
502
503     # But not if the address is anything else.
504     sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
505     sockaddr.family = AF_UNSPEC
506     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
507
508     # This doesn't work for IPv6.
509     s6 = net_test.IPv6PingSocket()
510     sockaddr = csocket.Sockaddr(("::1", 58997))
511     sockaddr.family = AF_UNSPEC
512     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
513
514   def testIPv6ScopedBind(self):
515     # Can't bind to a link-local address without a scope ID.
516     s = net_test.IPv6PingSocket()
517     self.assertRaisesErrno(errno.EINVAL,
518                            s.bind, (self.lladdr, 1026, 0, 0))
519
520     # Binding to a link-local address with a scope ID works, and the scope ID is
521     # returned by a subsequent getsockname. Interestingly, Python's getsockname
522     # returns "fe80:1%foo", even though it does not understand it.
523     expected = self.lladdr + "%" + self.ifname
524     s.bind((self.lladdr, 4646, 0, self.ifindex))
525     self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
526
527     # Of course, for the above to work the address actually has to be configured
528     # on the machine.
529     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
530                            s.bind, ("fe80::f00", 1026, 0, 1))
531
532     # Scope IDs on non-link-local addresses are silently ignored.
533     s = net_test.IPv6PingSocket()
534     s.bind(("::1", 1234, 0, 1))
535     self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
536
537   def testBindAffectsIdentifier(self):
538     s = net_test.IPv6PingSocket()
539     s.bind((self.globaladdr, 0xf976))
540     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
541     self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
542
543     s = net_test.IPv6PingSocket()
544     s.bind((self.globaladdr, 0xace))
545     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
546     self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
547
548   def testLinkLocalAddress(self):
549     s = net_test.IPv6PingSocket()
550     # Sending to a link-local address with no scope fails with EINVAL.
551     self.assertRaisesErrno(errno.EINVAL,
552                            s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
553     # Sending to link-local address with a scope succeeds. Note that Python
554     # doesn't understand the "fe80::1%lo" format, even though it returns it.
555     s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
556     # No exceptions? Good.
557
558   def testMappedAddressFails(self):
559     s = net_test.IPv6PingSocket()
560     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
561     self.assertValidPingResponse(s, net_test.IPV6_PING)
562     s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
563     self.assertValidPingResponse(s, net_test.IPV6_PING)
564     self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
565                            ("::ffff:192.0.2.1", 55))
566
567   @unittest.skipUnless(False, "skipping: does not work yet")
568   def testFlowLabel(self):
569     s = net_test.IPv6PingSocket()
570
571     # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
572     # the flow label in the packet is not set.
573     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
574     self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
575
576     # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
577     # that is not registered with the flow manager should return EINVAL...
578     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
579     # ... but this doesn't work yet.
580     if False:
581       self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
582                              (net_test.IPV6_ADDR, 93, 0xdead, 0))
583
584     # After registering the flow label, it gets sent properly, appears in the
585     # output packet, and is returned in the response.
586     net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
587     self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
588                                      net_test.IPV6_FLOWINFO_SEND))
589     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
590     _, src = s.recvfrom(32768)
591     _, _, flowlabel, _ = src
592     self.assertEqual(0xdead, flowlabel & 0xfffff)
593
594   def testIPv4Error(self):
595     s = net_test.IPv4PingSocket()
596     s.setsockopt(SOL_IP, IP_TTL, 2)
597     s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
598     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
599     # We can't check the actual error because Python 2.7 doesn't implement
600     # recvmsg, but we can at least check that the socket returns an error.
601     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
602
603   def testIPv6Error(self):
604     s = net_test.IPv6PingSocket()
605     s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
606     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
607     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
608     # We can't check the actual error because Python 2.7 doesn't implement
609     # recvmsg, but we can at least check that the socket returns an error.
610     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
611
612   def testIPv6MulticastPing(self):
613     s = net_test.IPv6PingSocket()
614     # Send a multicast ping and check we get at least one duplicate.
615     # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
616     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
617     s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
618     self.assertValidPingResponse(s, net_test.IPV6_PING)
619     self.assertValidPingResponse(s, net_test.IPV6_PING)
620
621   def testIPv4LargePacket(self):
622     s = net_test.IPv4PingSocket()
623     data = net_test.IPV4_PING + 20000 * "a"
624     s.sendto(data, ("127.0.0.1", 987))
625     self.assertValidPingResponse(s, data)
626
627   def testIPv6LargePacket(self):
628     s = net_test.IPv6PingSocket()
629     s.bind(("::", 0xace))
630     data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
631     s.sendto(data, ("::1", 953))
632
633   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
634   def testIcmpSocketsNotInIcmp6(self):
635     numrows = len(self.ReadProcNetSocket("icmp"))
636     numrows6 = len(self.ReadProcNetSocket("icmp6"))
637     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
638     s.bind(("127.0.0.1", 0xace))
639     s.connect(("127.0.0.1", 0xbeef))
640     self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
641     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
642
643   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
644   def testIcmp6SocketsNotInIcmp(self):
645     numrows = len(self.ReadProcNetSocket("icmp"))
646     numrows6 = len(self.ReadProcNetSocket("icmp6"))
647     s = net_test.IPv6PingSocket()
648     s.bind(("::1", 0xace))
649     s.connect(("::1", 0xbeef))
650     self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
651     self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
652
653   def testProcNetIcmp(self):
654     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
655     s.bind(("127.0.0.1", 0xace))
656     s.connect(("127.0.0.1", 0xbeef))
657     self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
658
659   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
660   def testProcNetIcmp6(self):
661     numrows6 = len(self.ReadProcNetSocket("icmp6"))
662     s = net_test.IPv6PingSocket()
663     s.bind(("::1", 0xace))
664     s.connect(("::1", 0xbeef))
665     self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
666
667     # Check the row goes away when the socket is closed.
668     s.close()
669     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
670
671     # Try send, bind and connect to check the addresses and the state.
672     s = net_test.IPv6PingSocket()
673     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
674     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
675     self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
676
677     # Can't bind after sendto, apparently.
678     s = net_test.IPv6PingSocket()
679     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
680     s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
681     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
682
683     # Check receive bytes.
684     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
685     s.connect(("ff02::1", 0xdead))
686     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
687     s.send(net_test.IPV6_PING)
688     time.sleep(0.01)  # Give the other thread time to reply.
689     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
690                            txmem=0, rxmem=0x300)
691     self.assertValidPingResponse(s, net_test.IPV6_PING)
692     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
693                            txmem=0, rxmem=0)
694
695   def testProcNetUdp6(self):
696     s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
697     s.bind(("::1", 0xace))
698     s.connect(("::1", 0xbeef))
699     self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
700
701   def testProcNetRaw6(self):
702     s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
703     s.bind(("::1", 0xace))
704     s.connect(("::1", 0xbeef))
705     self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
706
707
708 if __name__ == "__main__":
709   unittest.main()