OSDN Git Service

CertSecurity: Add OOB related interfaces
authorMartin Brabham <optedoblivion@google.com>
Fri, 13 Nov 2020 23:06:55 +0000 (15:06 -0800)
committerMartin Brabham <optedoblivion@google.com>
Mon, 16 Nov 2020 18:22:15 +0000 (10:22 -0800)
 - set_remote_oob_data
 - get_oob_data_from_controller
 - enable_secure_connections

Bug: 162984360
Tag: #gd-refactor
Test: cert/run --host SecurityTest
Change-Id: I53b512b121a24d2c56b0cbeba9e16f38d19c09cd

gd/security/cert/cert_security.py
gd/security/cert/security_test.py

index 60e8df3..1912aa1 100644 (file)
@@ -17,6 +17,7 @@
 import logging
 
 from bluetooth_packets_python3 import hci_packets
+from cert.captures import HciCaptures
 from cert.closable import safeClose
 from cert.event_stream import EventStream
 from cert.matchers import HciMatchers
@@ -68,8 +69,9 @@ class CertSecurity(PySecurity):
 
     _hci_event_stream = None
     _io_caps = hci_packets.IoCapability.DISPLAY_ONLY
-    _oob_data = hci_packets.OobDataPresent.NOT_PRESENT
+    _remote_oob_data = None
     _auth_reqs = hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
+    _secure_connections_enabled = False
 
     _hci = None
 
@@ -129,12 +131,76 @@ class CertSecurity(PySecurity):
             auth_reqs, "ERROR"))
         self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci_packets.AuthenticationRequirements.GENERAL_BONDING)
 
-    def set_oob_data(self, data):
+    def set_remote_oob_data(self, remote_data):
         """
             Set the Out-of-band data for SSP pairing
         """
-        logging.info("Cert: setting OOB data present to '%s'" % data)
-        self._oob_data = self._oob_present_lookup.get(data, hci_packets.OobDataPresent.NOT_PRESENT)
+        logging.info("Cert: setting OOB data present to '%s'" % remote_data)
+        self._remote_oob_data = remote_data
+
+    def get_oob_data_from_controller(self, pb_oob_data_type):
+        """
+            Get the Out-of-band data for SSP pairing
+
+            :param pb_oob_data_type: Type of data needed
+            :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value
+
+        """
+
+        oob_data_type = self._oob_present_lookup[pb_oob_data_type]
+
+        if (oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT):
+            logging.warn("No data present, no need to call get_oob_data")
+            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], [0 for i in range(0, 16)],
+                    [0 for i in range(0, 16)])
+
+        logging.info("Cert: Requesting OOB data")
+        if oob_data_type == hci_packets.OobDataPresent.P_192_PRESENT:
+            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
+            if self._secure_connections_enabled:
+                logging.info("Cert: Requesting P192 Data; secure connections")
+                complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
+                self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
+                logging.info("Cert: Waiting for OOB response from controller")
+                assertThat(self._hci_event_stream).emits(complete_capture)
+                command_complete = complete_capture.get()
+                complete = hci_packets.ReadLocalOobExtendedDataCompleteView(command_complete)
+                return (list(complete.GetC192()), list(complete.GetR192()), [0 for i in range(0, 16)],
+                        [0 for i in range(0, 16)])
+            # else we use ReadLocalDataRequest
+            else:
+                logging.info("Cert: Requesting P192 Data; no secure connections")
+                complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
+                self._enqueue_hci_command(hci_packets.ReadLocalOobDataBuilder(), True)
+                logging.info("Cert: Waiting for OOB response from controller")
+                assertThat(self._hci_event_stream).emits(complete_capture)
+                command_complete = complete_capture.get()
+                complete = hci_packets.ReadLocalOobDataCompleteView(command_complete)
+                return (list(complete.GetC()), list(complete.GetR()), [0 for i in range(0, 16)],
+                        [0 for i in range(0, 16)])
+
+        # Must be secure connection compatible to use these
+        elif oob_data_type == hci_packets.OobDataPresent.P_256_PRESENT:
+            logging.info("Cert: Requesting P256 Extended Data; secure connections")
+            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
+            self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
+            logging.info("Cert: Waiting for OOB response from controller")
+            assertThat(self._hci_event_stream).emits(complete_capture)
+            command_complete = complete_capture.get()
+            complete = hci_packets.ReadLocalOobExtendedDataCompleteView(command_complete)
+            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], list(complete.GetC256()),
+                    list(complete.GetR256()))
+
+        else:  # Both
+            logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
+            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
+            self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
+            logging.info("Cert: Waiting for OOB response from controller")
+            assertThat(self._hci_event_stream).emits(complete_capture)
+            command_complete = complete_capture.get()
+            complete = hci_packets.ReadLocalOobExtendedDataCompleteView(command_complete)
+            return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
+                    list(complete.GetR256()))
 
     def send_ui_callback(self, address, callback_type, b, uid):
         """
@@ -150,7 +216,20 @@ class CertSecurity(PySecurity):
         logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
         self._enqueue_hci_command(hci_packets.WriteSimplePairingModeBuilder(hci_packets.Enable.ENABLED), True)
         logging.info("Cert: Waiting for controller response")
-        assertThat(self._hci_event_stream).emits(lambda msg: b'\x0e\x04\x01\x56\x0c' in msg.event)
+        assertThat(self._hci_event_stream).emits(
+            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SIMPLE_PAIRING_MODE))
+
+    def enable_secure_connections(self):
+        """
+            This is called when you want to enable secure connections support
+        """
+        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
+        self._enqueue_hci_command(
+            hci_packets.WriteSecureConnectionsHostSupportBuilder(hci_packets.Enable.ENABLED), True)
+        logging.info("Cert: Waiting for controller response")
+        assertThat(self._hci_event_stream).emits(
+            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
+        self._secure_connections_enabled = True
 
     def accept_pairing(self, dut_address, reply_boolean):
         """
