from grpc import StatusCode
class EventStream(object):
+
+ event_buffer = []
+
def __init__(self, stream_stub_fn):
self.stream_stub_fn = stream_stub_fn
+ def clear_event_buffer(self):
+ self.event_buffer.clear()
+
def subscribe(self):
return self.stream_stub_fn(
common_pb2.EventStreamRequest(
)
)
+ def assert_none(self):
+ response = self.stream_stub_fn(
+ common_pb2.EventStreamRequest(
+ subscription_mode=common_pb2.NONE,
+ fetch_mode=common_pb2.ALL_CURRENT
+ )
+ )
+
+ try:
+ for event in response:
+ self.event_buffer.append(event)
+ except RpcError:
+ pass
+
+ if len(self.event_buffer) != 0:
+ asserts.fail("event_buffer is not empty \n%s" % self.event_buffer)
+
+ def assert_none_matching(self, match_fn):
+ response = self.stream_stub_fn(
+ common_pb2.EventStreamRequest(
+ subscription_mode=common_pb2.NONE,
+ fetch_mode=common_pb2.ALL_CURRENT
+ )
+ )
+
+ try:
+ for event in response:
+ self.event_buffer.append(event)
+ except RpcError:
+ pass
+
+ for event in self.event_buffer:
+ if match_fn(event):
+ asserts.fail("event %s occurs" % event)
+
def assert_event_occurs(self, match_fn, timeout=timedelta(seconds=3)):
expiration_time = datetime.now() + timeout
+
+ while len(self.event_buffer):
+ element = self.event_buffer.pop(0)
+ if match_fn(element):
+ return
+
while (True):
if datetime.now() > expiration_time:
asserts.fail("timeout of %s exceeded" % str(timeout))
try:
for event in response:
if (match_fn(event)):
+ for remain_event in response:
+ self.event_buffer.append(remain_event)
return
except RpcError:
if response.code() == StatusCode.DEADLINE_EXCEEDED:
class SimpleHalTest(GdBaseTestClass):
+ def test_none_event(self):
+ self.gd_devices[0].hal.hci_event_stream.clear_event_buffer()
+
+ self.gd_devices[0].hal.hci_event_stream.subscribe()
+
+ self.gd_devices[0].hal.hci_event_stream.assert_none()
+
+ self.gd_devices[0].hal.hci_event_stream.unsubscribe()
+
def test_fetch_hci_event(self):
self.gd_devices[0].hal.SetLoopbackMode(
hal_facade_pb2.LoopbackModeSettings(enable=True)