OSDN Git Service

d6efa4ef5e758ec274fdd3e558942e5147c1132d
[android-x86/system-extras.git] / tests / net_test / ping6_test.py
1 #!/usr/bin/python
2 #
3 # Copyright 2014 The Android Open Source Project
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # pylint: disable=g-bad-todo
18
19 import errno
20 import os
21 import posix
22 import random
23 import re
24 from socket import *  # pylint: disable=wildcard-import
25 import threading
26 import time
27 import unittest
28
29 from scapy import all as scapy
30
31 import csocket
32 import multinetwork_base
33 import net_test
34
35
36 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
37
38 ICMP_ECHO = 8
39 ICMP_ECHOREPLY = 0
40 ICMPV6_ECHO_REQUEST = 128
41 ICMPV6_ECHO_REPLY = 129
42
43
44 class PingReplyThread(threading.Thread):
45
46   MIN_TTL = 10
47   INTERMEDIATE_IPV4 = "192.0.2.2"
48   INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
49   NEIGHBOURS = ["fe80::1"]
50
51   def __init__(self, tun, mymac, routermac):
52     super(PingReplyThread, self).__init__()
53     self._tun = tun
54     self._stopped = False
55     self._mymac = mymac
56     self._routermac = routermac
57
58   def Stop(self):
59     self._stopped = True
60
61   def ChecksumValid(self, packet):
62     # Get and clear the checksums.
63     def GetAndClearChecksum(layer):
64       if not layer:
65         return
66       try:
67         checksum = layer.chksum
68         del layer.chksum
69       except AttributeError:
70         checksum = layer.cksum
71         del layer.cksum
72       return checksum
73
74     def GetChecksum(layer):
75       try:
76         return layer.chksum
77       except AttributeError:
78         return layer.cksum
79
80     layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
81     sums = {}
82     for name in layers:
83       sums[name] = GetAndClearChecksum(packet.getlayer(name))
84
85     # Serialize the packet, so scapy recalculates the checksums, and compare
86     # them with the ones in the packet.
87     packet = packet.__class__(str(packet))
88     for name in layers:
89       layer = packet.getlayer(name)
90       if layer and GetChecksum(layer) != sums[name]:
91         return False
92
93     return True
94
95   def SendTimeExceeded(self, version, packet):
96     if version == 4:
97       src = packet.getlayer(scapy.IP).src
98       self.SendPacket(
99           scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
100           scapy.ICMP(type=11, code=0) /
101           packet)
102     elif version == 6:
103       src = packet.getlayer(scapy.IPv6).src
104       self.SendPacket(
105           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
106           scapy.ICMPv6TimeExceeded(code=0) /
107           packet)
108
109   def IPv4Packet(self, ip):
110     icmp = ip.getlayer(scapy.ICMP)
111
112     # We only support ping for now.
113     if (ip.proto != IPPROTO_ICMP or
114         icmp.type != ICMP_ECHO or
115         icmp.code != 0):
116       return
117
118     # Check the checksums.
119     if not self.ChecksumValid(ip):
120       return
121
122     if ip.ttl < self.MIN_TTL:
123       self.SendTimeExceeded(4, ip)
124       return
125
126     icmp.type = ICMP_ECHOREPLY
127     self.SwapAddresses(ip)
128     self.SendPacket(ip)
129
130   def IPv6Packet(self, ipv6):
131     icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
132
133     # We only support ping for now.
134     if (ipv6.nh != IPPROTO_ICMPV6 or
135         not icmpv6 or
136         icmpv6.type != ICMPV6_ECHO_REQUEST or
137         icmpv6.code != 0):
138       return
139
140     # Check the checksums.
141     if not self.ChecksumValid(ipv6):
142       return
143
144     if ipv6.dst.startswith("ff02::"):
145       ipv6.dst = ipv6.src
146       for src in self.NEIGHBOURS:
147         ipv6.src = src
148         icmpv6.type = ICMPV6_ECHO_REPLY
149         self.SendPacket(ipv6)
150     elif ipv6.hlim < self.MIN_TTL:
151       self.SendTimeExceeded(6, ipv6)
152     else:
153       icmpv6.type = ICMPV6_ECHO_REPLY
154       self.SwapAddresses(ipv6)
155       self.SendPacket(ipv6)
156
157   def SwapAddresses(self, packet):
158     src = packet.src
159     packet.src = packet.dst
160     packet.dst = src
161
162   def SendPacket(self, packet):
163     packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
164     try:
165       posix.write(self._tun.fileno(), str(packet))
166     except ValueError:
167       pass
168
169   def run(self):
170     while not self._stopped:
171
172       try:
173         packet = posix.read(self._tun.fileno(), 4096)
174       except OSError, e:
175         if e.errno == errno.EAGAIN:
176           continue
177         else:
178           break
179
180       ether = scapy.Ether(packet)
181       if ether.type == net_test.ETH_P_IPV6:
182         self.IPv6Packet(ether.payload)
183       elif ether.type == net_test.ETH_P_IP:
184         self.IPv4Packet(ether.payload)
185
186
187 class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
188
189   @classmethod
190   def setUpClass(cls):
191     super(Ping6Test, cls).setUpClass()
192     cls.netid = random.choice(cls.NETIDS)
193     cls.reply_thread = PingReplyThread(
194         cls.tuns[cls.netid],
195         cls.MyMacAddress(cls.netid),
196         cls.RouterMacAddress(cls.netid))
197     cls.SetDefaultNetwork(cls.netid)
198     cls.reply_thread.start()
199
200   @classmethod
201   def tearDownClass(cls):
202     cls.reply_thread.Stop()
203     cls.ClearDefaultNetwork()
204     super(Ping6Test, cls).tearDownClass()
205
206   def setUp(self):
207     self.ifname = self.GetInterfaceName(self.netid)
208     self.ifindex = self.ifindices[self.netid]
209     self.lladdr = net_test.GetLinkAddress(self.ifname, True)
210     self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
211
212   def assertValidPingResponse(self, s, data):
213     family = s.family
214
215     # Receive the reply.
216     rcvd, src = s.recvfrom(32768)
217     self.assertNotEqual(0, len(rcvd), "No data received")
218
219     # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
220     # as IPv4.
221     if src[0].startswith("::ffff:"):
222       family = AF_INET
223       src = (src[0].replace("::ffff:", ""), src[1:])
224
225     # Check the data being sent is valid.
226     self.assertGreater(len(data), 7, "Not enough data for ping packet")
227     if family == AF_INET:
228       self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
229     elif family == AF_INET6:
230       self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
231     else:
232       self.fail("Unknown socket address family %d" * s.family)
233
234     # Check address, ICMP type, and ICMP code.
235     if family == AF_INET:
236       addr, unused_port = src
237       self.assertGreaterEqual(len(addr), len("1.1.1.1"))
238       self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
239     else:
240       addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
241       self.assertGreaterEqual(len(addr), len("::"))
242       self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
243       # Check that the flow label is zero and that the scope ID is sane.
244       self.assertEqual(flowlabel, 0)
245       if addr.startswith("fe80::"):
246         self.assertTrue(scope_id in self.ifindices.values())
247       else:
248         self.assertEquals(0, scope_id)
249
250     # TODO: check the checksum. We can't do this easily now for ICMPv6 because
251     # we don't have the IP addresses so we can't construct the pseudoheader.
252
253     # Check the sequence number and the data.
254     self.assertEqual(len(data), len(rcvd))
255     self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
256
257   def ReadProcNetSocket(self, protocol):
258     # Read file.
259     lines = open("/proc/net/%s" % protocol).readlines()
260
261     # Possibly check, and strip, header.
262     if protocol in ["icmp6", "raw6", "udp6"]:
263       self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
264     lines = lines[1:]
265
266     # Check contents.
267     if protocol.endswith("6"):
268       addrlen = 32
269     else:
270       addrlen = 8
271     regexp = re.compile(r" *(\d+): "                    # bucket
272                         "([0-9A-F]{%d}:[0-9A-F]{4}) "   # srcaddr, port
273                         "([0-9A-F]{%d}:[0-9A-F]{4}) "   # dstaddr, port
274                         "([0-9A-F][0-9A-F]) "           # state
275                         "([0-9A-F]{8}:[0-9A-F]{8}) "    # mem
276                         "([0-9A-F]{2}:[0-9A-F]{8}) "    # ?
277                         "([0-9A-F]{8}) +"               # ?
278                         "([0-9]+) +"                    # uid
279                         "([0-9]+) +"                    # ?
280                         "([0-9]+) +"                    # inode
281                         "([0-9]+) +"                    # refcnt
282                         "([0-9a-f]+) +"                 # sp
283                         "([0-9]+) *$"                   # drops, icmp has spaces
284                         % (addrlen, addrlen))
285     # Return a list of lists with only source / dest addresses for now.
286     out = []
287     for line in lines:
288       (_, src, dst, state, mem,
289        _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
290       out.append([src, dst, state, mem, uid, refcnt, drops])
291     return out
292
293   def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
294                         txmem=0, rxmem=0):
295     expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
296                 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
297                 "%02X" % state,
298                 "%08X:%08X" % (txmem, rxmem),
299                 str(os.getuid()), "2", "0"]
300     actual = self.ReadProcNetSocket(name)[-1]
301     self.assertListEqual(expected, actual)
302
303   def testIPv4SendWithNoConnection(self):
304     s = net_test.IPv4PingSocket()
305     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
306
307   def testIPv6SendWithNoConnection(self):
308     s = net_test.IPv6PingSocket()
309     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
310
311   def testIPv4LoopbackPingWithConnect(self):
312     s = net_test.IPv4PingSocket()
313     s.connect(("127.0.0.1", 55))
314     data = net_test.IPV4_PING + "foobarbaz"
315     s.send(data)
316     self.assertValidPingResponse(s, data)
317
318   def testIPv6LoopbackPingWithConnect(self):
319     s = net_test.IPv6PingSocket()
320     s.connect(("::1", 55))
321     s.send(net_test.IPV6_PING)
322     self.assertValidPingResponse(s, net_test.IPV6_PING)
323
324   def testIPv4PingUsingSendto(self):
325     s = net_test.IPv4PingSocket()
326     written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
327     self.assertEquals(len(net_test.IPV4_PING), written)
328     self.assertValidPingResponse(s, net_test.IPV4_PING)
329
330   def testIPv6PingUsingSendto(self):
331     s = net_test.IPv6PingSocket()
332     written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
333     self.assertEquals(len(net_test.IPV6_PING), written)
334     self.assertValidPingResponse(s, net_test.IPV6_PING)
335
336   def testIPv4NoCrash(self):
337     # Python 2.x does not provide either read() or recvmsg.
338     s = net_test.IPv4PingSocket()
339     written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
340     self.assertEquals(len(net_test.IPV4_PING), written)
341     fd = s.fileno()
342     reply = posix.read(fd, 4096)
343     self.assertEquals(written, len(reply))
344
345   def testIPv6NoCrash(self):
346     # Python 2.x does not provide either read() or recvmsg.
347     s = net_test.IPv6PingSocket()
348     written = s.sendto(net_test.IPV6_PING, ("::1", 55))
349     self.assertEquals(len(net_test.IPV6_PING), written)
350     fd = s.fileno()
351     reply = posix.read(fd, 4096)
352     self.assertEquals(written, len(reply))
353
354   def testCrossProtocolCrash(self):
355     # Checks that an ICMP error containing a ping packet that matches the ID
356     # of a socket of the wrong protocol (which can happen when using 464xlat)
357     # doesn't crash the kernel.
358
359     # We can only test this using IPv6 unreachables and IPv4 ping sockets,
360     # because IPv4 packets sent by scapy.send() on loopback are not received by
361     # the kernel. So we don't actually use this function yet.
362     def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
363       return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
364               scapy.ICMP(type=3, code=0) /
365               scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
366               scapy.ICMP(type=8, id=port, seq=1))
367
368     def GetIPv6Unreachable(port):
369       return (scapy.IPv6(src="::1", dst="::1") /
370               scapy.ICMPv6DestUnreach() /
371               scapy.IPv6(src="::1", dst="::1") /
372               scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
373
374     # An unreachable matching the ID of a socket of the wrong protocol
375     # shouldn't crash.
376     s = net_test.IPv4PingSocket()
377     s.connect(("127.0.0.1", 12345))
378     _, port = s.getsockname()
379     scapy.send(GetIPv6Unreachable(port))
380     # No crash? Good.
381
382   def testCrossProtocolCalls(self):
383     """Tests that passing in the wrong family returns EAFNOSUPPORT.
384
385     Relevant kernel commits:
386       upstream net:
387         91a0b60 net/ping: handle protocol mismatching scenario
388         9145736d net: ping: Return EAFNOSUPPORT when appropriate.
389
390       android-3.10:
391         78a6809 net/ping: handle protocol mismatching scenario
392         428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
393     """
394
395     def CheckEAFNoSupport(function, *args):
396       self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
397
398     ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
399
400     # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
401     # IPv4 socket address structures, we need to pass down a socket address
402     # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
403     # will fail immediately with EINVAL because the passed-in socket length is
404     # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
405     ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
406     ipv4sockaddr = csocket.SockaddrIn6(
407         ipv4sockaddr.Pack() +
408         "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
409
410     s4 = net_test.IPv4PingSocket()
411     s6 = net_test.IPv6PingSocket()
412
413     # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
414     # address family, because the Python implementation will just pass garbage
415     # down to the kernel. So call the C functions directly.
416     CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
417     CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
418     CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
419     CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
420     CheckEAFNoSupport(csocket.Sendmsg,
421                       s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
422     CheckEAFNoSupport(csocket.Sendmsg,
423                       s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
424
425   def testIPv4Bind(self):
426     # Bind to unspecified address.
427     s = net_test.IPv4PingSocket()
428     s.bind(("0.0.0.0", 544))
429     self.assertEquals(("0.0.0.0", 544), s.getsockname())
430
431     # Bind to loopback.
432     s = net_test.IPv4PingSocket()
433     s.bind(("127.0.0.1", 99))
434     self.assertEquals(("127.0.0.1", 99), s.getsockname())
435
436     # Binding twice is not allowed.
437     self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
438
439     # But binding two different sockets to the same ID is allowed.
440     s2 = net_test.IPv4PingSocket()
441     s2.bind(("127.0.0.1", 99))
442     self.assertEquals(("127.0.0.1", 99), s2.getsockname())
443     s3 = net_test.IPv4PingSocket()
444     s3.bind(("127.0.0.1", 99))
445     self.assertEquals(("127.0.0.1", 99), s3.getsockname())
446
447     # If two sockets bind to the same port, the first one to call read() gets
448     # the response.
449     s4 = net_test.IPv4PingSocket()
450     s5 = net_test.IPv4PingSocket()
451     s4.bind(("0.0.0.0", 167))
452     s5.bind(("0.0.0.0", 167))
453     s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
454     self.assertValidPingResponse(s5, net_test.IPV4_PING)
455     net_test.SetSocketTimeout(s4, 100)
456     self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
457
458     # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
459     s6 = net_test.IPv4PingSocket()
460     s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
461     self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
462
463     # Can't bind after sendto.
464     s = net_test.IPv4PingSocket()
465     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
466     self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
467
468   def testIPv6Bind(self):
469     # Bind to unspecified address.
470     s = net_test.IPv6PingSocket()
471     s.bind(("::", 769))
472     self.assertEquals(("::", 769, 0, 0), s.getsockname())
473
474     # Bind to loopback.
475     s = net_test.IPv6PingSocket()
476     s.bind(("::1", 99))
477     self.assertEquals(("::1", 99, 0, 0), s.getsockname())
478
479     # Binding twice is not allowed.
480     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
481
482     # But binding two different sockets to the same ID is allowed.
483     s2 = net_test.IPv6PingSocket()
484     s2.bind(("::1", 99))
485     self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
486     s3 = net_test.IPv6PingSocket()
487     s3.bind(("::1", 99))
488     self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
489
490     # Binding both IPv4 and IPv6 to the same socket works.
491     s4 = net_test.IPv4PingSocket()
492     s6 = net_test.IPv6PingSocket()
493     s4.bind(("0.0.0.0", 444))
494     s6.bind(("::", 666, 0, 0))
495
496     # Can't bind after sendto.
497     s = net_test.IPv6PingSocket()
498     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
499     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
500
501   def testIPv4InvalidBind(self):
502     s = net_test.IPv4PingSocket()
503     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
504                            s.bind, ("255.255.255.255", 1026))
505     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
506                            s.bind, ("224.0.0.1", 651))
507     # Binding to an address we don't have only works with IP_TRANSPARENT.
508     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
509                            s.bind, (net_test.IPV4_ADDR, 651))
510     try:
511       s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
512       s.bind((net_test.IPV4_ADDR, 651))
513     except IOError, e:
514       if e.errno == errno.EACCES:
515         pass  # We're not root. let it go for now.
516
517   def testIPv6InvalidBind(self):
518     s = net_test.IPv6PingSocket()
519     self.assertRaisesErrno(errno.EINVAL,
520                            s.bind, ("ff02::2", 1026))
521
522     # Binding to an address we don't have only works with IPV6_TRANSPARENT.
523     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
524                            s.bind, (net_test.IPV6_ADDR, 651))
525     try:
526       s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
527       s.bind((net_test.IPV6_ADDR, 651))
528     except IOError, e:
529       if e.errno == errno.EACCES:
530         pass  # We're not root. let it go for now.
531
532   def testAfUnspecBind(self):
533     # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
534     s4 = net_test.IPv4PingSocket()
535     sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
536     sockaddr.family = AF_UNSPEC
537     csocket.Bind(s4, sockaddr)
538     self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
539
540     # But not if the address is anything else.
541     sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
542     sockaddr.family = AF_UNSPEC
543     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
544
545     # This doesn't work for IPv6.
546     s6 = net_test.IPv6PingSocket()
547     sockaddr = csocket.Sockaddr(("::1", 58997))
548     sockaddr.family = AF_UNSPEC
549     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
550
551   def testIPv6ScopedBind(self):
552     # Can't bind to a link-local address without a scope ID.
553     s = net_test.IPv6PingSocket()
554     self.assertRaisesErrno(errno.EINVAL,
555                            s.bind, (self.lladdr, 1026, 0, 0))
556
557     # Binding to a link-local address with a scope ID works, and the scope ID is
558     # returned by a subsequent getsockname. Interestingly, Python's getsockname
559     # returns "fe80:1%foo", even though it does not understand it.
560     expected = self.lladdr + "%" + self.ifname
561     s.bind((self.lladdr, 4646, 0, self.ifindex))
562     self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
563
564     # Of course, for the above to work the address actually has to be configured
565     # on the machine.
566     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
567                            s.bind, ("fe80::f00", 1026, 0, 1))
568
569     # Scope IDs on non-link-local addresses are silently ignored.
570     s = net_test.IPv6PingSocket()
571     s.bind(("::1", 1234, 0, 1))
572     self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
573
574   def testBindAffectsIdentifier(self):
575     s = net_test.IPv6PingSocket()
576     s.bind((self.globaladdr, 0xf976))
577     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
578     self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
579
580     s = net_test.IPv6PingSocket()
581     s.bind((self.globaladdr, 0xace))
582     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
583     self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
584
585   def testLinkLocalAddress(self):
586     s = net_test.IPv6PingSocket()
587     # Sending to a link-local address with no scope fails with EINVAL.
588     self.assertRaisesErrno(errno.EINVAL,
589                            s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
590     # Sending to link-local address with a scope succeeds. Note that Python
591     # doesn't understand the "fe80::1%lo" format, even though it returns it.
592     s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
593     # No exceptions? Good.
594
595   def testMappedAddressFails(self):
596     s = net_test.IPv6PingSocket()
597     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
598     self.assertValidPingResponse(s, net_test.IPV6_PING)
599     s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
600     self.assertValidPingResponse(s, net_test.IPV6_PING)
601     self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
602                            ("::ffff:192.0.2.1", 55))
603
604   @unittest.skipUnless(False, "skipping: does not work yet")
605   def testFlowLabel(self):
606     s = net_test.IPv6PingSocket()
607
608     # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
609     # the flow label in the packet is not set.
610     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
611     self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
612
613     # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
614     # that is not registered with the flow manager should return EINVAL...
615     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
616     # ... but this doesn't work yet.
617     if False:
618       self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
619                              (net_test.IPV6_ADDR, 93, 0xdead, 0))
620
621     # After registering the flow label, it gets sent properly, appears in the
622     # output packet, and is returned in the response.
623     net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
624     self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
625                                      net_test.IPV6_FLOWINFO_SEND))
626     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
627     _, src = s.recvfrom(32768)
628     _, _, flowlabel, _ = src
629     self.assertEqual(0xdead, flowlabel & 0xfffff)
630
631   def testIPv4Error(self):
632     s = net_test.IPv4PingSocket()
633     s.setsockopt(SOL_IP, IP_TTL, 2)
634     s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
635     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
636     # We can't check the actual error because Python 2.7 doesn't implement
637     # recvmsg, but we can at least check that the socket returns an error.
638     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
639
640   def testIPv6Error(self):
641     s = net_test.IPv6PingSocket()
642     s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
643     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
644     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
645     # We can't check the actual error because Python 2.7 doesn't implement
646     # recvmsg, but we can at least check that the socket returns an error.
647     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
648
649   def testIPv6MulticastPing(self):
650     s = net_test.IPv6PingSocket()
651     # Send a multicast ping and check we get at least one duplicate.
652     # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
653     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
654     s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
655     self.assertValidPingResponse(s, net_test.IPV6_PING)
656     self.assertValidPingResponse(s, net_test.IPV6_PING)
657
658   def testIPv4LargePacket(self):
659     s = net_test.IPv4PingSocket()
660     data = net_test.IPV4_PING + 20000 * "a"
661     s.sendto(data, ("127.0.0.1", 987))
662     self.assertValidPingResponse(s, data)
663
664   def testIPv6LargePacket(self):
665     s = net_test.IPv6PingSocket()
666     s.bind(("::", 0xace))
667     data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
668     s.sendto(data, ("::1", 953))
669
670   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
671   def testIcmpSocketsNotInIcmp6(self):
672     numrows = len(self.ReadProcNetSocket("icmp"))
673     numrows6 = len(self.ReadProcNetSocket("icmp6"))
674     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
675     s.bind(("127.0.0.1", 0xace))
676     s.connect(("127.0.0.1", 0xbeef))
677     self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
678     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
679
680   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
681   def testIcmp6SocketsNotInIcmp(self):
682     numrows = len(self.ReadProcNetSocket("icmp"))
683     numrows6 = len(self.ReadProcNetSocket("icmp6"))
684     s = net_test.IPv6PingSocket()
685     s.bind(("::1", 0xace))
686     s.connect(("::1", 0xbeef))
687     self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
688     self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
689
690   def testProcNetIcmp(self):
691     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
692     s.bind(("127.0.0.1", 0xace))
693     s.connect(("127.0.0.1", 0xbeef))
694     self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
695
696   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
697   def testProcNetIcmp6(self):
698     numrows6 = len(self.ReadProcNetSocket("icmp6"))
699     s = net_test.IPv6PingSocket()
700     s.bind(("::1", 0xace))
701     s.connect(("::1", 0xbeef))
702     self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
703
704     # Check the row goes away when the socket is closed.
705     s.close()
706     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
707
708     # Try send, bind and connect to check the addresses and the state.
709     s = net_test.IPv6PingSocket()
710     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
711     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
712     self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
713
714     # Can't bind after sendto, apparently.
715     s = net_test.IPv6PingSocket()
716     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
717     s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
718     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
719
720     # Check receive bytes.
721     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
722     s.connect(("ff02::1", 0xdead))
723     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
724     s.send(net_test.IPV6_PING)
725     time.sleep(0.01)  # Give the other thread time to reply.
726     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
727                            txmem=0, rxmem=0x300)
728     self.assertValidPingResponse(s, net_test.IPV6_PING)
729     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
730                            txmem=0, rxmem=0)
731
732   def testProcNetUdp6(self):
733     s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
734     s.bind(("::1", 0xace))
735     s.connect(("::1", 0xbeef))
736     self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
737
738   def testProcNetRaw6(self):
739     s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
740     s.bind(("::1", 0xace))
741     s.connect(("::1", 0xbeef))
742     self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
743
744
745 if __name__ == "__main__":
746   unittest.main()