@@ -165,7 +244,8 @@ class CertSecurity(PySecurity):
         logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
         self._enqueue_hci_command(
             hci_packets.IoCapabilityRequestReplyBuilder(
-                dut_address.decode('utf8'), self._io_caps, self._oob_data, self._auth_reqs), True)
+                dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs),
+            True)
         logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
         assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
         logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
index cd8efd9..f4074db 100644 (file)
@@ -20,6 +20,7 @@ from bluetooth_packets_python3 import hci_packets
 from cert.event_stream import EventStream
 from cert.gd_base_test import GdBaseTestClass
 from cert.py_security import PySecurity
+from cert.truth import assertThat
 from facade import common_pb2 as common
 from google.protobuf import empty_pb2 as empty_proto
 from hci.facade import controller_facade_pb2 as controller_facade
@@ -147,10 +148,8 @@ class SecurityTest(GdBaseTestClass):
         # Arrange
         self.dut_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
         self.dut_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
-        self.dut_security.set_oob_data(OobDataPresent.NOT_PRESENT)
         self.cert_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
         self.cert_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
-        self.cert_security.set_oob_data(OobDataPresent.NOT_PRESENT)
 
         # Act and Assert
         self._run_ssp_numeric_comparison(
@@ -174,10 +173,8 @@ class SecurityTest(GdBaseTestClass):
         # Arrange
         self.dut_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
         self.dut_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
-        self.dut_security.set_oob_data(OobDataPresent.NOT_PRESENT)
         self.cert_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
         self.cert_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
-        self.cert_security.set_oob_data(OobDataPresent.NOT_PRESENT)
 
         # Act and Assert
         self._run_ssp_numeric_comparison(
@@ -191,6 +188,9 @@ class SecurityTest(GdBaseTestClass):
             expected_resp_bond_event=None)
 
         self.dut_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+        self.cert_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
+        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
 
         self.dut_security.wait_for_disconnect_event()
         self.cert_security.wait_for_disconnect_event()
@@ -206,6 +206,14 @@ class SecurityTest(GdBaseTestClass):
             expected_init_bond_event=BondMsgType.DEVICE_BONDED,
             expected_resp_bond_event=None)
 
+        self.dut_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+        self.cert_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
+        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
+
+        self.dut_security.wait_for_disconnect_event()
+        self.cert_security.wait_for_disconnect_event()
+
     def test_successful_dut_initiated_ssp_numeric_comparison(self):
         test_count = len(self.io_capabilities) * len(self.auth_reqs) * len(self.oob_present) * len(
             self.io_capabilities) * len(self.auth_reqs) * len(self.oob_present)
@@ -232,10 +240,8 @@ class SecurityTest(GdBaseTestClass):
                                 logging.info("")
                                 self.dut_security.set_io_capabilities(dut_io_capability)
                                 self.dut_security.set_authentication_requirements(dut_auth_reqs)
