OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / tmlibs / pubsub / pubsub_test.go
1 package pubsub_test
2
3 import (
4         "context"
5         "fmt"
6         "runtime/debug"
7         "testing"
8         "time"
9
10         "github.com/stretchr/testify/assert"
11         "github.com/stretchr/testify/require"
12
13         "github.com/tendermint/tmlibs/log"
14         "github.com/tendermint/tmlibs/pubsub"
15         "github.com/tendermint/tmlibs/pubsub/query"
16 )
17
// clientID is the subscriber identifier shared by the tests and benchmarks in
// this file that only need a single client.
const clientID = "test-client"
21
22 func TestSubscribe(t *testing.T) {
23         s := pubsub.NewServer()
24         s.SetLogger(log.TestingLogger())
25         s.Start()
26         defer s.Stop()
27
28         ctx := context.Background()
29         ch := make(chan interface{}, 1)
30         err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
31         require.NoError(t, err)
32         err = s.Publish(ctx, "Ka-Zar")
33         require.NoError(t, err)
34         assertReceive(t, "Ka-Zar", ch)
35
36         err = s.Publish(ctx, "Quicksilver")
37         require.NoError(t, err)
38         assertReceive(t, "Quicksilver", ch)
39 }
40
41 func TestDifferentClients(t *testing.T) {
42         s := pubsub.NewServer()
43         s.SetLogger(log.TestingLogger())
44         s.Start()
45         defer s.Stop()
46
47         ctx := context.Background()
48         ch1 := make(chan interface{}, 1)
49         err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
50         require.NoError(t, err)
51         err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
52         require.NoError(t, err)
53         assertReceive(t, "Iceman", ch1)
54
55         ch2 := make(chan interface{}, 1)
56         err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
57         require.NoError(t, err)
58         err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
59         require.NoError(t, err)
60         assertReceive(t, "Ultimo", ch1)
61         assertReceive(t, "Ultimo", ch2)
62
63         ch3 := make(chan interface{}, 1)
64         err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
65         require.NoError(t, err)
66         err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
67         require.NoError(t, err)
68         assert.Zero(t, len(ch3))
69 }
70
71 func TestClientSubscribesTwice(t *testing.T) {
72         s := pubsub.NewServer()
73         s.SetLogger(log.TestingLogger())
74         s.Start()
75         defer s.Stop()
76
77         ctx := context.Background()
78         q := query.MustParse("tm.events.type='NewBlock'")
79
80         ch1 := make(chan interface{}, 1)
81         err := s.Subscribe(ctx, clientID, q, ch1)
82         require.NoError(t, err)
83         err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
84         require.NoError(t, err)
85         assertReceive(t, "Goblin Queen", ch1)
86
87         ch2 := make(chan interface{}, 1)
88         err = s.Subscribe(ctx, clientID, q, ch2)
89         require.NoError(t, err)
90
91         _, ok := <-ch1
92         assert.False(t, ok)
93
94         err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
95         require.NoError(t, err)
96         assertReceive(t, "Spider-Man", ch2)
97 }
98
99 func TestUnsubscribe(t *testing.T) {
100         s := pubsub.NewServer()
101         s.SetLogger(log.TestingLogger())
102         s.Start()
103         defer s.Stop()
104
105         ctx := context.Background()
106         ch := make(chan interface{})
107         err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
108         require.NoError(t, err)
109         err = s.Unsubscribe(ctx, clientID, query.Empty{})
110         require.NoError(t, err)
111
112         err = s.Publish(ctx, "Nick Fury")
113         require.NoError(t, err)
114         assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
115
116         _, ok := <-ch
117         assert.False(t, ok)
118 }
119
120 func TestUnsubscribeAll(t *testing.T) {
121         s := pubsub.NewServer()
122         s.SetLogger(log.TestingLogger())
123         s.Start()
124         defer s.Stop()
125
126         ctx := context.Background()
127         ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
128         err := s.Subscribe(ctx, clientID, query.Empty{}, ch1)
129         require.NoError(t, err)
130         err = s.Subscribe(ctx, clientID, query.Empty{}, ch2)
131         require.NoError(t, err)
132
133         err = s.UnsubscribeAll(ctx, clientID)
134         require.NoError(t, err)
135
136         err = s.Publish(ctx, "Nick Fury")
137         require.NoError(t, err)
138         assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
139         assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
140
141         _, ok := <-ch1
142         assert.False(t, ok)
143         _, ok = <-ch2
144         assert.False(t, ok)
145 }
146
147 func TestBufferCapacity(t *testing.T) {
148         s := pubsub.NewServer(pubsub.BufferCapacity(2))
149         s.SetLogger(log.TestingLogger())
150
151         assert.Equal(t, 2, s.BufferCapacity())
152
153         ctx := context.Background()
154         err := s.Publish(ctx, "Nighthawk")
155         require.NoError(t, err)
156         err = s.Publish(ctx, "Sage")
157         require.NoError(t, err)
158
159         ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
160         defer cancel()
161         err = s.Publish(ctx, "Ironclad")
162         if assert.Error(t, err) {
163                 assert.Equal(t, context.DeadlineExceeded, err)
164         }
165 }
166
167 func Benchmark10Clients(b *testing.B)   { benchmarkNClients(10, b) }
168 func Benchmark100Clients(b *testing.B)  { benchmarkNClients(100, b) }
169 func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }
170
171 func Benchmark10ClientsOneQuery(b *testing.B)   { benchmarkNClientsOneQuery(10, b) }
172 func Benchmark100ClientsOneQuery(b *testing.B)  { benchmarkNClientsOneQuery(100, b) }
173 func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) }
174
175 func benchmarkNClients(n int, b *testing.B) {
176         s := pubsub.NewServer()
177         s.Start()
178         defer s.Stop()
179
180         ctx := context.Background()
181         for i := 0; i < n; i++ {
182                 ch := make(chan interface{})
183                 go func() {
184                         for range ch {
185                         }
186                 }()
187                 s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
188         }
189
190         b.ReportAllocs()
191         b.ResetTimer()
192         for i := 0; i < b.N; i++ {
193                 s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
194         }
195 }
196
197 func benchmarkNClientsOneQuery(n int, b *testing.B) {
198         s := pubsub.NewServer()
199         s.Start()
200         defer s.Stop()
201
202         ctx := context.Background()
203         q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
204         for i := 0; i < n; i++ {
205                 ch := make(chan interface{})
206                 go func() {
207                         for range ch {
208                         }
209                 }()
210                 s.Subscribe(ctx, clientID, q, ch)
211         }
212
213         b.ReportAllocs()
214         b.ResetTimer()
215         for i := 0; i < b.N; i++ {
216                 s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
217         }
218 }
219
220 ///////////////////////////////////////////////////////////////////////////////
221 /// HELPERS
222 ///////////////////////////////////////////////////////////////////////////////
223
224 func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
225         select {
226         case actual := <-ch:
227                 if actual != nil {
228                         assert.Equal(t, expected, actual, msgAndArgs...)
229                 }
230         case <-time.After(1 * time.Second):
231                 t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
232                 debug.PrintStack()
233         }
234 }