OSDN Git Service

Add a function to build UDP packets with options.
authorLorenzo Colitti <lorenzo@google.com>
Fri, 9 May 2014 01:22:36 +0000 (10:22 +0900)
committerLorenzo Colitti <lorenzo@google.com>
Mon, 2 Feb 2015 08:47:27 +0000 (17:47 +0900)
Change-Id: I416bb293cca532b05c93bcaead9b024dbfd8c2d0

tests/net_test/mark_test.py

index 23ae086..b2cb96c 100755 (executable)
@@ -130,6 +130,19 @@ class Packets(object):
             scapy.UDP(sport=sport, dport=53) / UDP_PAYLOAD)
 
   @classmethod
+  def UDPWithOptions(cls, version, srcaddr, dstaddr, sport=0):
+    if version == 4:
+      packet = (scapy.IP(src=srcaddr, dst=dstaddr, ttl=39, tos=0x83) /
+                scapy.UDP(sport=sport, dport=53) /
+                UDP_PAYLOAD)
+    else:
+      packet = (scapy.IPv6(src=srcaddr, dst=dstaddr,
+                           fl=0xbeef, hlim=39, tc=0x83) /
+                scapy.UDP(sport=sport, dport=53) /
+                UDP_PAYLOAD)
+    return ("UDPv%d packet with options" % version, packet)
+
+  @classmethod
   def SYN(cls, dport, version, srcaddr, dstaddr, sport=0, seq=TCP_SEQ):
     ip = cls._GetIpLayer(version)
     if sport == 0:
@@ -1016,9 +1029,6 @@ class OutgoingTest(MultiNetworkTest):
         srcaddr = self.MyAddress(version, netid)
         dstaddr = self.GetRemoteAddress(version)
 
-        s.bind(("", 0))
-        sport = s.getsockname()[1]
-
         if version == 6:
           # Create a flowlabel so we can use it.
           net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xbeef)
@@ -1031,10 +1041,6 @@ class OutgoingTest(MultiNetworkTest):
               (net_test.IPPROTO_IPV6, IPV6_FLOWINFO, int(htonl(0xbeef))),
               (net_test.IPPROTO_IPV6, IPV6_PKTINFO, pktinfo)
           ]
-          expected = (scapy.IPv6(src=srcaddr, dst=dstaddr,
-                                 fl=0xbeef, hlim=39, tc=0x83) /
-                      scapy.UDP(sport=sport, dport=53) /
-                      UDP_PAYLOAD)
         else:
           pktinfo = self._MakePktInfo(4, "0.0.0.0", self.ifindices[netid])
           cmsg_opts = [
@@ -1042,15 +1048,16 @@ class OutgoingTest(MultiNetworkTest):
               (net_test.IPPROTO_IP, IP_TOS, 0x83),
               (net_test.IPPROTO_IP, IP_PKTINFO, pktinfo),
           ]
-          expected = (scapy.IP(src=srcaddr, dst=dstaddr, ttl=39, tos=0x83) /
-                      scapy.UDP(sport=sport, dport=53) /
-                      UDP_PAYLOAD)
 
         sendmsg.Sendmsg(s, (dstaddr, 53), UDP_PAYLOAD, cmsg_opts,
                         sendmsg.MSG_CONFIRM)
 
-        msg = "IPv%d UDP using pktinfo routing: expected UDP packet on %s" % (
-            version, self.GetInterfaceName(netid))
+        sport = s.getsockname()[1]
+        desc, expected = Packets.UDPWithOptions(version, srcaddr, dstaddr,
+                                                sport=sport)
+
+        msg = "IPv%d UDP using pktinfo routing: expected %s on %s" % (
+            version, desc, self.GetInterfaceName(netid))
         self.ExpectPacketOn(netid, msg, expected)
 
   def testIPv4PktinfoRouting(self):