OSDN Git Service

edit the config (#320)
[bytom/vapor.git] / event / event_test.go
1 package event
2
3 import (
4         "math/rand"
5         "sync"
6         "testing"
7         "time"
8 )
9
10 type testEvent int
11
12 func TestSubCloseUnsub(t *testing.T) {
13         // the point of this test is **not** to panic
14         var mux Dispatcher
15         mux.Stop()
16         sub, _ := mux.Subscribe(int(0))
17         sub.Unsubscribe()
18 }
19
20 func TestSub(t *testing.T) {
21         mux := NewDispatcher()
22         defer mux.Stop()
23
24         sub, _ := mux.Subscribe(testEvent(0))
25         go func() {
26                 if err := mux.Post(testEvent(5)); err != nil {
27                         t.Errorf("Post returned unexpected error: %v", err)
28                 }
29         }()
30         ev := <-sub.Chan()
31
32         if ev.Data.(testEvent) != testEvent(5) {
33                 t.Errorf("Got %v (%T), expected event %v (%T)",
34                         ev, ev, testEvent(5), testEvent(5))
35         }
36 }
37
38 func TestMuxErrorAfterStop(t *testing.T) {
39         mux := NewDispatcher()
40         mux.Stop()
41
42         sub, _ := mux.Subscribe(testEvent(0))
43         if _, isopen := <-sub.Chan(); isopen {
44                 t.Errorf("subscription channel was not closed")
45         }
46         if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
47                 t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
48         }
49 }
50
51 func TestUnsubscribeUnblockPost(t *testing.T) {
52         mux := NewDispatcher()
53         defer mux.Stop()
54
55         sub, _ := mux.Subscribe(testEvent(0))
56         unblocked := make(chan bool)
57         go func() {
58                 mux.Post(testEvent(5))
59                 unblocked <- true
60         }()
61
62         select {
63         case <-unblocked:
64                 t.Errorf("Post returned before Unsubscribe")
65         default:
66                 sub.Unsubscribe()
67                 <-unblocked
68         }
69 }
70
71 func TestSubscribeDuplicateType(t *testing.T) {
72         mux := NewDispatcher()
73         if _, err := mux.Subscribe(testEvent(1), testEvent(2)); err != ErrDuplicateSubscribe {
74                 t.Fatal("Subscribe didn't error for duplicate type")
75         }
76 }
77
78 func TestMuxConcurrent(t *testing.T) {
79         rand.Seed(time.Now().Unix())
80         mux := NewDispatcher()
81         defer mux.Stop()
82
83         recv := make(chan int)
84         poster := func() {
85                 for {
86                         err := mux.Post(testEvent(0))
87                         if err != nil {
88                                 return
89                         }
90                 }
91         }
92         sub := func(i int) {
93                 time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
94                 sub, _ := mux.Subscribe(testEvent(0))
95                 <-sub.Chan()
96                 sub.Unsubscribe()
97                 recv <- i
98         }
99
100         go poster()
101         go poster()
102         go poster()
103         nsubs := 1000
104         for i := 0; i < nsubs; i++ {
105                 go sub(i)
106         }
107
108         // wait until everyone has been served
109         counts := make(map[int]int, nsubs)
110         for i := 0; i < nsubs; i++ {
111                 counts[<-recv]++
112         }
113         for i, count := range counts {
114                 if count != 1 {
115                         t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
116                 }
117         }
118 }
119
120 func emptySubscriber(mux *Dispatcher) {
121         s, _ := mux.Subscribe(testEvent(0))
122         go func() {
123                 for range s.Chan() {
124                 }
125         }()
126 }
127
128 func BenchmarkPost1000(b *testing.B) {
129         var (
130                 mux              = NewDispatcher()
131                 subscribed, done sync.WaitGroup
132                 nsubs            = 1000
133         )
134         subscribed.Add(nsubs)
135         done.Add(nsubs)
136         for i := 0; i < nsubs; i++ {
137                 go func() {
138                         s, _ := mux.Subscribe(testEvent(0))
139                         subscribed.Done()
140                         for range s.Chan() {
141                         }
142                         done.Done()
143                 }()
144         }
145         subscribed.Wait()
146
147         // The actual benchmark.
148         b.ResetTimer()
149         for i := 0; i < b.N; i++ {
150                 mux.Post(testEvent(0))
151         }
152
153         b.StopTimer()
154         mux.Stop()
155         done.Wait()
156 }
157
158 func BenchmarkPostConcurrent(b *testing.B) {
159         var mux = NewDispatcher()
160         defer mux.Stop()
161         emptySubscriber(mux)
162         emptySubscriber(mux)
163         emptySubscriber(mux)
164
165         var wg sync.WaitGroup
166         poster := func() {
167                 for i := 0; i < b.N; i++ {
168                         mux.Post(testEvent(0))
169                 }
170                 wg.Done()
171         }
172         wg.Add(5)
173         for i := 0; i < 5; i++ {
174                 go poster()
175         }
176         wg.Wait()
177 }
178
179 // for comparison
180 func BenchmarkChanSend(b *testing.B) {
181         c := make(chan interface{})
182         closed := make(chan struct{})
183         go func() {
184                 for range c {
185                 }
186         }()
187
188         for i := 0; i < b.N; i++ {
189                 select {
190                 case c <- i:
191                 case <-closed:
192                 }
193         }
194 }