OSDN Git Service

Add tests for {sticky,non-sticky} pktinfo routing
authorLorenzo Colitti <lorenzo@google.com>
Fri, 2 May 2014 10:29:49 +0000 (19:29 +0900)
committerLorenzo Colitti <lorenzo@google.com>
Mon, 2 Feb 2015 08:47:27 +0000 (17:47 +0900)
Change-Id: I7ba69ef9e26999e920118b2f3c75e34fac032d8f

tests/net_test/mark_test.py

index a1b0ae4..23ae086 100755 (executable)
@@ -13,8 +13,10 @@ import unittest
 
 from scapy import all as scapy
 
+import cstruct
 import iproute
 import net_test
+import sendmsg
 
 IFF_TUN = 1
 IFF_TAP = 2
@@ -28,9 +30,23 @@ PING_TOS = 0x83
 
 SO_BINDTODEVICE = 25
 
+# Setsockopt values.
 IP_UNICAST_IF = 50
 IPV6_UNICAST_IF = 76
 
+# Cmsg values.
+IP_TTL = 2
+IP_PKTINFO = 8
+IPV6_2292PKTOPTIONS = 6
+IPV6_FLOWINFO = 11
+IPV6_PKTINFO = 50
+IPV6_HOPLIMIT = 52  # Different from IPV6_UNICAST_HOPS, this is cmsg only.
+
+# Data structures
+InPktinfo = cstruct.Struct("in_pktinfo", "@i4s4s", "ifindex spec_dst addr")
+In6Pktinfo = cstruct.Struct("in6_pktinfo", "@16si", "addr ifindex")
+
+
 UDP_PAYLOAD = str(scapy.DNS(rd=1,
                             id=random.randint(0, 65535),
                             qd=scapy.DNSQR(qname="wWW.GoOGle.CoM",
@@ -769,7 +785,7 @@ class InboundMarkingTest(MultiNetworkTest):
       pass
 
 
-class OutgoingRoutingTest(MultiNetworkTest):
+class OutgoingTest(MultiNetworkTest):
 
   # How many times to run outgoing packet tests.
   ITERATIONS = 5
@@ -940,6 +956,109 @@ class OutgoingRoutingTest(MultiNetworkTest):
     self.CheckRemarking(6, False)
     self.CheckRemarking(6, True)
 
+  def _MakePktInfo(self, version, addr, ifindex):
+    version = 6 if ":" in addr else 4
+    family = {4: AF_INET, 6: AF_INET6}[version]
+    addr = inet_pton(family, addr)
+    if version == 6:
+      return In6Pktinfo((addr, ifindex)).Pack()
+    else:
+      return InPktinfo((ifindex, addr, "\x00" * 4)).Pack()
+
+  def testIPv6StickyPktinfo(self):
+    for _ in xrange(self.ITERATIONS):
+      for netid in self.tuns:
+        s = net_test.UDPSocket(AF_INET6)
+
+        # Set a flowlabel.
+        net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
+        s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
+
+        # Set some destination options.
+        nonce = "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c"
+        dstopts = "".join([
+            "\x11\x02",              # Next header=UDP, 24 bytes of options.
+            "\x01\x06", "\x00" * 6,  # PadN, 6 bytes of padding.
+            "\x8b\x0c",              # ILNP nonce, 12 bytes.
+            nonce
+        ])
+        s.setsockopt(net_test.SOL_IPV6, IPV6_DSTOPTS, dstopts)
+        s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 255)
+
+        pktinfo = self._MakePktInfo(6, "::", self.ifindices[netid])
+
+        # Set the sticky pktinfo option.
+        s.setsockopt(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)
+
+        # Specify the flowlabel in the destination address.
+        s.sendto(UDP_PAYLOAD, (net_test.IPV6_ADDR, 53, 0xdead, 0))
+
+        sport = s.getsockname()[1]
+        srcaddr = self.MyAddress(6, netid)
+        expected = (scapy.IPv6(src=srcaddr, dst=net_test.IPV6_ADDR,
+                               fl=0xdead, hlim=255) /
+                    scapy.IPv6ExtHdrDestOpt(
+                        options=[scapy.PadN(optdata="\x00\x00\x00\x00\x00\x00"),
+                                 scapy.HBHOptUnknown(otype=0x8b,
+                                                     optdata=nonce)]) /
+                    scapy.UDP(sport=sport, dport=53) /
+                    UDP_PAYLOAD)
+        msg = "IPv6 UDP using sticky pktinfo: expected UDP packet on %s" % (
+            self.GetInterfaceName(netid))
+        self.ExpectPacketOn(netid, msg, expected)
+
+  def CheckPktinfoRouting(self, version):
+    for _ in xrange(self.ITERATIONS):
+      for netid in self.tuns:
+        family = self.GetProtocolFamily(version)
+        s = net_test.UDPSocket(family)
+
+        srcaddr = self.MyAddress(version, netid)
+        dstaddr = self.GetRemoteAddress(version)
+
+        s.bind(("", 0))
+        sport = s.getsockname()[1]
+
+        if version == 6:
+          # Create a flowlabel so we can use it.
+          net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xbeef)
+
+          # Specify an interface and some arbitrary options.
+          pktinfo = self._MakePktInfo(6, "::", self.ifindices[netid])
+          cmsg_opts = [
+              (net_test.IPPROTO_IPV6, IPV6_HOPLIMIT, 39),
+              (net_test.IPPROTO_IPV6, IPV6_TCLASS, 0x83),
+              (net_test.IPPROTO_IPV6, IPV6_FLOWINFO, int(htonl(0xbeef))),
+              (net_test.IPPROTO_IPV6, IPV6_PKTINFO, pktinfo)
+          ]
+          expected = (scapy.IPv6(src=srcaddr, dst=dstaddr,
+                                 fl=0xbeef, hlim=39, tc=0x83) /
+                      scapy.UDP(sport=sport, dport=53) /
+                      UDP_PAYLOAD)
+        else:
+          pktinfo = self._MakePktInfo(4, "0.0.0.0", self.ifindices[netid])
+          cmsg_opts = [
+              (net_test.IPPROTO_IP, IP_TTL, 39),
+              (net_test.IPPROTO_IP, IP_TOS, 0x83),
+              (net_test.IPPROTO_IP, IP_PKTINFO, pktinfo),
+          ]
+          expected = (scapy.IP(src=srcaddr, dst=dstaddr, ttl=39, tos=0x83) /
+                      scapy.UDP(sport=sport, dport=53) /
+                      UDP_PAYLOAD)
+
+        sendmsg.Sendmsg(s, (dstaddr, 53), UDP_PAYLOAD, cmsg_opts,
+                        sendmsg.MSG_CONFIRM)
+
+        msg = "IPv%d UDP using pktinfo routing: expected UDP packet on %s" % (
+            version, self.GetInterfaceName(netid))
+        self.ExpectPacketOn(netid, msg, expected)
+
+  def testIPv4PktinfoRouting(self):
+    self.CheckPktinfoRouting(4)
+
+  def testIPv6PktinfoRouting(self):
+    self.CheckPktinfoRouting(6)
+
 
 class MarkTest(InboundMarkingTest):