OSDN Git Service

ANRdaemon: move trace result from /sdcard to /data
[android-x86/system-extras.git] / tests / net_test / srcaddr_selection_test.py
1 #!/usr/bin/python
2 #
3 # Copyright 2014 The Android Open Source Project
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import errno
18 import os
19 import random
20 from socket import *  # pylint: disable=wildcard-import
21 import time
22 import unittest
23
24 from scapy import all as scapy
25
26 import csocket
27 import iproute
28 import multinetwork_base
29 import multinetwork_test
30 import net_test
31
# Option name and flag passed to setsockopt(IPPROTO_IPV6, ...) by the tests.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002

# The tests that depend on the use_optimistic sysctl are skipped when this
# file does not exist on the running kernel.
USE_OPTIMISTIC_SYSCTL = "/proc/sys/net/ipv6/conf/default/use_optimistic"
HAVE_USE_OPTIMISTIC = os.path.isfile(USE_OPTIMISTIC_SYSCTL)
40
41
class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Helpers shared by the IPv6 source address selection tests.

  Wraps the per-interface IPv6 sysctls these tests toggle and provides
  assertions about whether an address is present, usable for bind() and
  sendmsg(), and picked as the source address of a connected UDP socket.
  """

  def SetDAD(self, ifname, value):
    """Writes value to both accept_dad and dad_transmits for ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)

  def SetOptimisticDAD(self, ifname, value):
    """Writes value to the optimistic_dad sysctl for ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)

  def SetUseTempaddrs(self, ifname, value):
    """Writes value to the use_tempaddr sysctl for ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)

  def SetUseOptimistic(self, ifname, value):
    """Writes value to the use_optimistic sysctl for ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the source address chosen for a UDP socket on netid.

    Args:
      netid: Network to build the socket on.
      mode: Network selection mode handed to BuildSocket.

    Returns:
      The local address reported by getsockname() after connecting to
      (net_test.IPV6_ADDR, 123). Asserted to be non-empty.
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Ask for public source addresses; temporary address behaviour is a
      # separate concern and is not what these tests exercise.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Close explicitly instead of relying on garbage collection.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via iproute raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts the attributes iproute reports for address.

    Checks the address family (AF_INET6 when the string contains ":"), a
    prefix length of 64, universe scope, the interface index, and that every
    bit in expected_flags is set. Extra flag bits are tolerated.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns the IFA_F_TENTATIVE bit of address's flags (non-zero if set)."""
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a fresh IPv6 UDP socket to address on an ephemeral port.

    Any socket error raised by bind() propagates to the caller.
    """
    s = net_test.UDPSocket(AF_INET6)
    try:
      s.bind((address, 0, 0, 0))
    finally:
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends "Hello" over UDP to (dest, 53) with address set via IPV6_PKTINFO.

    Args:
      address: Source address placed in the IPV6_PKTINFO control message.
      netid: Network on which the sending socket is built (mode "mark").
      dest: Destination IPv6 address.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    finally:
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts that address can be bound to and used as a sendmsg() source."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind() fails with EADDRNOTAVAIL and sendmsg() with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts that address is the source address chosen on netid."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts that address is not the source address chosen on netid."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls until address is no longer tentative.

    Checks every 0.1 seconds, up to 20 times.

    Raises:
      AssertionError: if address is still tentative after the last poll.
    """
    # range() rather than xrange(): equivalent here and valid on Python 2 and 3.
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    # The message previously lacked its format argument and printed "%s".
    raise AssertionError("%s did not complete DAD after 2 seconds" % address)
115
116
class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Fixture that strips the IPv6 address from one randomly chosen network.

  After setUp, self.test_netid, self.test_ip, self.test_ifindex and
  self.test_ifname describe the chosen network, and self.test_ip has been
  deleted from its interface so that subclasses can re-acquire it under the
  DAD settings they want to test.
  """

  def setUp(self):
    """Disables the DAD-related sysctls and removes the test address."""
    # [0]  Make sure DAD, optimistic DAD, and the use_optimistic option
    # are all consistently disabled at the outset.
    for netid in self.tuns:
      ifname = self.GetInterfaceName(netid)
      self.SetDAD(ifname, 0)
      self.SetOptimisticDAD(ifname, 0)
      self.SetUseTempaddrs(ifname, 0)
      if HAVE_USE_OPTIMISTIC:
        self.SetUseOptimistic(ifname, 0)

    # [1]  Pick an interface on which to test.
    # random.choice() needs an indexable sequence; materialise the keys so
    # this also works where keys() returns a view rather than a list.
    self.test_netid = random.choice(list(self.tuns.keys()))
    self.test_ip = self.MyAddress(6, self.test_netid)
    self.test_ifindex = self.ifindices[self.test_netid]
    self.test_ifname = self.GetInterfaceName(self.test_netid)

    # [2]  Delete the test interface's IPv6 address.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)

    # With the address gone, neither bind() nor sendmsg() may use it.
    self.assertAddressNotUsable(self.test_ip, self.test_netid)
140
141
class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a tentative address is unusable until DAD completes."""

  def testRfc6724Behaviour(self):
    """Re-acquires the test address with DAD on and checks both phases."""
    address = self.test_ip
    netid = self.test_netid

    # [3]  Turn DAD on, then send an RA so that SLAAC configures the address
    # again and DAD starts for it.
    self.SetDAD(self.test_ifname, 1)
    self.SendRA(netid, 0)

    # The freshly configured address must carry the tentative flag.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # While tentative, the address can neither be used explicitly nor be
    # picked by source address selection.
    self.assertAddressNotUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # Poll until DAD finishes (expected to take less than 1 second).
    self.WaitForDad(address)

    # Once DAD is done the address is usable and is the chosen source.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)
