OSDN Git Service

Security: Add a keyboard bonding test
authorMyles Watson <mylesgw@google.com>
Thu, 19 Nov 2020 19:00:49 +0000 (11:00 -0800)
committerMyles Watson <mylesgw@google.com>
Fri, 20 Nov 2020 17:38:59 +0000 (17:38 +0000)
Bug: 162984360
Tag: #gd-refactor
Test: cert/run --host SecurityTest
Change-Id: I98b7e16fb8790b001b79e9c802e45b9f180bb203

gd/cert/py_security.py
gd/security/cert/cert_security.py
gd/security/cert/security_test.py
gd/security/pairing/classic_pairing_handler.cc

index d03c055..f70d736 100644 (file)
@@ -34,6 +34,7 @@ from security.facade_pb2 import IoCapabilityMessage
 from security.facade_pb2 import OobDataBondMessage
 from security.facade_pb2 import OobDataMessage
 from security.facade_pb2 import OobDataPresentMessage
+from security.facade_pb2 import UiMsgType
 from security.facade_pb2 import UiCallbackMsg
 from security.facade_pb2 import UiCallbackType
 
@@ -46,7 +47,7 @@ class PySecurity(Closable):
     _io_capabilities_name_lookup = {
         IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
         IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
-        #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY",
+        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
         IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
     }
 
@@ -157,7 +158,7 @@ class PySecurity(Closable):
         """
         pass
 
-    def accept_oob_pairing(self, cert_address, reply_boolean, p192_data, p256_data):
+    def accept_oob_pairing(self, cert_address, reply_boolean):
         """
             Here we pass, but in cert we perform pairing flow tasks.
             This was added here in order to be more dynamic, but the stack
@@ -165,6 +166,23 @@ class PySecurity(Closable):
         """
         pass
 
+    def wait_for_passkey(self, cert_address):
+        """
+            Respond to the UI event
+        """
+        passkey = -1
+
+        def get_unique_id(event):
+            if event.message_type == UiMsgType.DISPLAY_PASSKEY:
+                nonlocal passkey
+                passkey = event.numeric_value
+                return True
+            return False
+
+        logging.debug("DUT: Waiting for expected UI event")
+        assertThat(self._ui_event_stream).emits(get_unique_id)
+        return passkey
+
     def on_user_input(self, cert_address, reply_boolean, expected_ui_event):
         """
             Respond to the UI event
index a91c7bb..e48d1bc 100644 (file)
@@ -193,6 +193,43 @@ class CertSecurity(PySecurity):
             return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
                     list(complete.GetR256()))
 
+    def input_passkey(self, address, passkey):
+        """
+            Pretend to answer the pairing dialog as a user
+        """
+        logging.info("Cert: Waiting for PASSKEY request")
+        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST))
+        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
+        peer = address.decode('utf-8')
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_STARTED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.CLEARED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ERASED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
+        self._enqueue_hci_command(
+            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_COMPLETED),
+            True)
+        self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True)
+
     def send_ui_callback(self, address, callback_type, b, uid):
         """
             Pretend to answer the pairing dailog as a user
@@ -223,6 +260,15 @@ class CertSecurity(PySecurity):
         # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
         #self._secure_connections_enabled = True
 
+    def send_io_caps(self, address):
+        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
+        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
+        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
+        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
+        self._enqueue_hci_command(
+            hci_packets.IoCapabilityRequestReplyBuilder(
+                address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)
+
     def accept_pairing(self, dut_address, reply_boolean):
         """
             Here we handle the pairing events at the HCI level
@@ -231,13 +277,7 @@ class CertSecurity(PySecurity):
         assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest())
         logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY")
         self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(dut_address.decode('utf8')), True)
-        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
-        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
-        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
-        self._enqueue_hci_command(
-            hci_packets.IoCapabilityRequestReplyBuilder(
-                dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs),
-            True)
+        self.send_io_caps(dut_address)
         logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
         assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
         logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
@@ -258,15 +298,7 @@ class CertSecurity(PySecurity):
     def accept_oob_pairing(self, dut_address):
         logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
         assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
