OSDN Git Service

Cert: implement assert function with event buffer
authorChienyuan <chienyuanhuang@google.com>
Thu, 2 May 2019 20:29:20 +0000 (13:29 -0700)
committerHansong Zhang <hsz@google.com>
Thu, 2 May 2019 23:45:58 +0000 (16:45 -0700)
Test: run gd/cert/run_cert.sh
Change-Id: I237ecdee7e70b860f3e73d4825dd79b0235ae22b

gd/cert/event_stream.py
gd/hal/cert/simple_hal_test.py

index cc24c2b..0342131 100644 (file)
@@ -23,9 +23,15 @@ from grpc import RpcError
 from grpc import StatusCode
 
 class EventStream(object):
+
+  event_buffer = []
+
   def __init__(self, stream_stub_fn):
     self.stream_stub_fn = stream_stub_fn
 
+  def clear_event_buffer(self):
+    self.event_buffer.clear()
+
   def subscribe(self):
     return self.stream_stub_fn(
         common_pb2.EventStreamRequest(
@@ -42,8 +48,49 @@ class EventStream(object):
         )
     )
 
+  def assert_none(self):
+    response = self.stream_stub_fn(
+        common_pb2.EventStreamRequest(
+            subscription_mode=common_pb2.NONE,
+            fetch_mode=common_pb2.ALL_CURRENT
+        )
+    )
+
+    try:
+      for event in response:
+        self.event_buffer.append(event)
+    except RpcError:
+        pass
+
+    if len(self.event_buffer) != 0:
+      asserts.fail("event_buffer is not empty \n%s" % self.event_buffer)
+
+  def assert_none_matching(self, match_fn):
+    response = self.stream_stub_fn(
+        common_pb2.EventStreamRequest(
+            subscription_mode=common_pb2.NONE,
+            fetch_mode=common_pb2.ALL_CURRENT
+        )
+    )
+
+    try:
+      for event in response:
+        self.event_buffer.append(event)
+    except RpcError:
+      pass
+
+    for event in self.event_buffer:
+      if match_fn(event):
+        asserts.fail("event %s occurs" % event)
+
   def assert_event_occurs(self, match_fn, timeout=timedelta(seconds=3)):
     expiration_time = datetime.now() + timeout
+
+    while len(self.event_buffer):
+      element = self.event_buffer.pop(0)
+      if match_fn(element):
+        return
+
     while (True):
       if datetime.now() > expiration_time:
         asserts.fail("timeout of %s exceeded" % str(timeout))
@@ -59,6 +106,8 @@ class EventStream(object):
       try:
         for event in response:
           if (match_fn(event)):
+            for remain_event in response:
+              self.event_buffer.append(remain_event)
             return
       except RpcError:
         if response.code() == StatusCode.DEADLINE_EXCEEDED:
index 14ae5e5..934b7fb 100644 (file)
@@ -25,6 +25,15 @@ from hal import facade_pb2 as hal_facade_pb2
 
 class SimpleHalTest(GdBaseTestClass):
 
+    def test_none_event(self):
+        self.gd_devices[0].hal.hci_event_stream.clear_event_buffer()
+
+        self.gd_devices[0].hal.hci_event_stream.subscribe()
+
+        self.gd_devices[0].hal.hci_event_stream.assert_none()
+
+        self.gd_devices[0].hal.hci_event_stream.unsubscribe()
+
     def test_fetch_hci_event(self):
         self.gd_devices[0].hal.SetLoopbackMode(
             hal_facade_pb2.LoopbackModeSettings(enable=True)