def setup_class(self):
super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')
- def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
- acl_msg = hci_facade.AclMsg(
- handle=int(handle),
- packet_boundary_flag=int(pb_flag),
- broadcast_flag=int(b_flag),
- data=acl)
- self.cert.hci.SendAclData(acl_msg)
-
def test_dut_connects(self):
- self.cert.hci.register_for_events(
- hci_packets.EventCode.CONNECTION_REQUEST,
- hci_packets.EventCode.CONNECTION_COMPLETE,
- hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
-
with PyHci(self.cert) as cert_hci, \
EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
address=bytes(cert_address,
'utf8')))) as connection_event_stream:
- # Cert Accepts
- connection_request = ConnectionRequestCapture()
- assertThat(
- cert_hci.get_event_stream()).emits(connection_request)
-
- cert_hci.send_command_with_status(
- hci_packets.AcceptConnectionRequestBuilder(
- connection_request.get().GetBdAddr(),
- hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
-
- # Cert gets ConnectionComplete with a handle and sends ACL data
- connection_complete = ConnectionCompleteCapture()
- assertThat(
- cert_hci.get_event_stream()).emits(connection_complete)
- cert_handle = connection_complete.get().GetConnectionHandle()
-
- self.enqueue_acl_data(
- cert_handle, hci_packets.PacketBoundaryFlag.
- FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(
- b'\x26\x00\x07\x00This is just SomeAclData from the Cert'
- ))
+ cert_acl = cert_hci.accept_connection()
+ cert_acl.send_first(
+ b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
# DUT gets a connection complete event and sends and receives
connection_complete = ConnectionCompleteCapture()
lambda packet: b'SomeAclData' in packet.payload)
def test_cert_connects(self):
- self.cert.hci.register_for_events(
- hci_packets.EventCode.ROLE_CHANGE,
- hci_packets.EventCode.CONNECTION_COMPLETE,
- hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
-
with PyHci(self.cert) as cert_hci, \
EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
self.dut.neighbor.EnablePageScan(
neighbor_facade.EnableMsg(enabled=True))
- # Cert connects
- cert_hci.send_command_with_status(
- hci_packets.CreateConnectionBuilder(
- dut_address.decode('utf-8'),
- 0xcc18, # Packet Type
- hci_packets.PageScanRepetitionMode.R1,
- 0x0,
- hci_packets.ClockOffsetValid.INVALID,
- hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
+ cert_hci.initiate_connection(dut_address)
# DUT gets a connection request
connection_complete = ConnectionCompleteCapture()
b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
)))
- connection_complete = ConnectionCompleteCapture()
- assertThat(cert_hci.get_event_stream()).emits(connection_complete)
- cert_handle = connection_complete.get().GetConnectionHandle()
-
- self.enqueue_acl_data(
- cert_handle,
- hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(
- b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))
+ cert_acl = cert_hci.complete_connection()
+ cert_acl.send_first(
+ b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
assertThat(cert_hci.get_acl_stream()).emits(
lambda packet: b'SomeMoreAclData' in packet.data)
lambda packet: b'SomeAclData' in packet.payload)
def test_recombination_l2cap_packet(self):
- self.cert.hci.register_for_events(
- hci_packets.EventCode.CONNECTION_REQUEST,
- hci_packets.EventCode.CONNECTION_COMPLETE,
- hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
-
with PyHci(self.cert) as cert_hci, \
EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
address=bytes(cert_address,
'utf8')))) as connection_event_stream:
- # Cert Accepts
- connection_request = ConnectionRequestCapture()
- assertThat(
- cert_hci.get_event_stream()).emits(connection_request)
- cert_hci.send_command_with_status(
- hci_packets.AcceptConnectionRequestBuilder(
- connection_request.get().GetBdAddr(),
- hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
-
- # Cert gets ConnectionComplete with a handle and sends ACL data
- connection_complete = ConnectionCompleteCapture()
- assertThat(
- cert_hci.get_event_stream()).emits(connection_complete)
- cert_handle = connection_complete.get().GetConnectionHandle()
-
- self.enqueue_acl_data(
- cert_handle, hci_packets.PacketBoundaryFlag.
- FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(b'\x06\x00\x07\x00Hello'))
- self.enqueue_acl_data(
- cert_handle,
- hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
- hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))
- self.enqueue_acl_data(
- cert_handle, hci_packets.PacketBoundaryFlag.
- FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200))
+ cert_acl = cert_hci.accept_connection()
+ cert_acl.send_first(b'\x06\x00\x07\x00Hello')
+ cert_acl.send_continuing(b'!')
+ cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)
# DUT gets a connection complete event and sends and receives
connection_complete = ConnectionCompleteCapture()
from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from captures import ReadBdAddrCompleteCapture
+from captures import ConnectionCompleteCapture
+from captures import ConnectionRequestCapture
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
+from hci.facade import facade_pb2 as hci_facade
+
+
+class PyHciAclConnection(object):
+
+ def __init__(self, handle, acl_stream, device):
+ self.handle = handle
+ self.acl_stream = acl_stream
+ self.device = device
+
+ def send(self, pb_flag, b_flag, data):
+ acl_msg = hci_facade.AclMsg(
+ handle=int(self.handle),
+ packet_boundary_flag=int(pb_flag),
+ broadcast_flag=int(b_flag),
+ data=data)
+ self.device.hci.SendAclData(acl_msg)
+
+ def send_first(self, data):
+ self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))
+
+ def send_continuing(self, data):
+ self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
+ hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))
class PyHci(object):
def __init__(self, device):
self.device = device
+
+ self.device.hci.register_for_events(
+ hci_packets.EventCode.ROLE_CHANGE,
+ hci_packets.EventCode.CONNECTION_REQUEST,
+ hci_packets.EventCode.CONNECTION_COMPLETE,
+ hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
+
self.event_stream = self.device.hci.new_event_stream()
self.acl_stream = EventStream(
self.device.hci.FetchAclPackets(empty_proto.Empty()))
read_bd_addr = ReadBdAddrCompleteCapture()
assertThat(self.event_stream).emits(read_bd_addr)
return read_bd_addr.get().GetBdAddr()
+
+ def initiate_connection(self, remote_addr):
+ self.send_command_with_status(
+ hci_packets.CreateConnectionBuilder(
+ remote_addr.decode('utf-8'),
+ 0xcc18, # Packet Type
+ hci_packets.PageScanRepetitionMode.R1,
+ 0x0,
+ hci_packets.ClockOffsetValid.INVALID,
+ hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
+
+ def accept_connection(self):
+ connection_request = ConnectionRequestCapture()
+ assertThat(self.event_stream).emits(connection_request)
+
+ self.send_command_with_status(
+ hci_packets.AcceptConnectionRequestBuilder(
+ connection_request.get().GetBdAddr(),
+ hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
+ return self.complete_connection()
+
+ def complete_connection(self):
+ connection_complete = ConnectionCompleteCapture()
+ assertThat(self.event_stream).emits(connection_complete)
+
+ handle = connection_complete.get().GetConnectionHandle()
+ return PyHciAclConnection(handle, self.acl_stream, self.device)