logging.debug(e)
return True # Failed as expected
return False
+
+ def test_assertThat_eventStream_emitsInOrder_passes(self):
+ with EventStream(FetchEvents(events=[1, 2, 3],
+ delay_ms=50)) as event_stream:
+ assertThat(event_stream).emits(
+ lambda data: data.value_ == 1,
+ lambda data: data.value_ == 2).inOrder()
+
+ def test_assertThat_eventStream_emitsInAnyOrder_passes(self):
+ with EventStream(FetchEvents(events=[1, 2, 3],
+ delay_ms=50)) as event_stream:
+ assertThat(event_stream).emits(
+ lambda data: data.value_ == 2,
+ lambda data: data.value_ == 1).inAnyOrder().then(
+ lambda data: data.value_ == 3)
+
+ def test_assertThat_eventStream_emitsInOrder_fails(self):
+ try:
+ with EventStream(FetchEvents(events=[1, 2, 3],
+ delay_ms=50)) as event_stream:
+ assertThat(event_stream).emits(
+ lambda data: data.value_ == 2,
+ lambda data: data.value_ == 1).inOrder()
+ except Exception as e:
+ logging.debug(e)
+ return True # Failed as expected
+ return False
+
+ def test_assertThat_eventStream_emitsInAnyOrder_fails(self):
+ try:
+ with EventStream(FetchEvents(events=[1, 2, 3],
+ delay_ms=50)) as event_stream:
+ assertThat(event_stream).emits(
+ lambda data: data.value_ == 4,
+ lambda data: data.value_ == 1).inAnyOrder()
+ except Exception as e:
+ logging.debug(e)
+ return True # Failed as expected
+ return False
len(event_list) <= at_most_times,
msg=("Expected at most %d events, but got %d" % (at_most_times,
len(event_list))))
+
+ def assert_all_events_occur(
+ self,
+ match_fns,
+ order_matters,
+ timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
+ logging.debug("assert_all_events_occur %fs" % timeout.total_seconds())
+ pending_matches = list(match_fns)
+ matched_order = []
+ end_time = datetime.now() + timeout
+ while len(pending_matches) > 0 and datetime.now() < end_time:
+ remaining = self.remaining_time_delta(end_time)
+ logging.debug("Waiting for event (%fs remaining)" %
+ (remaining.total_seconds()))
+ try:
+ current_event = self.event_queue.get(
+ timeout=remaining.total_seconds())
+ for match_fn in pending_matches:
+ if match_fn(current_event):
+ pending_matches.remove(match_fn)
+ matched_order.append(match_fn)
+ except Empty:
+ continue
+ logging.debug("Done waiting for event")
+ asserts.assert_true(
+ len(matched_order) == len(match_fns),
+ msg=("Expected at least %d events, but got %d" %
+ (len(match_fns), len(matched_order))))
+ if order_matters:
+ correct_order = True
+ i = 0
+ while i < len(match_fns):
+ if match_fns[i] is not matched_order[i]:
+ correct_order = False
+ break
+ i += 1
+ asserts.assert_true(
+ correct_order, "Events not received in correct order %s %s" %
+ (match_fns, matched_order))
def __init__(self, value):
super().__init__(value)
- def emits(self, match_fn):
- self._value.assert_event_occurs(match_fn)
- return EventStreamContinuationSubject(self._value)
+ def emits(self, *match_fns):
+ if len(match_fns) == 0:
+ raise signals.TestFailure("Must specify a match function")
+ elif len(match_fns) == 1:
+ self._value.assert_event_occurs(match_fns[0])
+ return EventStreamContinuationSubject(self._value)
+ else:
+ return MultiMatchStreamSubject(self._value, match_fns)
+
+
+class MultiMatchStreamSubject(object):
+
+ def __init__(self, stream, match_fns):
+ self._stream = stream
+ self._match_fns = match_fns
+
+ def inAnyOrder(self):
+ self._stream.assert_all_events_occur(
+ self._match_fns, order_matters=False)
+ return EventStreamContinuationSubject(self._stream)
+
+ def inOrder(self):
+ self._stream.assert_all_events_occur(
+ self._match_fns, order_matters=True)
+ return EventStreamContinuationSubject(self._stream)
class EventStreamContinuationSubject(ObjectSubject):
def __init__(self, value):
super().__init__(value)
- def then(self, match_fn):
- self._value.assert_event_occurs(match_fn)
- return EventStreamContinuationSubject(self._value)
+ def then(self, *match_fns):
+ if len(match_fns) == 0:
+ raise signals.TestFailure("Must specify a match function")
+ elif len(match_fns) == 1:
+ self._value.assert_event_occurs(match_fns[0])
+ return EventStreamContinuationSubject(self._value)
+ else:
+ return MultiMatchStreamSubject(self._value, match_fns)
class BooleanSubject(ObjectSubject):