10 "github.com/stretchr/testify/assert"
11 "github.com/stretchr/testify/require"
13 "github.com/tendermint/tmlibs/log"
14 "github.com/tendermint/tmlibs/pubsub"
15 "github.com/tendermint/tmlibs/pubsub/query"
// clientID is the subscriber identity used by the tests and benchmarks in
// this file that act as a single client.
const clientID = "test-client"
22 func TestSubscribe(t *testing.T) {
23 s := pubsub.NewServer()
24 s.SetLogger(log.TestingLogger())
28 ctx := context.Background()
29 ch := make(chan interface{}, 1)
30 err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
31 require.NoError(t, err)
32 err = s.Publish(ctx, "Ka-Zar")
33 require.NoError(t, err)
34 assertReceive(t, "Ka-Zar", ch)
36 err = s.Publish(ctx, "Quicksilver")
37 require.NoError(t, err)
38 assertReceive(t, "Quicksilver", ch)
41 func TestDifferentClients(t *testing.T) {
42 s := pubsub.NewServer()
43 s.SetLogger(log.TestingLogger())
47 ctx := context.Background()
48 ch1 := make(chan interface{}, 1)
49 err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
50 require.NoError(t, err)
51 err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
52 require.NoError(t, err)
53 assertReceive(t, "Iceman", ch1)
55 ch2 := make(chan interface{}, 1)
56 err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
57 require.NoError(t, err)
58 err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
59 require.NoError(t, err)
60 assertReceive(t, "Ultimo", ch1)
61 assertReceive(t, "Ultimo", ch2)
63 ch3 := make(chan interface{}, 1)
64 err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
65 require.NoError(t, err)
66 err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
67 require.NoError(t, err)
68 assert.Zero(t, len(ch3))
71 func TestClientSubscribesTwice(t *testing.T) {
72 s := pubsub.NewServer()
73 s.SetLogger(log.TestingLogger())
77 ctx := context.Background()
78 q := query.MustParse("tm.events.type='NewBlock'")
80 ch1 := make(chan interface{}, 1)
81 err := s.Subscribe(ctx, clientID, q, ch1)
82 require.NoError(t, err)
83 err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
84 require.NoError(t, err)
85 assertReceive(t, "Goblin Queen", ch1)
87 ch2 := make(chan interface{}, 1)
88 err = s.Subscribe(ctx, clientID, q, ch2)
89 require.NoError(t, err)
94 err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
95 require.NoError(t, err)
96 assertReceive(t, "Spider-Man", ch2)
99 func TestUnsubscribe(t *testing.T) {
100 s := pubsub.NewServer()
101 s.SetLogger(log.TestingLogger())
105 ctx := context.Background()
106 ch := make(chan interface{})
107 err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
108 require.NoError(t, err)
109 err = s.Unsubscribe(ctx, clientID, query.Empty{})
110 require.NoError(t, err)
112 err = s.Publish(ctx, "Nick Fury")
113 require.NoError(t, err)
114 assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
120 func TestUnsubscribeAll(t *testing.T) {
121 s := pubsub.NewServer()
122 s.SetLogger(log.TestingLogger())
126 ctx := context.Background()
127 ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
128 err := s.Subscribe(ctx, clientID, query.Empty{}, ch1)
129 require.NoError(t, err)
130 err = s.Subscribe(ctx, clientID, query.Empty{}, ch2)
131 require.NoError(t, err)
133 err = s.UnsubscribeAll(ctx, clientID)
134 require.NoError(t, err)
136 err = s.Publish(ctx, "Nick Fury")
137 require.NoError(t, err)
138 assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
139 assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
147 func TestBufferCapacity(t *testing.T) {
148 s := pubsub.NewServer(pubsub.BufferCapacity(2))
149 s.SetLogger(log.TestingLogger())
151 assert.Equal(t, 2, s.BufferCapacity())
153 ctx := context.Background()
154 err := s.Publish(ctx, "Nighthawk")
155 require.NoError(t, err)
156 err = s.Publish(ctx, "Sage")
157 require.NoError(t, err)
159 ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
161 err = s.Publish(ctx, "Ironclad")
162 if assert.Error(t, err) {
163 assert.Equal(t, context.DeadlineExceeded, err)
167 func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
168 func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) }
169 func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }
171 func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) }
172 func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) }
173 func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) }
175 func benchmarkNClients(n int, b *testing.B) {
176 s := pubsub.NewServer()
180 ctx := context.Background()
181 for i := 0; i < n; i++ {
182 ch := make(chan interface{})
187 s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
192 for i := 0; i < b.N; i++ {
193 s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
197 func benchmarkNClientsOneQuery(n int, b *testing.B) {
198 s := pubsub.NewServer()
202 ctx := context.Background()
203 q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
204 for i := 0; i < n; i++ {
205 ch := make(chan interface{})
210 s.Subscribe(ctx, clientID, q, ch)
215 for i := 0; i < b.N; i++ {
216 s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
///////////////////////////////////////////////////////////////////////////////
// Helpers
///////////////////////////////////////////////////////////////////////////////
224 func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
228 assert.Equal(t, expected, actual, msgAndArgs...)
230 case <-time.After(1 * time.Second):
231 t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)