OSDN Git Service

Set accept_ra to 2 on all interfaces.
[android-x86/system-extras.git] / tests / net_test / neighbour_test.py
1 #!/usr/bin/python
2 #
3 # Copyright 2015 The Android Open Source Project
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import errno
18 import random
19 from socket import *  # pylint: disable=wildcard-import
20 import time
21 import unittest
22
23 from scapy import all as scapy
24
25 import multinetwork_base
26 import net_test
27
28
29 RTMGRP_NEIGH = 4
30
31 NUD_INCOMPLETE = 0x01
32 NUD_REACHABLE = 0x02
33 NUD_STALE = 0x04
34 NUD_DELAY = 0x08
35 NUD_PROBE = 0x10
36 NUD_FAILED = 0x20
37 NUD_PERMANENT = 0x80
38
39
40 # TODO: Support IPv4.
41 class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
42
43   # Set a 100-ms retrans timer so we can test for ND retransmits without
44   # waiting too long. Apparently this cannot go below 500ms.
45   RETRANS_TIME_MS = 500
46
47   # This can only be in seconds, so 1000 is the minimum.
48   DELAY_TIME_MS = 1000
49
50   # Unfortunately, this must be above the delay timer or the kernel ND code will
51   # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is
52   # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
53   # that's 2x the delay timer.
54   REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
55
56   @classmethod
57   def setUpClass(cls):
58     super(NeighbourTest, cls).setUpClass()
59     for netid in cls.tuns:
60       iface = cls.GetInterfaceName(netid)
61       # This can't be set in an RA.
62       cls.SetSysctl(
63           "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
64           cls.DELAY_TIME_MS / 1000)
65
66   def setUp(self):
67     super(NeighbourTest, self).setUp()
68
69     self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
70     self.sock.bind((0, RTMGRP_NEIGH))
71     net_test.SetNonBlocking(self.sock)
72
73     for netid in self.tuns:
74       self.SendRA(netid,
75                   retranstimer=self.RETRANS_TIME_MS,
76                   reachabletime=self.REACHABLE_TIME_MS)
77
78     self.netid = random.choice(self.tuns.keys())
79     self.ifindex = self.ifindices[self.netid]
80
81   def GetNeighbour(self, addr):
82     version = 6 if ":" in addr else 4
83     for msg, args in self.iproute.DumpNeighbours(version):
84       if args["NDA_DST"] == addr:
85         return msg, args
86
87   def GetNdEntry(self, addr):
88     return self.GetNeighbour(addr)
89
90   def CheckNoNdEvents(self):
91     self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)
92
93   def assertNeighbourState(self, state, addr):
94     self.assertEquals(state, self.GetNdEntry(addr)[0].state)
95
96   def assertNeighbourAttr(self, addr, name, value):
97     self.assertEquals(value, self.GetNdEntry(addr)[1][name])
98
99   def ExpectNeighbourNotification(self, addr, state, attrs=None):
100     msg = self.sock.recv(4096)
101     msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
102     self.assertEquals(addr, actual_attrs["NDA_DST"])
103     self.assertEquals(state, msg.state)
104     if attrs:
105       for name in attrs:
106         self.assertEquals(attrs[name], actual_attrs[name])
107
108   def ExpectUnicastProbe(self, addr):
109     version = 6 if ":" in addr else 4
110     if version == 6:
111       expected = (
112           scapy.IPv6(src=self.MyLinkLocalAddress(self.netid), dst=addr) /
113           scapy.ICMPv6ND_NS(tgt=addr) /
114           scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.MyMacAddress(self.netid))
115       )
116       self.ExpectPacketOn(self.netid, "Unicast probe", expected)
117     else:
118       raise NotImplementedError
119
120   def ReceiveUnicastAdvertisement(self, addr, mac):
121     version = 6 if ":" in addr else 4
122     if version == 6:
123       packet = (
124           scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
125           scapy.IPv6(src=addr, dst=self.MyLinkLocalAddress(self.netid)) /
126           scapy.ICMPv6ND_NA(tgt=addr, S=1, O=0) /
127           scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
128       )
129       self.ReceiveEtherPacketOn(self.netid, packet)
130     else:
131       raise NotImplementedError
132
133   def MonitorSleepMs(self, interval, addr):
134     slept = 0
135     while slept < interval:
136       sleep_ms = min(100, interval - slept)
137       time.sleep(sleep_ms / 1000.0)
138       slept += sleep_ms
139       print self.GetNdEntry(addr)
140
141   def MonitorSleep(self, intervalseconds, addr):
142     self.MonitorSleepMs(intervalseconds * 1000, addr)
143
144   def SleepMs(self, ms):
145     time.sleep(ms / 1000.0)
146
147   def testNotifications(self):
148     """Tests neighbour notifications.
149
150     Relevant kernel commits:
151       upstream net-next:
152         765c9c6 neigh: Better handling of transition to NUD_PROBE state
153         53385d2 neigh: Netlink notification for administrative NUD state change
154           (only checked on kernel v3.13+, not on v3.10)
155
156       android-3.10:
157         e4a6d6b neigh: Better handling of transition to NUD_PROBE state
158     """
159
160     router4 = self._RouterAddress(self.netid, 4)
161     router6 = self._RouterAddress(self.netid, 6)
162     self.assertNeighbourState(NUD_PERMANENT, router4)
163     self.assertNeighbourState(NUD_STALE, router6)
164
165     # Send a packet and check that we go into DELAY.
166     routing_mode = random.choice(["mark", "oif", "uid"])
167     s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
168     s.connect((net_test.IPV6_ADDR, 53))
169     s.send(net_test.UDP_PAYLOAD)
170     self.assertNeighbourState(NUD_DELAY, router6)
171
172     # Wait for the probe interval, then check that we're in PROBE, and that the
173     # kernel has notified us.
174     self.SleepMs(self.DELAY_TIME_MS)
175     self.ExpectNeighbourNotification(router6, NUD_PROBE)
176     self.assertNeighbourState(NUD_PROBE, router6)
177     self.ExpectUnicastProbe(router6)
178
179     # Respond to the NS and verify we're in REACHABLE again.
180     self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
181     self.assertNeighbourState(NUD_REACHABLE, router6)
182     if net_test.LINUX_VERSION >= (3, 13, 0):
183       # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
184       # NUD state change" produces notifications for NUD_REACHABLE, but these
185       # are not generated on earlier kernels.
186       self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
187
188     # Wait until the reachable time has passed, and verify we're in STALE.
189     self.SleepMs(self.REACHABLE_TIME_MS * 1.5)
190     self.assertNeighbourState(NUD_STALE, router6)
191     self.ExpectNeighbourNotification(router6, NUD_STALE)
192
193     # Send a packet, and verify we go into DELAY and then to PROBE.
194     s.send(net_test.UDP_PAYLOAD)
195     self.assertNeighbourState(NUD_DELAY, router6)
196     self.SleepMs(self.DELAY_TIME_MS)
197     self.assertNeighbourState(NUD_PROBE, router6)
198     self.ExpectNeighbourNotification(router6, NUD_PROBE)
199
200     # Wait for the probes to time out, and expect a FAILED notification.
201     self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
202     self.ExpectUnicastProbe(router6)
203
204     self.SleepMs(self.RETRANS_TIME_MS)
205     self.ExpectUnicastProbe(router6)
206     self.assertNeighbourAttr(router6, "NDA_PROBES", 2)
207
208     self.SleepMs(self.RETRANS_TIME_MS)
209     self.ExpectUnicastProbe(router6)
210     self.assertNeighbourAttr(router6, "NDA_PROBES", 3)
211
212     self.SleepMs(self.RETRANS_TIME_MS)
213     self.assertNeighbourState(NUD_FAILED, router6)
214     self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})
215
216   def testRepeatedProbes(self):
217     router4 = self._RouterAddress(self.netid, 4)
218     router6 = self._RouterAddress(self.netid, 6)
219     routermac = self.RouterMacAddress(self.netid)
220     self.assertNeighbourState(NUD_PERMANENT, router4)
221     self.assertNeighbourState(NUD_STALE, router6)
222
223     def ForceProbe(addr, mac):
224       self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
225       self.assertNeighbourState(NUD_PROBE, addr)
226       self.SleepMs(1)  # TODO: Why is this necessary?
227       self.assertNeighbourState(NUD_PROBE, addr)
228       self.ExpectUnicastProbe(addr)
229       self.ReceiveUnicastAdvertisement(addr, mac)
230       self.assertNeighbourState(NUD_REACHABLE, addr)
231
232     for _ in xrange(5):
233       ForceProbe(router6, routermac)
234
235
236 if __name__ == "__main__":
237   unittest.main()