OSDN Git Service

Merge changes Ibab1eb74,I08e82f70,I9f8bb375 am: 414cff342d
authorLorenzo Colitti <lorenzo@google.com>
Mon, 14 Dec 2015 08:13:40 +0000 (00:13 -0800)
committerandroid-build-merger <android-build-merger@google.com>
Mon, 14 Dec 2015 08:13:40 +0000 (00:13 -0800)
am: d8054a1d69

* commit 'd8054a1d69ddc86c5181f4376bf7aef026d7aa30':
  Support properly resetting non-SYN, non-FIN packets.
  Add a tests that checks NAs with the R flag set to 0.
  Add utility functions to neighbour_test.

tests/net_test/neighbour_test.py
tests/net_test/packets.py

index ca39d4a..64518a9 100755 (executable)
@@ -66,15 +66,22 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
   def setUp(self):
     super(NeighbourTest, self).setUp()
 
-    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
-    self.sock.bind((0, RTMGRP_NEIGH))
-    net_test.SetNonBlocking(self.sock)
-
     for netid in self.tuns:
+      # Clear the ND cache entries for all routers, so each test starts with
+      # the IPv6 default router in state STALE.
+      addr = self._RouterAddress(netid, 6)
+      ifindex = self.ifindices[netid]
+      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)
+
+      # Configure IPv6 by sending an RA.
       self.SendRA(netid,
                   retranstimer=self.RETRANS_TIME_MS,
                   reachabletime=self.REACHABLE_TIME_MS)
 
+    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
+    self.sock.bind((0, RTMGRP_NEIGH))
+    net_test.SetNonBlocking(self.sock)
+
     self.netid = random.choice(self.tuns.keys())
     self.ifindex = self.ifindices[self.netid]
 
@@ -105,25 +112,46 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
       for name in attrs:
         self.assertEquals(attrs[name], actual_attrs[name])
 
-  def ExpectUnicastProbe(self, addr):
+  def ExpectProbe(self, is_unicast, addr):
     version = 6 if ":" in addr else 4
     if version == 6:
+      llsrc = self.MyMacAddress(self.netid)
+      if is_unicast:
+        src = self.MyLinkLocalAddress(self.netid)
+        dst = addr
+      else:
+        solicited = inet_pton(AF_INET6, addr)
+        last3bytes = tuple([ord(b) for b in solicited[-3:]])
+        dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
+        src = self.MyAddress(6, self.netid)
       expected = (
-          scapy.IPv6(src=self.MyLinkLocalAddress(self.netid), dst=addr) /
+          scapy.IPv6(src=src, dst=dst) /
           scapy.ICMPv6ND_NS(tgt=addr) /
-          scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.MyMacAddress(self.netid))
+          scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
       )
-      self.ExpectPacketOn(self.netid, "Unicast probe", expected)
+      msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
+      self.ExpectPacketOn(self.netid, msg, expected)
     else:
       raise NotImplementedError
 
-  def ReceiveUnicastAdvertisement(self, addr, mac):
+  def ExpectUnicastProbe(self, addr):
+    self.ExpectProbe(True, addr)
+
+  def ExpectMulticastNS(self, addr):
+    self.ExpectProbe(False, addr)
+
+  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
+                                  S=1, O=0, R=1):
     version = 6 if ":" in addr else 4
+    if srcaddr is None:
+      srcaddr = addr
+    if dstaddr is None:
+      dstaddr = self.MyLinkLocalAddress(self.netid)
     if version == 6:
       packet = (
           scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
-          scapy.IPv6(src=addr, dst=self.MyLinkLocalAddress(self.netid)) /
-          scapy.ICMPv6ND_NA(tgt=addr, S=1, O=0) /
+          scapy.IPv6(src=srcaddr, dst=dstaddr) /
+          scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
           scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
       )
       self.ReceiveEtherPacketOn(self.netid, packet)
@@ -232,6 +260,35 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
     for _ in xrange(5):
       ForceProbe(router6, routermac)
 
+  def testIsRouterFlag(self):
+    router6 = self._RouterAddress(self.netid, 6)
+    self.assertNeighbourState(NUD_STALE, router6)
+
+    # Get into FAILED.
+    ifindex = self.ifindices[self.netid]
+    self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
+    self.ExpectNeighbourNotification(router6, NUD_FAILED)
+    self.assertNeighbourState(NUD_FAILED, router6)
+
+    time.sleep(1)
+
+    # Send another packet and expect a multicast NS.
+    routing_mode = random.choice(["mark", "oif", "uid"])
+    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
+    s.connect((net_test.IPV6_ADDR, 53))
+    s.send(net_test.UDP_PAYLOAD)
+    self.ExpectMulticastNS(router6)
+
+    # Receive a unicast NA with the R flag set to 0.
+    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
+                                     srcaddr=self._RouterAddress(self.netid, 6),
+                                     dstaddr=self.MyAddress(6, self.netid),
+                                     S=1, O=0, R=0)
+
+    # Expect that this takes us to REACHABLE.
+    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
+    self.assertNeighbourState(NUD_REACHABLE, router6)
+
 
 if __name__ == "__main__":
   unittest.main()
index a0b75e8..d92a97e 100644 (file)
@@ -87,10 +87,11 @@ def SYN(dport, version, srcaddr, dstaddr, sport=0, seq=TCP_SEQ):
 def RST(version, srcaddr, dstaddr, packet):
   ip = _GetIpLayer(version)
   original = packet.getlayer("TCP")
+  was_syn_or_fin = (original.flags & (TCP_SYN | TCP_FIN)) != 0
   return ("TCP RST",
           ip(src=srcaddr, dst=dstaddr) /
           scapy.TCP(sport=original.dport, dport=original.sport,
-                    ack=original.seq + 1, seq=None,
+                    ack=original.seq + was_syn_or_fin, seq=None,
                     flags=TCP_RST | TCP_ACK, window=TCP_WINDOW))
 
 def SYNACK(version, srcaddr, dstaddr, packet):