9 "github.com/stretchr/testify/assert"
12 // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
13 // listener to an event, and sends a string "data".
14 func TestAddListenerForEventFireOnce(t *testing.T) {
15 evsw := NewEventSwitch()
16 started, err := evsw.Start()
17 if !started || err != nil {
18 t.Errorf("Failed to start EventSwitch, error: %v", err)
20 messages := make(chan EventData)
21 evsw.AddListenerForEvent("listener", "event",
22 func(data EventData) {
25 go evsw.FireEvent("event", "data")
26 received := <-messages
27 if received != "data" {
28 t.Errorf("Message received does not match: %v", received)
32 // TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single
33 // listener to an event, and sends a thousand integers.
34 func TestAddListenerForEventFireMany(t *testing.T) {
35 evsw := NewEventSwitch()
36 started, err := evsw.Start()
37 if !started || err != nil {
38 t.Errorf("Failed to start EventSwitch, error: %v", err)
40 doneSum := make(chan uint64)
41 doneSending := make(chan uint64)
42 numbers := make(chan uint64, 4)
43 // subscribe one listener for one event
44 evsw.AddListenerForEvent("listener", "event",
45 func(data EventData) {
46 numbers <- data.(uint64)
48 // collect received events
49 go sumReceivedNumbers(numbers, doneSum)
51 go fireEvents(evsw, "event", doneSending, uint64(1))
52 checkSum := <-doneSending
55 if checkSum != eventSum {
56 t.Errorf("Not all messages sent were received.\n")
60 // TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single
61 // listener to three different events and sends a thousand integers for each
62 // of the three events.
63 func TestAddListenerForDifferentEvents(t *testing.T) {
64 evsw := NewEventSwitch()
65 started, err := evsw.Start()
66 if !started || err != nil {
67 t.Errorf("Failed to start EventSwitch, error: %v", err)
69 doneSum := make(chan uint64)
70 doneSending1 := make(chan uint64)
71 doneSending2 := make(chan uint64)
72 doneSending3 := make(chan uint64)
73 numbers := make(chan uint64, 4)
74 // subscribe one listener to three events
75 evsw.AddListenerForEvent("listener", "event1",
76 func(data EventData) {
77 numbers <- data.(uint64)
79 evsw.AddListenerForEvent("listener", "event2",
80 func(data EventData) {
81 numbers <- data.(uint64)
83 evsw.AddListenerForEvent("listener", "event3",
84 func(data EventData) {
85 numbers <- data.(uint64)
87 // collect received events
88 go sumReceivedNumbers(numbers, doneSum)
90 go fireEvents(evsw, "event1", doneSending1, uint64(1))
91 go fireEvents(evsw, "event2", doneSending2, uint64(1))
92 go fireEvents(evsw, "event3", doneSending3, uint64(1))
93 var checkSum uint64 = 0
94 checkSum += <-doneSending1
95 checkSum += <-doneSending2
96 checkSum += <-doneSending3
99 if checkSum != eventSum {
100 t.Errorf("Not all messages sent were received.\n")
104 // TestAddDifferentListenerForDifferentEvents sets up an EventSwitch,
105 // subscribes a first listener to three events, and subscribes a second
106 // listener to two of those three events, and then sends a thousand integers
107 // for each of the three events.
108 func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
109 evsw := NewEventSwitch()
110 started, err := evsw.Start()
111 if !started || err != nil {
112 t.Errorf("Failed to start EventSwitch, error: %v", err)
114 doneSum1 := make(chan uint64)
115 doneSum2 := make(chan uint64)
116 doneSending1 := make(chan uint64)
117 doneSending2 := make(chan uint64)
118 doneSending3 := make(chan uint64)
119 numbers1 := make(chan uint64, 4)
120 numbers2 := make(chan uint64, 4)
121 // subscribe two listener to three events
122 evsw.AddListenerForEvent("listener1", "event1",
123 func(data EventData) {
124 numbers1 <- data.(uint64)
126 evsw.AddListenerForEvent("listener1", "event2",
127 func(data EventData) {
128 numbers1 <- data.(uint64)
130 evsw.AddListenerForEvent("listener1", "event3",
131 func(data EventData) {
132 numbers1 <- data.(uint64)
134 evsw.AddListenerForEvent("listener2", "event2",
135 func(data EventData) {
136 numbers2 <- data.(uint64)
138 evsw.AddListenerForEvent("listener2", "event3",
139 func(data EventData) {
140 numbers2 <- data.(uint64)
142 // collect received events for listener1
143 go sumReceivedNumbers(numbers1, doneSum1)
144 // collect received events for listener2
145 go sumReceivedNumbers(numbers2, doneSum2)
147 go fireEvents(evsw, "event1", doneSending1, uint64(1))
148 go fireEvents(evsw, "event2", doneSending2, uint64(1001))
149 go fireEvents(evsw, "event3", doneSending3, uint64(2001))
150 checkSumEvent1 := <-doneSending1
151 checkSumEvent2 := <-doneSending2
152 checkSumEvent3 := <-doneSending3
153 checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
154 checkSum2 := checkSumEvent2 + checkSumEvent3
157 eventSum1 := <-doneSum1
158 eventSum2 := <-doneSum2
159 if checkSum1 != eventSum1 ||
160 checkSum2 != eventSum2 {
161 t.Errorf("Not all messages sent were received for different listeners to different events.\n")
165 // TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
166 // two events, fires a thousand integers for the first event, then unsubscribes
167 // the listener and fires a thousand integers for the second event.
168 func TestAddAndRemoveListener(t *testing.T) {
169 evsw := NewEventSwitch()
170 started, err := evsw.Start()
171 if !started || err != nil {
172 t.Errorf("Failed to start EventSwitch, error: %v", err)
174 doneSum1 := make(chan uint64)
175 doneSum2 := make(chan uint64)
176 doneSending1 := make(chan uint64)
177 doneSending2 := make(chan uint64)
178 numbers1 := make(chan uint64, 4)
179 numbers2 := make(chan uint64, 4)
180 // subscribe two listener to three events
181 evsw.AddListenerForEvent("listener", "event1",
182 func(data EventData) {
183 numbers1 <- data.(uint64)
185 evsw.AddListenerForEvent("listener", "event2",
186 func(data EventData) {
187 numbers2 <- data.(uint64)
189 // collect received events for event1
190 go sumReceivedNumbers(numbers1, doneSum1)
191 // collect received events for event2
192 go sumReceivedNumbers(numbers2, doneSum2)
194 go fireEvents(evsw, "event1", doneSending1, uint64(1))
195 checkSumEvent1 := <-doneSending1
196 // after sending all event1, unsubscribe for all events
197 evsw.RemoveListener("listener")
198 go fireEvents(evsw, "event2", doneSending2, uint64(1001))
199 checkSumEvent2 := <-doneSending2
202 eventSum1 := <-doneSum1
203 eventSum2 := <-doneSum2
204 if checkSumEvent1 != eventSum1 ||
205 // correct value asserted by preceding tests, suffices to be non-zero
206 checkSumEvent2 == uint64(0) ||
207 eventSum2 != uint64(0) {
208 t.Errorf("Not all messages sent were received or unsubscription did not register.\n")
212 // TestRemoveListener does basic tests on adding and removing
213 func TestRemoveListener(t *testing.T) {
214 evsw := NewEventSwitch()
215 started, err := evsw.Start()
216 if !started || err != nil {
217 t.Errorf("Failed to start EventSwitch, error: %v", err)
221 // add some listeners and make sure they work
222 evsw.AddListenerForEvent("listener", "event1",
223 func(data EventData) {
226 evsw.AddListenerForEvent("listener", "event2",
227 func(data EventData) {
230 for i := 0; i < count; i++ {
231 evsw.FireEvent("event1", true)
232 evsw.FireEvent("event2", true)
234 assert.Equal(t, count, sum1)
235 assert.Equal(t, count, sum2)
237 // remove one by event and make sure it is gone
238 evsw.RemoveListenerForEvent("event2", "listener")
239 for i := 0; i < count; i++ {
240 evsw.FireEvent("event1", true)
241 evsw.FireEvent("event2", true)
243 assert.Equal(t, count*2, sum1)
244 assert.Equal(t, count, sum2)
246 // remove the listener entirely and make sure both gone
247 evsw.RemoveListener("listener")
248 for i := 0; i < count; i++ {
249 evsw.FireEvent("event1", true)
250 evsw.FireEvent("event2", true)
252 assert.Equal(t, count*2, sum1)
253 assert.Equal(t, count, sum2)
256 // TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
257 // listeners to three events, and fires a thousand integers for each event.
258 // These two listeners serve as the baseline validation while other listeners
259 // are randomly subscribed and unsubscribed.
260 // More precisely it randomly subscribes new listeners (different from the first
261 // two listeners) to one of these three events. At the same time it starts
262 // randomly unsubscribing these additional listeners from all events they are
263 // at that point subscribed to.
264 // NOTE: it is important to run this test with race conditions tracking on,
265 // `go test -race`, to examine for possible race conditions.
266 func TestRemoveListenersAsync(t *testing.T) {
267 evsw := NewEventSwitch()
268 started, err := evsw.Start()
269 if !started || err != nil {
270 t.Errorf("Failed to start EventSwitch, error: %v", err)
272 doneSum1 := make(chan uint64)
273 doneSum2 := make(chan uint64)
274 doneSending1 := make(chan uint64)
275 doneSending2 := make(chan uint64)
276 doneSending3 := make(chan uint64)
277 numbers1 := make(chan uint64, 4)
278 numbers2 := make(chan uint64, 4)
279 // subscribe two listener to three events
280 evsw.AddListenerForEvent("listener1", "event1",
281 func(data EventData) {
282 numbers1 <- data.(uint64)
284 evsw.AddListenerForEvent("listener1", "event2",
285 func(data EventData) {
286 numbers1 <- data.(uint64)
288 evsw.AddListenerForEvent("listener1", "event3",
289 func(data EventData) {
290 numbers1 <- data.(uint64)
292 evsw.AddListenerForEvent("listener2", "event1",
293 func(data EventData) {
294 numbers2 <- data.(uint64)
296 evsw.AddListenerForEvent("listener2", "event2",
297 func(data EventData) {
298 numbers2 <- data.(uint64)
300 evsw.AddListenerForEvent("listener2", "event3",
301 func(data EventData) {
302 numbers2 <- data.(uint64)
304 // collect received events for event1
305 go sumReceivedNumbers(numbers1, doneSum1)
306 // collect received events for event2
307 go sumReceivedNumbers(numbers2, doneSum2)
308 addListenersStress := func() {
309 s1 := rand.NewSource(time.Now().UnixNano())
311 for k := uint16(0); k < 400; k++ {
312 listenerNumber := r1.Intn(100) + 3
313 eventNumber := r1.Intn(3) + 1
314 go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber),
315 fmt.Sprintf("event%v", eventNumber),
316 func(_ EventData) {})
319 removeListenersStress := func() {
320 s2 := rand.NewSource(time.Now().UnixNano())
322 for k := uint16(0); k < 80; k++ {
323 listenerNumber := r2.Intn(100) + 3
324 go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
329 go fireEvents(evsw, "event1", doneSending1, uint64(1))
330 removeListenersStress()
331 go fireEvents(evsw, "event2", doneSending2, uint64(1001))
332 go fireEvents(evsw, "event3", doneSending3, uint64(2001))
333 checkSumEvent1 := <-doneSending1
334 checkSumEvent2 := <-doneSending2
335 checkSumEvent3 := <-doneSending3
336 checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
339 eventSum1 := <-doneSum1
340 eventSum2 := <-doneSum2
341 if checkSum != eventSum1 ||
342 checkSum != eventSum2 {
343 t.Errorf("Not all messages sent were received.\n")
347 //------------------------------------------------------------------------------
350 // sumReceivedNumbers takes two channels and adds all numbers received
351 // until the receiving channel `numbers` is closed; it then sends the sum
352 // on `doneSum` and closes that channel. Expected to be run in a go-routine.
353 func sumReceivedNumbers(numbers, doneSum chan uint64) {
366 // fireEvents takes an EventSwitch and fires a thousand integers under
367 // a given `event` with the integers mootonically increasing from `offset`
368 // to `offset` + 999. It additionally returns the addition of all integers
369 // sent on `doneChan` for assertion that all events have been sent, and enabling
370 // the test to assert all events have also been received.
371 func fireEvents(evsw EventSwitch, event string, doneChan chan uint64,
373 var sentSum uint64 = 0
374 for i := offset; i <= offset+uint64(999); i++ {
376 evsw.FireEvent(event, i)