3 # Copyright 2015 The Android Open Source Project
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 from socket import * # pylint: disable=wildcard-import
23 from scapy import all as scapy
25 import multinetwork_base
41 class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
43 # Set a 100-ms retrans timer so we can test for ND retransmits without
44 # waiting too long. Apparently this cannot go below 500ms.
47 # This can only be in seconds, so 1000 is the minimum.
50 # Unfortunately, this must be above the delay timer or the kernel ND code will
51 # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is
52 # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
53 # that's 2x the delay timer.
54 REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
58 super(NeighbourTest, cls).setUpClass()
59 for netid in cls.tuns:
60 iface = cls.GetInterfaceName(netid)
61 # This can't be set in an RA.
63 "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
64 cls.DELAY_TIME_MS / 1000)
67 super(NeighbourTest, self).setUp()
69 self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
70 self.sock.bind((0, RTMGRP_NEIGH))
71 net_test.SetNonBlocking(self.sock)
73 for netid in self.tuns:
75 retranstimer=self.RETRANS_TIME_MS,
76 reachabletime=self.REACHABLE_TIME_MS)
78 self.netid = random.choice(self.tuns.keys())
79 self.ifindex = self.ifindices[self.netid]
81 def GetNeighbour(self, addr):
82 version = 6 if ":" in addr else 4
83 for msg, args in self.iproute.DumpNeighbours(version):
84 if args["NDA_DST"] == addr:
87 def GetNdEntry(self, addr):
88 return self.GetNeighbour(addr)
90 def CheckNoNdEvents(self):
91 self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)
93 def assertNeighbourState(self, state, addr):
94 self.assertEquals(state, self.GetNdEntry(addr)[0].state)
96 def assertNeighbourAttr(self, addr, name, value):
97 self.assertEquals(value, self.GetNdEntry(addr)[1][name])
99 def ExpectNeighbourNotification(self, addr, state, attrs=None):
100 msg = self.sock.recv(4096)
101 msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
102 self.assertEquals(addr, actual_attrs["NDA_DST"])
103 self.assertEquals(state, msg.state)
106 self.assertEquals(attrs[name], actual_attrs[name])
108 def ExpectUnicastProbe(self, addr):
109 version = 6 if ":" in addr else 4
112 scapy.IPv6(src=self.MyLinkLocalAddress(self.netid), dst=addr) /
113 scapy.ICMPv6ND_NS(tgt=addr) /
114 scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.MyMacAddress(self.netid))
116 self.ExpectPacketOn(self.netid, "Unicast probe", expected)
118 raise NotImplementedError
120 def ReceiveUnicastAdvertisement(self, addr, mac):
121 version = 6 if ":" in addr else 4
124 scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
125 scapy.IPv6(src=addr, dst=self.MyLinkLocalAddress(self.netid)) /
126 scapy.ICMPv6ND_NA(tgt=addr, S=1, O=0) /
127 scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
129 self.ReceiveEtherPacketOn(self.netid, packet)
131 raise NotImplementedError
133 def MonitorSleepMs(self, interval, addr):
135 while slept < interval:
136 sleep_ms = min(100, interval - slept)
137 time.sleep(sleep_ms / 1000.0)
139 print self.GetNdEntry(addr)
141 def MonitorSleep(self, intervalseconds, addr):
142 self.MonitorSleepMs(intervalseconds * 1000, addr)
144 def SleepMs(self, ms):
145 time.sleep(ms / 1000.0)
147 def testNotifications(self):
148 """Tests neighbour notifications.
150 Relevant kernel commits:
152 765c9c6 neigh: Better handling of transition to NUD_PROBE state
153 53385d2 neigh: Netlink notification for administrative NUD state change
154 (only checked on kernel v3.13+, not on v3.10)
157 e4a6d6b neigh: Better handling of transition to NUD_PROBE state
160 router4 = self._RouterAddress(self.netid, 4)
161 router6 = self._RouterAddress(self.netid, 6)
162 self.assertNeighbourState(NUD_PERMANENT, router4)
163 self.assertNeighbourState(NUD_STALE, router6)
165 # Send a packet and check that we go into DELAY.
166 routing_mode = random.choice(["mark", "oif", "uid"])
167 s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
168 s.connect((net_test.IPV6_ADDR, 53))
169 s.send(net_test.UDP_PAYLOAD)
170 self.assertNeighbourState(NUD_DELAY, router6)
172 # Wait for the probe interval, then check that we're in PROBE, and that the
173 # kernel has notified us.
174 self.SleepMs(self.DELAY_TIME_MS)
175 self.ExpectNeighbourNotification(router6, NUD_PROBE)
176 self.assertNeighbourState(NUD_PROBE, router6)
177 self.ExpectUnicastProbe(router6)
179 # Respond to the NS and verify we're in REACHABLE again.
180 self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
181 self.assertNeighbourState(NUD_REACHABLE, router6)
182 if net_test.LINUX_VERSION >= (3, 13, 0):
183 # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
184 # NUD state change" produces notifications for NUD_REACHABLE, but these
185 # are not generated on earlier kernels.
186 self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
188 # Wait until the reachable time has passed, and verify we're in STALE.
189 self.SleepMs(self.REACHABLE_TIME_MS * 1.5)
190 self.assertNeighbourState(NUD_STALE, router6)
191 self.ExpectNeighbourNotification(router6, NUD_STALE)
193 # Send a packet, and verify we go into DELAY and then to PROBE.
194 s.send(net_test.UDP_PAYLOAD)
195 self.assertNeighbourState(NUD_DELAY, router6)
196 self.SleepMs(self.DELAY_TIME_MS)
197 self.assertNeighbourState(NUD_PROBE, router6)
198 self.ExpectNeighbourNotification(router6, NUD_PROBE)
200 # Wait for the probes to time out, and expect a FAILED notification.
201 self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
202 self.ExpectUnicastProbe(router6)
204 self.SleepMs(self.RETRANS_TIME_MS)
205 self.ExpectUnicastProbe(router6)
206 self.assertNeighbourAttr(router6, "NDA_PROBES", 2)
208 self.SleepMs(self.RETRANS_TIME_MS)
209 self.ExpectUnicastProbe(router6)
210 self.assertNeighbourAttr(router6, "NDA_PROBES", 3)
212 self.SleepMs(self.RETRANS_TIME_MS)
213 self.assertNeighbourState(NUD_FAILED, router6)
214 self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})
216 def testRepeatedProbes(self):
217 router4 = self._RouterAddress(self.netid, 4)
218 router6 = self._RouterAddress(self.netid, 6)
219 routermac = self.RouterMacAddress(self.netid)
220 self.assertNeighbourState(NUD_PERMANENT, router4)
221 self.assertNeighbourState(NUD_STALE, router6)
223 def ForceProbe(addr, mac):
224 self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
225 self.assertNeighbourState(NUD_PROBE, addr)
226 self.SleepMs(1) # TODO: Why is this necessary?
227 self.assertNeighbourState(NUD_PROBE, addr)
228 self.ExpectUnicastProbe(addr)
229 self.ReceiveUnicastAdvertisement(addr, mac)
230 self.assertNeighbourState(NUD_REACHABLE, addr)
233 ForceProbe(router6, routermac)
236 if __name__ == "__main__":