#!/usr/bin/python
+#
+# Copyright 2014 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
# pylint: disable=g-bad-todo
import errno
import os
import posix
-import re
+import random
from socket import * # pylint: disable=wildcard-import
+import threading
+import time
import unittest
+from scapy import all as scapy
+
+import csocket
+import multinetwork_base
import net_test
HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
+ICMP_ECHO = 8
+ICMP_ECHOREPLY = 0
+ICMPV6_ECHO_REQUEST = 128
+ICMPV6_ECHO_REPLY = 129
+
+
+class PingReplyThread(threading.Thread):
+
+ MIN_TTL = 10
+ INTERMEDIATE_IPV4 = "192.0.2.2"
+ INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
+ NEIGHBOURS = ["fe80::1"]
+
+ def __init__(self, tun, mymac, routermac):
+ super(PingReplyThread, self).__init__()
+ self._tun = tun
+ self._stopped = False
+ self._mymac = mymac
+ self._routermac = routermac
+
+ def Stop(self):
+ self._stopped = True
+
+ def ChecksumValid(self, packet):
+ # Get and clear the checksums.
+ def GetAndClearChecksum(layer):
+ if not layer:
+ return
+ try:
+ checksum = layer.chksum
+ del layer.chksum
+ except AttributeError:
+ checksum = layer.cksum
+ del layer.cksum
+ return checksum
+
+ def GetChecksum(layer):
+ try:
+ return layer.chksum
+ except AttributeError:
+ return layer.cksum
+
+ layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
+ sums = {}
+ for name in layers:
+ sums[name] = GetAndClearChecksum(packet.getlayer(name))
+
+ # Serialize the packet, so scapy recalculates the checksums, and compare
+ # them with the ones in the packet.
+ packet = packet.__class__(str(packet))
+ for name in layers:
+ layer = packet.getlayer(name)
+ if layer and GetChecksum(layer) != sums[name]:
+ return False
+
+ return True
+
+ def SendTimeExceeded(self, version, packet):
+ if version == 4:
+ src = packet.getlayer(scapy.IP).src
+ self.SendPacket(
+ scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
+ scapy.ICMP(type=11, code=0) /
+ packet)
+ elif version == 6:
+ src = packet.getlayer(scapy.IPv6).src
+ self.SendPacket(
+ scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
+ scapy.ICMPv6TimeExceeded(code=0) /
+ packet)
+
+ def IPv4Packet(self, ip):
+ icmp = ip.getlayer(scapy.ICMP)
+
+ # We only support ping for now.
+ if (ip.proto != IPPROTO_ICMP or
+ icmp.type != ICMP_ECHO or
+ icmp.code != 0):
+ return
+
+ # Check the checksums.
+ if not self.ChecksumValid(ip):
+ return
+
+ if ip.ttl < self.MIN_TTL:
+ self.SendTimeExceeded(4, ip)
+ return
+
+ icmp.type = ICMP_ECHOREPLY
+ self.SwapAddresses(ip)
+ self.SendPacket(ip)
+
+ def IPv6Packet(self, ipv6):
+ icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
+
+ # We only support ping for now.
+ if (ipv6.nh != IPPROTO_ICMPV6 or
+ not icmpv6 or
+ icmpv6.type != ICMPV6_ECHO_REQUEST or
+ icmpv6.code != 0):
+ return
+
+ # Check the checksums.
+ if not self.ChecksumValid(ipv6):
+ return
+
+ if ipv6.dst.startswith("ff02::"):
+ ipv6.dst = ipv6.src
+ for src in self.NEIGHBOURS:
+ ipv6.src = src
+ icmpv6.type = ICMPV6_ECHO_REPLY
+ self.SendPacket(ipv6)
+ elif ipv6.hlim < self.MIN_TTL:
+ self.SendTimeExceeded(6, ipv6)
+ else:
+ icmpv6.type = ICMPV6_ECHO_REPLY
+ self.SwapAddresses(ipv6)
+ self.SendPacket(ipv6)
-class Ping6Test(net_test.NetworkTest):
+ def SwapAddresses(self, packet):
+ src = packet.src
+ packet.src = packet.dst
+ packet.dst = src
+
+ def SendPacket(self, packet):
+ packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
+ try:
+ posix.write(self._tun.fileno(), str(packet))
+ except ValueError:
+ pass
+
+ def run(self):
+ while not self._stopped:
+
+ try:
+ packet = posix.read(self._tun.fileno(), 4096)
+ except OSError, e:
+ if e.errno == errno.EAGAIN:
+ continue
+ else:
+ break
+
+ ether = scapy.Ether(packet)
+ if ether.type == net_test.ETH_P_IPV6:
+ self.IPv6Packet(ether.payload)
+ elif ether.type == net_test.ETH_P_IP:
+ self.IPv4Packet(ether.payload)
+
+
+class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(Ping6Test, cls).setUpClass()
+ cls.netid = random.choice(cls.NETIDS)
+ cls.reply_thread = PingReplyThread(
+ cls.tuns[cls.netid],
+ cls.MyMacAddress(cls.netid),
+ cls.RouterMacAddress(cls.netid))
+ cls.SetDefaultNetwork(cls.netid)
+ cls.reply_thread.start()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.reply_thread.Stop()
+ cls.ClearDefaultNetwork()
+ super(Ping6Test, cls).tearDownClass()
def setUp(self):
- if net_test.HAVE_IPV6:
- self.ifname = net_test.GetDefaultRouteInterface()
- self.ifindex = net_test.GetInterfaceIndex(self.ifname)
- self.lladdr = net_test.GetLinkAddress(self.ifname, True)
- self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
+ self.ifname = self.GetInterfaceName(self.netid)
+ self.ifindex = self.ifindices[self.netid]
+ self.lladdr = net_test.GetLinkAddress(self.ifname, True)
+ self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
def assertValidPingResponse(self, s, data):
family = s.family
+ # Receive the reply.
+ rcvd, src = s.recvfrom(32768)
+ self.assertNotEqual(0, len(rcvd), "No data received")
+
+ # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
+ # as IPv4.
+ if src[0].startswith("::ffff:"):
+ family = AF_INET
+ src = (src[0].replace("::ffff:", ""), src[1:])
+
# Check the data being sent is valid.
self.assertGreater(len(data), 7, "Not enough data for ping packet")
if family == AF_INET:
self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
elif family == AF_INET6:
- self.assertTrue(data.startswith("\x80\x00"), "Not an IPv4 echo request")
+ self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
else:
self.fail("Unknown socket address family %d" * s.family)
- # Receive the reply.
- rcvd, src = s.recvfrom(32768)
- self.assertNotEqual(0, len(rcvd), "No data received")
-
# Check address, ICMP type, and ICMP code.
if family == AF_INET:
addr, unused_port = src
self.assertGreaterEqual(len(addr), len("1.1.1.1"))
self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
else:
- addr, unused_port, flowlabel, scope_id = src
+ addr, unused_port, flowlabel, scope_id = src # pylint: disable=unbalanced-tuple-unpacking
self.assertGreaterEqual(len(addr), len("::"))
self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
# Check that the flow label is zero and that the scope ID is sane.
self.assertEqual(flowlabel, 0)
- self.assertLess(scope_id, 100)
+ if addr.startswith("fe80::"):
+ self.assertTrue(scope_id in self.ifindices.values())
+ else:
+ self.assertEquals(0, scope_id)
# TODO: check the checksum. We can't do this easily now for ICMPv6 because
# we don't have the IP addresses so we can't construct the pseudoheader.
self.assertEqual(len(data), len(rcvd))
self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
- def ReadProcNetSocket(self, protocol):
- # Read file.
- lines = open("/proc/net/%s" % protocol).readlines()
-
- # Possibly check, and strip, header.
- if protocol in ["icmp6", "raw6", "udp6"]:
- self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
- lines = lines[1:]
-
- # Check contents.
- if protocol.endswith("6"):
- addrlen = 32
- else:
- addrlen = 8
- regexp = re.compile(r" *(\d+): " # bucket
- "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
- "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
- "([0-9A-F][0-9A-F]) " # state
- "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
- "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
- "([0-9A-F]{8}) +" # ?
- "([0-9]+) +" # uid
- "([0-9]+) +" # ?
- "([0-9]+) +" # inode
- "([0-9]+) +" # refcnt
- "([0-9a-f]+) +" # sp
- "([0-9]+) *$" # drops, icmp has spaces
- % (addrlen, addrlen))
- # Return a list of lists with only source / dest addresses for now.
- out = []
- for line in lines:
- (_, src, dst, state, mem,
- _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
- out.append([src, dst, state, mem, uid, refcnt, drops])
- return out
-
def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
txmem=0, rxmem=0):
expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
s = net_test.IPv4PingSocket()
self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6SendWithNoConnection(self):
s = net_test.IPv6PingSocket()
self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
s.send(data)
self.assertValidPingResponse(s, data)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6LoopbackPingWithConnect(self):
s = net_test.IPv6PingSocket()
s.connect(("::1", 55))
self.assertEquals(len(net_test.IPV4_PING), written)
self.assertValidPingResponse(s, net_test.IPV4_PING)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6PingUsingSendto(self):
s = net_test.IPv6PingSocket()
written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
reply = posix.read(fd, 4096)
self.assertEquals(written, len(reply))
+ def testCrossProtocolCrash(self):
+ # Checks that an ICMP error containing a ping packet that matches the ID
+ # of a socket of the wrong protocol (which can happen when using 464xlat)
+ # doesn't crash the kernel.
+
+ # We can only test this using IPv6 unreachables and IPv4 ping sockets,
+ # because IPv4 packets sent by scapy.send() on loopback are not received by
+ # the kernel. So we don't actually use this function yet.
+ def GetIPv4Unreachable(port): # pylint: disable=unused-variable
+ return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
+ scapy.ICMP(type=3, code=0) /
+ scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
+ scapy.ICMP(type=8, id=port, seq=1))
+
+ def GetIPv6Unreachable(port):
+ return (scapy.IPv6(src="::1", dst="::1") /
+ scapy.ICMPv6DestUnreach() /
+ scapy.IPv6(src="::1", dst="::1") /
+ scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
+
+ # An unreachable matching the ID of a socket of the wrong protocol
+ # shouldn't crash.
+ s = net_test.IPv4PingSocket()
+ s.connect(("127.0.0.1", 12345))
+ _, port = s.getsockname()
+ scapy.send(GetIPv6Unreachable(port))
+ # No crash? Good.
+
+ def testCrossProtocolCalls(self):
+ """Tests that passing in the wrong family returns EAFNOSUPPORT.
+
+ Relevant kernel commits:
+ upstream net:
+ 91a0b60 net/ping: handle protocol mismatching scenario
+ 9145736d net: ping: Return EAFNOSUPPORT when appropriate.
+
+ android-3.10:
+ 78a6809 net/ping: handle protocol mismatching scenario
+ 428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
+ """
+
+ def CheckEAFNoSupport(function, *args):
+ self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
+
+ ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
+
+ # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
+ # IPv4 socket address structures, we need to pass down a socket address
+ # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
+ # will fail immediately with EINVAL because the passed-in socket length is
+ # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
+ ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
+ ipv4sockaddr = csocket.SockaddrIn6(
+ ipv4sockaddr.Pack() +
+ "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
+
+ s4 = net_test.IPv4PingSocket()
+ s6 = net_test.IPv6PingSocket()
+
+ # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
+ # address family, because the Python implementation will just pass garbage
+ # down to the kernel. So call the C functions directly.
+ CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
+ CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
+ CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
+ CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
+ CheckEAFNoSupport(csocket.Sendmsg,
+ s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
+ CheckEAFNoSupport(csocket.Sendmsg,
+ s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
+
def testIPv4Bind(self):
# Bind to unspecified address.
s = net_test.IPv4PingSocket()
s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6Bind(self):
# Bind to unspecified address.
s = net_test.IPv6PingSocket()
if e.errno == errno.EACCES:
pass # We're not root. let it go for now.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6InvalidBind(self):
s = net_test.IPv6PingSocket()
self.assertRaisesErrno(errno.EINVAL,
if e.errno == errno.EACCES:
pass # We're not root. let it go for now.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testAfUnspecBind(self):
+ # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
+ s4 = net_test.IPv4PingSocket()
+ sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
+ sockaddr.family = AF_UNSPEC
+ csocket.Bind(s4, sockaddr)
+ self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
+
+ # But not if the address is anything else.
+ sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
+ sockaddr.family = AF_UNSPEC
+ self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
+
+ # This doesn't work for IPv6.
+ s6 = net_test.IPv6PingSocket()
+ sockaddr = csocket.Sockaddr(("::1", 58997))
+ sockaddr.family = AF_UNSPEC
+ self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
+
def testIPv6ScopedBind(self):
# Can't bind to a link-local address without a scope ID.
s = net_test.IPv6PingSocket()
s.bind(("::1", 1234, 0, 1))
self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testBindAffectsIdentifier(self):
s = net_test.IPv6PingSocket()
s.bind((self.globaladdr, 0xf976))
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testLinkLocalAddress(self):
s = net_test.IPv6PingSocket()
# Sending to a link-local address with no scope fails with EINVAL.
s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
# No exceptions? Good.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testMappedAddressFails(self):
s = net_test.IPv6PingSocket()
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
("::ffff:192.0.2.1", 55))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ @unittest.skipUnless(False, "skipping: does not work yet")
def testFlowLabel(self):
s = net_test.IPv6PingSocket()
- net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
+
+ # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
+ # the flow label in the packet is not set.
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0.
+ # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
+ # that is not registered with the flow manager should return EINVAL...
s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
+ # ... but this doesn't work yet.
+ if False:
+ self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
+ (net_test.IPV6_ADDR, 93, 0xdead, 0))
+
+ # After registering the flow label, it gets sent properly, appears in the
+ # output packet, and is returned in the response.
+ net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
net_test.IPV6_FLOWINFO_SEND))
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
# recvmsg, but we can at least check that the socket returns an error.
self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6Error(self):
s = net_test.IPv6PingSocket()
s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
# recvmsg, but we can at least check that the socket returns an error.
self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6MulticastPing(self):
s = net_test.IPv6PingSocket()
# Send a multicast ping and check we get at least one duplicate.
+ # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
self.assertValidPingResponse(s, net_test.IPV6_PING)
self.assertValidPingResponse(s, net_test.IPV6_PING)
s.sendto(data, ("127.0.0.1", 987))
self.assertValidPingResponse(s, data)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6LargePacket(self):
s = net_test.IPv6PingSocket()
s.bind(("::", 0xace))
data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
s.sendto(data, ("::1", 953))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmpSocketsNotInIcmp6(self):
numrows = len(self.ReadProcNetSocket("icmp"))
self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmp6SocketsNotInIcmp(self):
numrows = len(self.ReadProcNetSocket("icmp"))
s.connect(("127.0.0.1", 0xbeef))
self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testProcNetIcmp6(self):
numrows6 = len(self.ReadProcNetSocket("icmp6"))
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
# Check receive bytes.
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
s.connect(("ff02::1", 0xdead))
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
s.send(net_test.IPV6_PING)
+ time.sleep(0.01) # Give the other thread time to reply.
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
- txmem=0, rxmem=0x880)
+ txmem=0, rxmem=0x300)
self.assertValidPingResponse(s, net_test.IPV6_PING)
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
txmem=0, rxmem=0)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testProcNetUdp6(self):
s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
s.bind(("::1", 0xace))
s.connect(("::1", 0xbeef))
self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testProcNetRaw6(self):
s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
s.bind(("::1", 0xace))
if __name__ == "__main__":
unittest.main()
-