// TestSubCloseUnsub subscribes to the int type on mux; per the note below,
// the test succeeds as long as nothing panics.
// NOTE(review): the lines that create mux and that presumably close and
// unsubscribe are not visible in this excerpt — confirm against the full file.
12 func TestSubCloseUnsub(t *testing.T) {
13 // the point of this test is **not** to panic
// The error result of Subscribe is discarded.
16 sub, _ := mux.Subscribe(int(0))
// TestSub subscribes to the testEvent type, posts testEvent(5), and checks
// that the Data field of the event ev equals the posted value.
// NOTE(review): the statement that obtains ev is not visible in this excerpt;
// presumably it is received from sub.Chan() — confirm.
20 func TestSub(t *testing.T) {
21 mux := NewDispatcher()
// The error result of Subscribe is discarded.
24 sub, _ := mux.Subscribe(testEvent(0))
// Any error returned by Post is reported as a test failure.
26 if err := mux.Post(testEvent(5)); err != nil {
27 t.Errorf("Post returned unexpected error: %v", err)
// Single-value type assertion: this panics if Data does not hold a testEvent.
32 if ev.Data.(testEvent) != testEvent(5) {
33 t.Errorf("Got %v (%T), expected event %v (%T)",
34 ev, ev, testEvent(5), testEvent(5))
// TestMuxErrorAfterStop checks two things: that the subscription channel is
// closed, and that Post returns ErrMuxClosed.
// NOTE(review): going by the test name, the dispatcher is stopped on a line
// not visible in this excerpt — confirm.
38 func TestMuxErrorAfterStop(t *testing.T) {
39 mux := NewDispatcher()
42 sub, _ := mux.Subscribe(testEvent(0))
// isopen == true means a value was received, i.e. the channel was not closed.
43 if _, isopen := <-sub.Chan(); isopen {
44 t.Errorf("subscription channel was not closed")
// The sentinel is compared by identity (!=), so a wrapped ErrMuxClosed would
// be reported as a mismatch.
46 if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
47 t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
// TestUnsubscribeUnblockPost subscribes to testEvent, posts testEvent(5), and
// fails with "Post returned before Unsubscribe" on one of its code paths.
// NOTE(review): the goroutine/select structure is not visible in this excerpt;
// presumably Post runs in a goroutine and signals on unblocked once it
// returns — confirm.
51 func TestUnsubscribeUnblockPost(t *testing.T) {
52 mux := NewDispatcher()
55 sub, _ := mux.Subscribe(testEvent(0))
// Unbuffered signalling channel.
56 unblocked := make(chan bool)
// The error result of Post is ignored here.
58 mux.Post(testEvent(5))
64 t.Errorf("Post returned before Unsubscribe")
// TestSubscribeDuplicateType passes two values of the same type (testEvent)
// to a single Subscribe call and requires ErrDuplicateSubscribe in return.
71 func TestSubscribeDuplicateType(t *testing.T) {
72 mux := NewDispatcher()
// Any other result, including a nil error, aborts the test via t.Fatal.
73 if _, err := mux.Subscribe(testEvent(1), testEvent(2)); err != ErrDuplicateSubscribe {
74 t.Fatal("Subscribe didn't error for duplicate type")
// TestMuxConcurrent posts a testEvent while subscriptions are being created
// after random delays, then tallies per-receiver counts and reports any
// receiver whose count is not the expected single call.
// NOTE(review): the goroutine bodies, the definition of nsubs, and the
// condition guarding the final t.Errorf are not visible in this excerpt —
// confirm the exact flow against the full file.
78 func TestMuxConcurrent(t *testing.T) {
// Seed the rand package from the wall clock so the delays differ between runs.
79 rand.Seed(time.Now().Unix())
80 mux := NewDispatcher()
// Unbuffered channel of ints; presumably carries receiver indices — confirm.
83 recv := make(chan int)
86 err := mux.Post(testEvent(0))
// Random delay in the range [0, 99) milliseconds before subscribing.
93 time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
94 sub, _ := mux.Subscribe(testEvent(0))
104 for i := 0; i < nsubs; i++ {
108 // wait until everyone has been served
// counts is pre-sized for nsubs entries and keyed by int.
109 counts := make(map[int]int, nsubs)
110 for i := 0; i < nsubs; i++ {
113 for i, count := range counts {
115 t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
// emptySubscriber subscribes to the testEvent type on the given dispatcher,
// discarding the error result of Subscribe.
// NOTE(review): what is done with the subscription s is not visible in this
// excerpt — confirm.
120 func emptySubscriber(mux *Dispatcher) {
121 s, _ := mux.Subscribe(testEvent(0))
// BenchmarkPost1000 measures Post on a dispatcher that has nsubs testEvent
// subscriptions.
// NOTE(review): the value of nsubs (presumably 1000, going by the name), the
// opening of the declaration block, and the subscriber goroutine bodies are
// not visible in this excerpt — confirm.
128 func BenchmarkPost1000(b *testing.B) {
130 mux = NewDispatcher()
131 subscribed, done sync.WaitGroup
// Register nsubs pending units on the subscribed WaitGroup.
134 subscribed.Add(nsubs)
136 for i := 0; i < nsubs; i++ {
138 s, _ := mux.Subscribe(testEvent(0))
147 // The actual benchmark.
// Post b.N events; the error result of Post is ignored.
149 for i := 0; i < b.N; i++ {
150 mux.Post(testEvent(0))
// BenchmarkPostConcurrent measures Post when it is driven from a loop that is
// repeated five times.
// NOTE(review): presumably each of the five iterations starts a goroutine
// running the b.N posting loop and wg waits for them; that code and any
// subscriber setup are not visible in this excerpt — confirm.
158 func BenchmarkPostConcurrent(b *testing.B) {
159 var mux = NewDispatcher()
165 var wg sync.WaitGroup
// Post b.N events; the error result of Post is ignored.
167 for i := 0; i < b.N; i++ {
168 mux.Post(testEvent(0))
173 for i := 0; i < 5; i++ {
// BenchmarkChanSend sets up an unbuffered channel of interface{} values and a
// separate signalling channel, then runs a b.N loop.
// NOTE(review): presumably this is a plain channel-send baseline to compare
// with the dispatcher benchmarks; the loop body and the receiving side are
// not visible in this excerpt — confirm.
180 func BenchmarkChanSend(b *testing.B) {
181 c := make(chan interface{})
182 closed := make(chan struct{})
188 for i := 0; i < b.N; i++ {