-        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
-        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
-        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
-        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
-        self._enqueue_hci_command(
-            hci_packets.IoCapabilityRequestReplyBuilder(
-                dut_address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)
-        assertThat(self._hci_event_stream).emits(
-            HciMatchers.CommandComplete(hci_packets.OpCode.IO_CAPABILITY_REQUEST_REPLY))
+        self.send_io_caps(dut_address)
         logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
         ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
         assertThat(self._hci_event_stream).emits(ssp_complete_capture)
index b994526..fe1473e 100644 (file)
@@ -158,6 +158,19 @@ class SecurityTest(GdBaseTestClass):
         initiator.wait_for_bond_event(expected_init_bond_event)
         responder.wait_for_bond_event(expected_resp_bond_event)
 
+    def _run_ssp_passkey(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event):
+        initiator.enable_secure_simple_pairing()
+        responder.enable_secure_simple_pairing()
+        initiator.create_bond(responder.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+        self._verify_ssp_passkey(initiator, responder, expected_init_bond_event, expected_resp_bond_event)
+
+    def _verify_ssp_passkey(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event):
+        responder.send_io_caps(initiator.get_address())
+        passkey = initiator.wait_for_passkey(responder.get_address())
+        responder.input_passkey(initiator.get_address(), passkey)
+        initiator.wait_for_bond_event(expected_init_bond_event)
+        responder.wait_for_bond_event(expected_resp_bond_event)
+
     def test_setup_teardown(self):
         """
             Make sure our setup and teardown is sane
@@ -453,3 +466,32 @@ class SecurityTest(GdBaseTestClass):
         self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
         self.dut_security.wait_for_disconnect_event()
         self.cert_security.wait_for_disconnect_event()
+
+    def test_successful_dut_initiated_ssp_keyboard(self):
+        dut_io_capability = IoCapabilities.DISPLAY_YES_NO_IO_CAP
+        dut_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
+        dut_oob_present = OobDataPresent.NOT_PRESENT
+        cert_io_capability = IoCapabilities.KEYBOARD_ONLY
+        cert_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
+        cert_oob_present = OobDataPresent.NOT_PRESENT
+        self.dut_security.set_io_capabilities(dut_io_capability)
+        self.dut_security.set_authentication_requirements(dut_auth_reqs)
+        self.cert_security.set_io_capabilities(cert_io_capability)
+        self.cert_security.set_authentication_requirements(cert_auth_reqs)
+
+        self._run_ssp_passkey(
+            initiator=self.dut_security,
+            responder=self.cert_security,
+            expected_init_bond_event=BondMsgType.DEVICE_BONDED,
+            expected_resp_bond_event=BondMsgType.DEVICE_BONDED)
+
+        self.dut_security.remove_bond(self.cert_security.get_address(),
+                                      common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+        self.cert_security.remove_bond(self.dut_security.get_address(),
+                                       common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
+
+        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
+        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
+
+        self.dut_security.wait_for_disconnect_event()
+        self.cert_security.wait_for_disconnect_event()
index a01b308..33cba9f 100644 (file)
@@ -153,6 +153,7 @@ void ClassicPairingHandler::OnReceive(hci::PinCodeRequestView packet) {
   ASSERT(packet.IsValid());
   LOG_INFO("Received: %s", hci::EventCodeText(packet.GetEventCode()).c_str());
   ASSERT_LOG(GetRecord()->GetPseudoAddress()->GetAddress() == packet.GetBdAddr(), "Address mismatch");
+  NotifyUiDisplayPasskeyInput();
 }
 
 void ClassicPairingHandler::OnReceive(hci::LinkKeyRequestView packet) {
@@ -349,6 +350,7 @@ void ClassicPairingHandler::OnReceive(hci::UserPasskeyNotificationView packet) {
   ASSERT(packet.IsValid());
   LOG_INFO("Received: %s", hci::EventCodeText(packet.GetEventCode()).c_str());
   ASSERT_LOG(GetRecord()->GetPseudoAddress()->GetAddress() == packet.GetBdAddr(), "Address mismatch");
+  NotifyUiDisplayPasskey(packet.GetPasskey());
 }
 
 void ClassicPairingHandler::OnReceive(hci::KeypressNotificationView packet) {
@@ -357,19 +359,19 @@ void ClassicPairingHandler::OnReceive(hci::KeypressNotificationView packet) {
   LOG_INFO("Notification Type: %s", hci::KeypressNotificationTypeText(packet.GetNotificationType()).c_str());
   switch (packet.GetNotificationType()) {
     case hci::KeypressNotificationType::ENTRY_STARTED:
-      // Get ready to keep track of key input
+      // Tell the UI to highlight the first digit
       break;
     case hci::KeypressNotificationType::DIGIT_ENTERED:
-      // Append digit to key
+      // Tell the UI to move one digit to the right
       break;
     case hci::KeypressNotificationType::DIGIT_ERASED:
-      // erase last digit from key
+      // Tell the UI to move back one digit
       break;
     case hci::KeypressNotificationType::CLEARED:
-      // erase all digits from key
+      // Tell the UI to highlight the first digit again
       break;
     case hci::KeypressNotificationType::ENTRY_COMPLETED:
-      // set full key to security record
+      // Tell the UI to hide the dialog
       break;
   }
 }