return information_payload[2:]
+def get_enhanced_control_field(payload):
+ return payload[:2]
+
+
class SimpleL2capTest(GdBaseTestClass):
def setup_test(self):
lambda log: log.HasField("link_up") and log.link_up.remote == self.dut_address
)
- def _open_channel(self, event_asserts, scid=0x0101, psm=0x33):
+ def _open_channel(
+ self,
+ event_asserts,
+ scid=0x0101,
+ psm=0x33,
+ mode=l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.BASIC):
self.device_under_test.l2cap.SetDynamicChannel(
- l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm, retransmission_mode=mode))
self.cert_device.l2cap.SendConnectionRequest(
l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
event_asserts.assert_event_occurs(
l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
- self.device_under_test.l2cap.RegisterChannel(
- l2cap_facade_pb2.RegisterChannelRequest(channel=2))
- self.device_under_test.l2cap.SetDynamicChannel(
- l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=0x33,
- retransmission_mode=l2cap_facade_pb2.
- RetransmissionFlowControlMode.ERTM))
- self._setup_link(l2cap_event_asserts)
scid = 0x0101
- self._open_channel(l2cap_event_asserts, scid=scid)
+ psm = 0x33
+ self._setup_link(l2cap_event_asserts)
+ self._open_channel(
+ l2cap_event_asserts, mode=self.retransmission_mode)
self.device_under_test.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
# Retransmission timer = 1, 1 * monitor timer = 2, so total timeout is 3
time.sleep(4)
- l2cap_event_asserts.assert_event_occurs(lambda log : is_disconnection_request(log) and \
- log.disconnection_request.dcid == scid and \
- log.disconnection_request.scid == self.scid_dcid_map[scid])
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log : is_disconnection_request(log) and \
+ log.disconnection_request.dcid == scid and \
+ log.disconnection_request.scid == self.scid_dcid_map[scid])
l2cap_event_asserts_alt.assert_event_occurs_at_most(
lambda log: is_disconnection_request(log), 1)
- def test_sent_rej_lost(self):
+ def test_i_frame_transmissions_exceed_max_transmit(self):
"""
- L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted]
+ L2CAP/ERM/BV-12-C [I-Frame Transmissions Exceed MaxTransmit]
"""
with EventCallbackStream(
self.cert_device.l2cap.FetchL2capLog(
empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
- self._setup_link(l2cap_event_asserts)
-
- signal_id = 3
+ l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
+ self._register_callbacks(l2cap_log_stream)
+ self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
scid = 0x0101
- psm = 1
- mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
- self.tx_window = 1
- self.device_under_test.l2cap.SetDynamicChannel(
- l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=mode))
-
- def handle_connection_response(log):
- log = log.connection_response
- self.scid_dcid_map[log.scid] = log.dcid
- self.cert_device.l2cap.SendConfigurationRequest(
- l2cap_cert_pb2.ConfigurationRequest(
- dcid=self.scid_dcid_map[scid],
- signal_id=signal_id + 1,
- retransmission_config=l2cap_cert_pb2.
- ChannelRetransmissionFlowControlConfig(mode=mode)))
- l2cap_log_stream.unregister_callback(
- handle_connection_response,
- matcher_fn=is_connection_response)
-
- l2cap_log_stream.register_callback(
- handle_connection_response, matcher_fn=is_connection_response)
-
- def handle_configuration_request(log):
- log = log.configuration_request
- if log.dcid not in self.scid_dcid_map:
- return
- dcid = self.scid_dcid_map[log.dcid]
- if log.HasField("retransmission_config"):
- self.tx_window = log.retransmission_config.tx_window
- self.cert_device.l2cap.SendConfigurationResponse(
- l2cap_cert_pb2.ConfigurationResponse(
- scid=dcid,
- signal_id=log.signal_id,
- ))
- l2cap_log_stream.unregister_callback(
- handle_configuration_request,
- matcher_fn=is_configuration_request)
+ psm = 0x33
+ self._setup_link(l2cap_event_asserts)
+ self._open_channel(
+ l2cap_event_asserts, mode=self.retransmission_mode)
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- l2cap_log_stream.register_callback(
- handle_configuration_request,
- matcher_fn=is_configuration_request)
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and \
+ log.data_packet.channel == scid and \
+ get_enhanced_control_field(log.data_packet.payload) == b'\x11\x00'
+ )
- def handle_information_request(log):
- log = log.information_request
- self.cert_device.l2cap.SendInformationResponse(
- l2cap_cert_pb2.InformationResponse(
- type=log.type, signal_id=log.signal_id))
+ self.cert_device.l2cap.SendSFrame(
+ l2cap_cert_pb2.SFrame(
+ channel=self.scid_dcid_map[scid], req_seq=0, p=0, s=0, f=1))
+ l2cap_event_asserts.assert_none_matching(
+ lambda log: log.HasField("data_packet"))
+ l2cap_event_asserts_alt.assert_event_occurs(
+ lambda log : is_disconnection_request(log) and \
+ log.disconnection_request.dcid == scid and \
+ log.disconnection_request.scid == self.scid_dcid_map[scid])
- l2cap_log_stream.register_callback(
- handle_information_request, matcher_fn=is_information_request)
+ def test_sent_rej_lost(self):
+ """
+ L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted]
+ """
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
+ l2cap_event_asserts = EventAsserts(l2cap_log_stream)
- self.cert_device.l2cap.SendConnectionRequest(
- l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
+ self._register_callbacks(l2cap_log_stream)
+ signal_id = 3
+ scid = 0x0101
+ tx_window = 5
+ self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
- l2cap_event_asserts.assert_event_occurs(
- lambda log: is_configuration_response(log) and scid == log.configuration_response.scid
- )
- time.sleep(0.5)
+ self._setup_link(l2cap_event_asserts)
+ self._open_channel(
+ l2cap_event_asserts, mode=self.retransmission_mode)
self.cert_device.l2cap.SendIFrame(
l2cap_cert_pb2.IFrame(
req_seq=0,
tx_seq=0,
sar=0,
- sdu_size=3,
- information=b"abc"))
+ information=b'abc'))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log :log.HasField("data_packet") and \
+ log.data_packet.channel == scid and \
+ get_enhanced_control_field(log.data_packet.payload) == b'\x01\x01'
+ )
self.cert_device.l2cap.SendIFrame(
l2cap_cert_pb2.IFrame(
channel=self.scid_dcid_map[scid],
req_seq=0,
- tx_seq=(self.tx_window - 1),
+ tx_seq=(tx_window - 1),
sar=0,
- sdu_size=3,
- information=b"def"))
+ information=b'def'))
l2cap_event_asserts.assert_event_occurs(
- lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01'
- )
+ lambda log :log.HasField("data_packet") and \
+ log.data_packet.channel == scid and \
+ get_enhanced_control_field(log.data_packet.payload) == b'\x05\x01'
+ )
self.cert_device.l2cap.SendSFrame(
l2cap_cert_pb2.SFrame(
channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0))
l2cap_event_asserts.assert_event_occurs(
- lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x81\x01'
- )
-
- for i in range(1, self.tx_window):
+ lambda log :log.HasField("data_packet") and \
+ log.data_packet.channel == scid and \
+ get_enhanced_control_field(log.data_packet.payload) == b'\x81\x01'
+ )
+ for i in range(1, tx_window):
self.cert_device.l2cap.SendIFrame(
l2cap_cert_pb2.IFrame(
channel=self.scid_dcid_map[scid],
req_seq=0,
tx_seq=(i),
sar=0))
- time.sleep(0.1)
- l2cap_event_asserts.assert_event_occurs(
- lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x01\x0a'
- )
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log :log.HasField("data_packet") and \
+ log.data_packet.channel == scid and \
+ bytes([get_enhanced_control_field(log.data_packet.payload)[1]]) == bytes([i + 1])
+ )
def test_respond_configuration_request_ertm(self):
"""
void recv_rr(uint8_t req_seq, Poll p = Poll::NOT_SET, Final f = Final::NOT_SET) {
if (rx_state_ == RxState::RECV) {
- if (p == Poll::NOT_SET && f == Final::NOT_SET && with_valid_req_seq(req_seq) && with_valid_f_bit(f)) {
+ if (p == Poll::NOT_SET && f == Final::NOT_SET && with_valid_req_seq_rr(req_seq) && with_valid_f_bit(f)) {
pass_to_tx(req_seq, f);
if (remote_busy() && unacked_frames_ > 0) {
start_retrans_timer();
} else if (p == Poll::POLL && with_valid_req_seq(req_seq) && with_valid_f_bit(f)) {
pass_to_tx(req_seq, f);
send_i_or_rr_or_rnr(Final::POLL_RESPONSE);
- } else if (with_invalid_req_seq(req_seq)) {
+ } else if (with_invalid_req_seq_rr(req_seq)) {
CloseChannel();
}
} else if (rx_state_ == RxState::REJ_SENT) {
rej_actioned_ = false;
}
send_pending_i_frames();
- } else if (p == Poll::NOT_SET && f == Final::NOT_SET && with_valid_req_seq(req_seq) && with_valid_f_bit(f)) {
+ } else if (p == Poll::NOT_SET && f == Final::NOT_SET && with_valid_req_seq_rr(req_seq) && with_valid_f_bit(f)) {
pass_to_tx(req_seq, f);
if (remote_busy() and unacked_frames_ > 0) {
start_retrans_timer();
}
remote_busy_ = false;
send_rr(Final::POLL_RESPONSE);
- } else if (with_invalid_req_seq(req_seq)) {
+ } else if (with_invalid_req_seq_rr(req_seq)) {
CloseChannel();
}
} else if (rx_state_ == RxState::SREJ_SENT) {
}
bool with_invalid_req_seq(uint8_t req_seq) {
- return req_seq < expected_ack_seq_ || req_seq >= next_tx_seq_;
+ return req_seq < expected_ack_seq_ || req_seq > next_tx_seq_;
}
bool with_invalid_req_seq_retrans(uint8_t req_seq) {
return !with_invalid_tx_seq(tx_seq) && !with_expected_tx_seq(tx_seq);
}
+ bool with_valid_req_seq_rr(uint8_t req_seq) {
+ return expected_ack_seq_ < req_seq && req_seq <= next_tx_seq_;
+ }
+
+ bool with_invalid_req_seq_rr(uint8_t req_seq) {
+ return req_seq <= expected_ack_seq_ || req_seq > next_tx_seq_;
+ }
+
bool with_expected_tx_seq_srej() {
// We don't support sending SREJ
return false;
uint8_t i = req_seq;
Final f = (p == Poll::NOT_SET ? Final::NOT_SET : Final::POLL_RESPONSE);
while (unacked_list_.find(i) != unacked_list_.end()) {
+ if (retry_i_frames_[i] == controller_->local_max_transmit_) {
+ CloseChannel();
+ return;
+ }
std::unique_ptr<CopyablePacketBuilder> copyable_packet_builder =
std::make_unique<CopyablePacketBuilder>(std::get<2>(unacked_list_.find(i)->second));
_send_i_frame(std::get<0>(unacked_list_.find(i)->second), std::move(copyable_packet_builder), buffer_seq_, i,
std::get<1>(unacked_list_.find(i)->second), f);
retry_i_frames_[i]++;
- if (retry_i_frames_[i] == controller_->local_max_transmit_) {
- CloseChannel();
- }
frames_sent_++;
f = Final::NOT_SET;
i++;