def setUp(self):
super(NeighbourTest, self).setUp()
- self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
- self.sock.bind((0, RTMGRP_NEIGH))
- net_test.SetNonBlocking(self.sock)
-
for netid in self.tuns:
+ # Clear the ND cache entries for all routers, so each test starts with
+ # the IPv6 default router in state STALE.
+ addr = self._RouterAddress(netid, 6)
+ ifindex = self.ifindices[netid]
+ self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)
+
+ # Configure IPv6 by sending an RA.
self.SendRA(netid,
retranstimer=self.RETRANS_TIME_MS,
reachabletime=self.REACHABLE_TIME_MS)
+ self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
+ self.sock.bind((0, RTMGRP_NEIGH))
+ net_test.SetNonBlocking(self.sock)
+
self.netid = random.choice(self.tuns.keys())
self.ifindex = self.ifindices[self.netid]
for name in attrs:
self.assertEquals(attrs[name], actual_attrs[name])
- def ExpectUnicastProbe(self, addr):
+ def ExpectProbe(self, is_unicast, addr):
version = 6 if ":" in addr else 4
if version == 6:
+ llsrc = self.MyMacAddress(self.netid)
+ if is_unicast:
+ src = self.MyLinkLocalAddress(self.netid)
+ dst = addr
+ else:
+ solicited = inet_pton(AF_INET6, addr)
+ last3bytes = tuple([ord(b) for b in solicited[-3:]])
+ dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
+ src = self.MyAddress(6, self.netid)
expected = (
- scapy.IPv6(src=self.MyLinkLocalAddress(self.netid), dst=addr) /
+ scapy.IPv6(src=src, dst=dst) /
scapy.ICMPv6ND_NS(tgt=addr) /
- scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.MyMacAddress(self.netid))
+ scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
)
- self.ExpectPacketOn(self.netid, "Unicast probe", expected)
+ msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
+ self.ExpectPacketOn(self.netid, msg, expected)
else:
raise NotImplementedError
- def ReceiveUnicastAdvertisement(self, addr, mac):
+ def ExpectUnicastProbe(self, addr):
+ self.ExpectProbe(True, addr)
+
+ def ExpectMulticastNS(self, addr):
+ self.ExpectProbe(False, addr)
+
+ def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
+ S=1, O=0, R=1):
version = 6 if ":" in addr else 4
+ if srcaddr is None:
+ srcaddr = addr
+ if dstaddr is None:
+ dstaddr = self.MyLinkLocalAddress(self.netid)
if version == 6:
packet = (
scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
- scapy.IPv6(src=addr, dst=self.MyLinkLocalAddress(self.netid)) /
- scapy.ICMPv6ND_NA(tgt=addr, S=1, O=0) /
+ scapy.IPv6(src=srcaddr, dst=dstaddr) /
+ scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
)
self.ReceiveEtherPacketOn(self.netid, packet)
for _ in xrange(5):
ForceProbe(router6, routermac)
+ def testIsRouterFlag(self):
+ router6 = self._RouterAddress(self.netid, 6)
+ self.assertNeighbourState(NUD_STALE, router6)
+
+ # Get into FAILED.
+ ifindex = self.ifindices[self.netid]
+ self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
+ self.ExpectNeighbourNotification(router6, NUD_FAILED)
+ self.assertNeighbourState(NUD_FAILED, router6)
+
+ time.sleep(1)
+
+ # Send another packet and expect a multicast NS.
+ routing_mode = random.choice(["mark", "oif", "uid"])
+ s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
+ s.connect((net_test.IPV6_ADDR, 53))
+ s.send(net_test.UDP_PAYLOAD)
+ self.ExpectMulticastNS(router6)
+
+ # Receive a unicast NA with the R flag set to 0.
+ self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
+ srcaddr=self._RouterAddress(self.netid, 6),
+ dstaddr=self.MyAddress(6, self.netid),
+ S=1, O=0, R=0)
+
+ # Expect that this takes us to REACHABLE.
+ self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
+ self.assertNeighbourState(NUD_REACHABLE, router6)
+
if __name__ == "__main__":
unittest.main()