-                                self.dut_security.set_oob_data(dut_oob_present)
                                 self.cert_security.set_io_capabilities(cert_io_capability)
                                 self.cert_security.set_authentication_requirements(cert_auth_reqs)
-                                self.cert_security.set_oob_data(cert_oob_present)
                                 init_ui_response = True
                                 resp_ui_response = True
                                 expected_init_ui_event = None  # None is auto accept
@@ -302,3 +308,79 @@ class SecurityTest(GdBaseTestClass):
 
                                 self.dut_security.wait_for_disconnect_event()
                                 self.cert_security.wait_for_disconnect_event()
+
+    def test_enable_secure_simple_pairing(self):
+        self.dut_security.enable_secure_simple_pairing()
+        self.cert_security.enable_secure_simple_pairing()
+
+    def test_enable_secure_connections(self):
+        self.dut_security.enable_secure_simple_pairing()
+        self.cert_security.enable_secure_simple_pairing()
+        self.dut_security.enable_secure_connections()
+        self.cert_security.enable_secure_connections()
+
+    def test_get_oob_data_from_controller_not_present(self):
+        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.NOT_PRESENT)
+        assertThat(len(oob_data)).isEqualTo(4)
+        has192C = not all([i == 0 for i in oob_data[0]])
+        has192R = not all([i == 0 for i in oob_data[1]])
+        has256C = not all([i == 0 for i in oob_data[2]])
+        has256R = not all([i == 0 for i in oob_data[3]])
+        assertThat(has192C).isFalse()
+        assertThat(has192R).isFalse()
+        assertThat(has256C).isFalse()
+        assertThat(has256R).isFalse()
+
+    def test_get_oob_data_from_controller_p192_present_no_secure_connections(self):
+        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P192_PRESENT)
+        assertThat(len(oob_data)).isEqualTo(4)
+        has192C = not all([i == 0 for i in oob_data[0]])
+        has192R = not all([i == 0 for i in oob_data[1]])
+        has256C = not all([i == 0 for i in oob_data[2]])
+        has256R = not all([i == 0 for i in oob_data[3]])
+        assertThat(has192C).isTrue()
+        assertThat(has192R).isTrue()
+        assertThat(has256C).isFalse()
+        assertThat(has256R).isFalse()
+
+    def test_get_oob_data_from_controller_p192_present(self):
+        self.cert_security.enable_secure_simple_pairing()
+        self.cert_security.enable_secure_connections()
+        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P192_PRESENT)
+        assertThat(len(oob_data)).isEqualTo(4)
+        has192C = not all([i == 0 for i in oob_data[0]])
+        has192R = not all([i == 0 for i in oob_data[1]])
+        has256C = not all([i == 0 for i in oob_data[2]])
+        has256R = not all([i == 0 for i in oob_data[3]])
+        assertThat(has192C).isTrue()
+        assertThat(has192R).isTrue()
+        assertThat(has256C).isFalse()
+        assertThat(has256R).isFalse()
+
+    def test_get_oob_data_from_controller_p256_present(self):
+        self.cert_security.enable_secure_simple_pairing()
+        self.cert_security.enable_secure_connections()
+        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P256_PRESENT)
+        assertThat(len(oob_data)).isEqualTo(4)
+        has192C = not all([i == 0 for i in oob_data[0]])
+        has192R = not all([i == 0 for i in oob_data[1]])
+        has256C = not all([i == 0 for i in oob_data[2]])
+        has256R = not all([i == 0 for i in oob_data[3]])
+        assertThat(has192C).isFalse()
+        assertThat(has192R).isFalse()
+        assertThat(has256C).isTrue()
+        assertThat(has256R).isTrue()
+
+    def test_get_oob_data_from_controller_p192_and_p256_present(self):
+        self.cert_security.enable_secure_simple_pairing()
+        self.cert_security.enable_secure_connections()
+        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P192_AND_256_PRESENT)
+        assertThat(len(oob_data)).isEqualTo(4)
+        has192C = not all([i == 0 for i in oob_data[0]])
+        has192R = not all([i == 0 for i in oob_data[1]])
+        has256C = not all([i == 0 for i in oob_data[2]])
+        has256R = not all([i == 0 for i in oob_data[3]])
+        assertThat(has192C).isTrue()
+        assertThat(has192R).isTrue()
+        assertThat(has256C).isTrue()
+        assertThat(has256R).isTrue()