OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / tmlibs / pubsub / pubsub.go
1 // Package pubsub implements a pub-sub model with a single publisher (Server)
2 // and multiple subscribers (clients).
3 //
4 // Though you can have multiple publishers by sharing a pointer to a server or
5 // by giving the same channel to each publisher and publishing messages from
6 // that channel (fan-in).
7 //
8 // Clients subscribe for messages, which could be of any type, using a query.
9 // When some message is published, we match it with all queries. If there is a
10 // match, this message will be pushed to all clients, subscribed to that query.
11 // See query subpackage for our implementation.
12 package pubsub
13
14 import (
15         "context"
16
17         cmn "github.com/tendermint/tmlibs/common"
18 )
19
20 type operation int
21
22 const (
23         sub operation = iota
24         pub
25         unsub
26         shutdown
27 )
28
29 type cmd struct {
30         op       operation
31         query    Query
32         ch       chan<- interface{}
33         clientID string
34         msg      interface{}
35         tags     map[string]interface{}
36 }
37
38 // Query defines an interface for a query to be used for subscribing.
39 type Query interface {
40         Matches(tags map[string]interface{}) bool
41 }
42
43 // Server allows clients to subscribe/unsubscribe for messages, publishing
44 // messages with or without tags, and manages internal state.
45 type Server struct {
46         cmn.BaseService
47
48         cmds    chan cmd
49         cmdsCap int
50 }
51
52 // Option sets a parameter for the server.
53 type Option func(*Server)
54
55 // NewServer returns a new server. See the commentary on the Option functions
56 // for a detailed description of how to configure buffering. If no options are
57 // provided, the resulting server's queue is unbuffered.
58 func NewServer(options ...Option) *Server {
59         s := &Server{}
60         s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
61
62         for _, option := range options {
63                 option(s)
64         }
65
66         // if BufferCapacity option was not set, the channel is unbuffered
67         s.cmds = make(chan cmd, s.cmdsCap)
68
69         return s
70 }
71
72 // BufferCapacity allows you to specify capacity for the internal server's
73 // queue. Since the server, given Y subscribers, could only process X messages,
74 // this option could be used to survive spikes (e.g. high amount of
75 // transactions during peak hours).
76 func BufferCapacity(cap int) Option {
77         return func(s *Server) {
78                 if cap > 0 {
79                         s.cmdsCap = cap
80                 }
81         }
82 }
83
84 // BufferCapacity returns capacity of the internal server's queue.
85 func (s Server) BufferCapacity() int {
86         return s.cmdsCap
87 }
88
89 // Subscribe creates a subscription for the given client. It accepts a channel
90 // on which messages matching the given query can be received. If the
91 // subscription already exists, the old channel will be closed. An error will
92 // be returned to the caller if the context is canceled.
93 func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
94         select {
95         case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
96                 return nil
97         case <-ctx.Done():
98                 return ctx.Err()
99         }
100 }
101
102 // Unsubscribe removes the subscription on the given query. An error will be
103 // returned to the caller if the context is canceled.
104 func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
105         select {
106         case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
107                 return nil
108         case <-ctx.Done():
109                 return ctx.Err()
110         }
111 }
112
113 // UnsubscribeAll removes all client subscriptions. An error will be returned
114 // to the caller if the context is canceled.
115 func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
116         select {
117         case s.cmds <- cmd{op: unsub, clientID: clientID}:
118                 return nil
119         case <-ctx.Done():
120                 return ctx.Err()
121         }
122 }
123
124 // Publish publishes the given message. An error will be returned to the caller
125 // if the context is canceled.
126 func (s *Server) Publish(ctx context.Context, msg interface{}) error {
127         return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
128 }
129
130 // PublishWithTags publishes the given message with the set of tags. The set is
131 // matched with clients queries. If there is a match, the message is sent to
132 // the client.
133 func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
134         select {
135         case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
136                 return nil
137         case <-ctx.Done():
138                 return ctx.Err()
139         }
140 }
141
142 // OnStop implements Service.OnStop by shutting down the server.
143 func (s *Server) OnStop() {
144         s.cmds <- cmd{op: shutdown}
145 }
146
147 // NOTE: not goroutine safe
148 type state struct {
149         // query -> client -> ch
150         queries map[Query]map[string]chan<- interface{}
151         // client -> query -> struct{}
152         clients map[string]map[Query]struct{}
153 }
154
155 // OnStart implements Service.OnStart by starting the server.
156 func (s *Server) OnStart() error {
157         go s.loop(state{
158                 queries: make(map[Query]map[string]chan<- interface{}),
159                 clients: make(map[string]map[Query]struct{}),
160         })
161         return nil
162 }
163
164 func (s *Server) loop(state state) {
165 loop:
166         for cmd := range s.cmds {
167                 switch cmd.op {
168                 case unsub:
169                         if cmd.query != nil {
170                                 state.remove(cmd.clientID, cmd.query)
171                         } else {
172                                 state.removeAll(cmd.clientID)
173                         }
174                 case shutdown:
175                         for clientID := range state.clients {
176                                 state.removeAll(clientID)
177                         }
178                         break loop
179                 case sub:
180                         state.add(cmd.clientID, cmd.query, cmd.ch)
181                 case pub:
182                         state.send(cmd.msg, cmd.tags)
183                 }
184         }
185 }
186
187 func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
188         // add query if needed
189         if clientToChannelMap, ok := state.queries[q]; !ok {
190                 state.queries[q] = make(map[string]chan<- interface{})
191         } else {
192                 // check if already subscribed
193                 if oldCh, ok := clientToChannelMap[clientID]; ok {
194                         close(oldCh)
195                 }
196         }
197
198         // create subscription
199         state.queries[q][clientID] = ch
200
201         // add client if needed
202         if _, ok := state.clients[clientID]; !ok {
203                 state.clients[clientID] = make(map[Query]struct{})
204         }
205         state.clients[clientID][q] = struct{}{}
206 }
207
208 func (state *state) remove(clientID string, q Query) {
209         clientToChannelMap, ok := state.queries[q]
210         if !ok {
211                 return
212         }
213
214         ch, ok := clientToChannelMap[clientID]
215         if ok {
216                 close(ch)
217
218                 delete(state.clients[clientID], q)
219
220                 // if it not subscribed to anything else, remove the client
221                 if len(state.clients[clientID]) == 0 {
222                         delete(state.clients, clientID)
223                 }
224
225                 delete(state.queries[q], clientID)
226         }
227 }
228
229 func (state *state) removeAll(clientID string) {
230         queryMap, ok := state.clients[clientID]
231         if !ok {
232                 return
233         }
234
235         for q := range queryMap {
236                 ch := state.queries[q][clientID]
237                 close(ch)
238
239                 delete(state.queries[q], clientID)
240         }
241
242         delete(state.clients, clientID)
243 }
244
245 func (state *state) send(msg interface{}, tags map[string]interface{}) {
246         for q, clientToChannelMap := range state.queries {
247                 if q.Matches(tags) {
248                         for _, ch := range clientToChannelMap {
249                                 ch <- msg
250                         }
251                 }
252         }
253 }