165
166
class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks optimistic addresses when use_optimistic is left disabled."""

  def testRfc6724Behaviour(self):
    """An optimistic address is not selected until DAD has completed."""
    address = self.test_ip
    netid = self.test_netid
    ifname = self.test_ifname

    # [3]  Turn on DAD and optimistic DAD, then send an RA so that SLAAC
    # configures the address again and DAD starts for it.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SendRA(netid, 0)

    # The freshly configured address must carry the optimistic flag.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Optimistic addresses are usable but are not selected. Usability is only
    # checked from 3.18 on: the version checked in to android kernels <= 3.10
    # requires the use_optimistic sysctl to be turned on.
    if net_test.LinuxVersion() >= (3, 18, 0):
      self.assertAddressUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # Poll until DAD finishes (expected to take less than 1 second).
    self.WaitForDad(address)

    # Once DAD is done the address is usable and is the chosen source.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)
193
194
class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that use_optimistic lets an optimistic address be selected."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  def testModifiedRfc6724Behaviour(self):
    """With use_optimistic on, an optimistic address is usable and selected."""
    address = self.test_ip
    netid = self.test_netid
    ifname = self.test_ifname

    # [3]  Turn on DAD, optimistic DAD and use_optimistic, then send an RA so
    # that SLAAC configures the address again and DAD starts for it.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)

    # The freshly configured address must carry the optimistic flag.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Although the address is still optimistic, use_optimistic permits it to
    # be used and to be chosen as the source address.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)
213
214
class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a valid address wins over an optimistic one."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  def testModifiedRfc6724Behaviour(self):
    """A manually added address stays selected despite an optimistic one."""
    netid = self.test_netid
    ifname = self.test_ifname
    ifindex = self.test_ifindex

    # [3]  Give the interface a valid address of its own and check that
    # source address selection picks it.
    preferred_ip = self.IPv6Prefix(netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, ifindex, iproute.IFA_F_PERMANENT)
    self.assertAddressSelected(preferred_ip, netid)

    # [4]  Turn on DAD, optimistic DAD and use_optimistic, then send an RA so
    # that SLAAC configures a second address, which starts out optimistic.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    self.assertAddressHasExpectedAttributes(
        self.test_ip, ifindex, iproute.IFA_F_OPTIMISTIC)

    # The optimistic address can be used explicitly, but selection keeps
    # choosing the other, valid address.
    self.assertAddressUsable(self.test_ip, netid)
    self.assertAddressNotSelected(self.test_ip, netid)
    self.assertAddressSelected(preferred_ip, netid)
242
243
class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that an optimistic address stops being used once DAD fails."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  def testDadFailure(self):
    """Injects a conflicting NA and expects the address to become unusable."""
    address = self.test_ip
    netid = self.test_netid
    ifname = self.test_ifname

    # [3]  Turn on DAD, optimistic DAD and use_optimistic, then send an RA so
    # that SLAAC configures the address again and DAD starts for it.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)

    # The address must be optimistic, usable and selected at this point.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)

    # Build a Neighbor Advertisement for the optimistic address coming from
    # a different MAC address, signalling an address conflict ("DAD defense").
    conflict_macaddr = "02:00:0b:ad:d0:0d"
    ether = scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01")
    ip6 = scapy.IPv6(src=address, dst="ff02::1")
    advert = scapy.ICMPv6ND_NA(tgt=address, R=0, S=0, O=1)
    lladdr_opt = scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)
    dad_defense = ether / ip6 / advert / lladdr_opt
    self.ReceiveEtherPacketOn(netid, dad_defense)

    # DAD should now have failed, so the address may no longer be used or
    # selected.
    self.assertAddressNotUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # TODO(ek): verify that an RTM_DELADDR is issued for the DAD-failed
    # address.
274
275
class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that Neighbor Solicitations are not sourced from an optimistic IP."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  @unittest.skipUnless(net_test.LinuxVersion() >= (3, 18, 0),
                       "correct optimistic bind() not supported")
  def testSendToOnlinkDestination(self):
    """Sends from the optimistic address and inspects the resulting NS."""
    address = self.test_ip
    netid = self.test_netid
    ifname = self.test_ifname

    # [3]  Turn on DAD, optimistic DAD and use_optimistic, then send an RA so
    # that SLAAC configures the address again and DAD starts for it.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)

    # The address must be optimistic, usable and selected at this point.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)

    # [4]  Send to a destination inside the test network's own prefix. The
    # Neighbor Solicitation this triggers must NOT come from the optimistic
    # address; in this setup the only other usable address is the link-local
    # one.
    onlink_dest = self.GetRandomDestination(self.IPv6Prefix(netid))
    self.SendWithSourceAddress(address, netid, onlink_dest)

    # Packets.NS returns a sequence; element [1] is what ExpectPacketOn is
    # given as the expected packet.
    ns_source = net_test.GetLinkAddress(ifname, True)
    my_mac = self.MyMacAddress(netid)
    expected_ns = multinetwork_test.Packets.NS(ns_source, onlink_dest, my_mac)[1]
    self.ExpectPacketOn(netid, "link-local NS", expected_ns)
305
306
307 # TODO(ek): add tests listening for netlink events.
308
309
# Run every test case in this module when it is executed as a script.
if __name__ == "__main__":
  unittest.main()