1 // Package pubsub implements a pub-sub model with a single publisher (Server)
2 // and multiple subscribers (clients).
4 // Though you can have multiple publishers by sharing a pointer to a server or
5 // by giving the same channel to each publisher and publishing messages from
6 // that channel (fan-in).
8 // Clients subscribe for messages, which could be of any type, using a query.
9 // When some message is published, we match it with all queries. If there is a
10 // match, this message will be pushed to all clients, subscribed to that query.
11 // See query subpackage for our implementation.
17 cmn "github.com/tendermint/tmlibs/common"
35 tags map[string]interface{}
38 // Query defines an interface for a query to be used for subscribing.
39 type Query interface {
40 Matches(tags map[string]interface{}) bool
43 // Server allows clients to subscribe/unsubscribe for messages, publishing
44 // messages with or without tags, and manages internal state.
52 // Option sets a parameter for the server.
53 type Option func(*Server)
55 // NewServer returns a new server. See the commentary on the Option functions
56 // for a detailed description of how to configure buffering. If no options are
57 // provided, the resulting server's queue is unbuffered.
58 func NewServer(options ...Option) *Server {
60 s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
62 for _, option := range options {
66 // if BufferCapacity option was not set, the channel is unbuffered
67 s.cmds = make(chan cmd, s.cmdsCap)
72 // BufferCapacity allows you to specify capacity for the internal server's
73 // queue. Since the server, given Y subscribers, could only process X messages,
74 // this option could be used to survive spikes (e.g. high amount of
75 // transactions during peak hours).
76 func BufferCapacity(cap int) Option {
77 return func(s *Server) {
84 // BufferCapacity returns capacity of the internal server's queue.
85 func (s Server) BufferCapacity() int {
89 // Subscribe creates a subscription for the given client. It accepts a channel
90 // on which messages matching the given query can be received. If the
91 // subscription already exists, the old channel will be closed. An error will
92 // be returned to the caller if the context is canceled.
93 func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
95 case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
102 // Unsubscribe removes the subscription on the given query. An error will be
103 // returned to the caller if the context is canceled.
104 func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
106 case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
113 // UnsubscribeAll removes all client subscriptions. An error will be returned
114 // to the caller if the context is canceled.
115 func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
117 case s.cmds <- cmd{op: unsub, clientID: clientID}:
124 // Publish publishes the given message. An error will be returned to the caller
125 // if the context is canceled.
126 func (s *Server) Publish(ctx context.Context, msg interface{}) error {
127 return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
130 // PublishWithTags publishes the given message with the set of tags. The set is
131 // matched with clients queries. If there is a match, the message is sent to
133 func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
135 case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
142 // OnStop implements Service.OnStop by shutting down the server.
143 func (s *Server) OnStop() {
144 s.cmds <- cmd{op: shutdown}
147 // NOTE: not goroutine safe
149 // query -> client -> ch
150 queries map[Query]map[string]chan<- interface{}
151 // client -> query -> struct{}
152 clients map[string]map[Query]struct{}
155 // OnStart implements Service.OnStart by starting the server.
156 func (s *Server) OnStart() error {
158 queries: make(map[Query]map[string]chan<- interface{}),
159 clients: make(map[string]map[Query]struct{}),
164 func (s *Server) loop(state state) {
166 for cmd := range s.cmds {
169 if cmd.query != nil {
170 state.remove(cmd.clientID, cmd.query)
172 state.removeAll(cmd.clientID)
175 for clientID := range state.clients {
176 state.removeAll(clientID)
180 state.add(cmd.clientID, cmd.query, cmd.ch)
182 state.send(cmd.msg, cmd.tags)
187 func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
188 // add query if needed
189 if clientToChannelMap, ok := state.queries[q]; !ok {
190 state.queries[q] = make(map[string]chan<- interface{})
192 // check if already subscribed
193 if oldCh, ok := clientToChannelMap[clientID]; ok {
198 // create subscription
199 state.queries[q][clientID] = ch
201 // add client if needed
202 if _, ok := state.clients[clientID]; !ok {
203 state.clients[clientID] = make(map[Query]struct{})
205 state.clients[clientID][q] = struct{}{}
208 func (state *state) remove(clientID string, q Query) {
209 clientToChannelMap, ok := state.queries[q]
214 ch, ok := clientToChannelMap[clientID]
218 delete(state.clients[clientID], q)
220 // if it not subscribed to anything else, remove the client
221 if len(state.clients[clientID]) == 0 {
222 delete(state.clients, clientID)
225 delete(state.queries[q], clientID)
229 func (state *state) removeAll(clientID string) {
230 queryMap, ok := state.clients[clientID]
235 for q := range queryMap {
236 ch := state.queries[q][clientID]
239 delete(state.queries[q], clientID)
242 delete(state.clients, clientID)
245 func (state *state) send(msg interface{}, tags map[string]interface{}) {
246 for q, clientToChannelMap := range state.queries {
248 for _, ch := range clientToChannelMap {