OSDN Git Service

add log (#373)
[bytom/vapor.git] / event / event.go
1 // Package event deals with subscriptions to real-time events.
2 package event
3
4 import (
5         "errors"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 const (
17         logModule      = "event"
18         maxEventChSize = 65536
19 )
20
21 var (
22         // ErrMuxClosed is returned when Posting on a closed TypeMux.
23         ErrMuxClosed = errors.New("event: mux closed")
24         //ErrDuplicateSubscribe is returned when subscribe duplicate type
25         ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
26 )
27
28 type NewProposedBlockEvent struct{ Block types.Block }
29
30 type BlockSignatureEvent struct {
31         BlockHash bc.Hash
32         Signature []byte
33         XPub      []byte
34 }
35
36 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
37 type TypeMuxEvent struct {
38         Time time.Time
39         Data interface{}
40 }
41
42 // A Dispatcher dispatches events to registered receivers. Receivers can be
43 // registered to handle events of certain type. Any operation
44 // called after mux is stopped will return ErrMuxClosed.
45 //
46 // The zero value is ready to use.
47 type Dispatcher struct {
48         mutex   sync.RWMutex
49         subm    map[reflect.Type][]*Subscription
50         stopped bool
51 }
52
53 func NewDispatcher() *Dispatcher {
54         return &Dispatcher{
55                 subm: make(map[reflect.Type][]*Subscription),
56         }
57 }
58
59 // Subscribe creates a subscription for events of the given types. The
60 // subscription's channel is closed when it is unsubscribed
61 // or the mux is closed.
62 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
63         sub := newSubscription(d)
64         d.mutex.Lock()
65         defer d.mutex.Unlock()
66         if d.stopped {
67                 // set the status to closed so that calling Unsubscribe after this
68                 // call will short circuit.
69                 sub.closed = true
70                 close(sub.postC)
71                 return sub, nil
72         }
73
74         for _, t := range types {
75                 rtyp := reflect.TypeOf(t)
76                 oldsubs := d.subm[rtyp]
77                 if find(oldsubs, sub) != -1 {
78                         log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
79                         return nil, ErrDuplicateSubscribe
80                 }
81
82                 subs := make([]*Subscription, len(oldsubs)+1)
83                 copy(subs, oldsubs)
84                 subs[len(oldsubs)] = sub
85                 d.subm[rtyp] = subs
86         }
87         return sub, nil
88 }
89
90 // Post sends an event to all receivers registered for the given type.
91 // It returns ErrMuxClosed if the mux has been stopped.
92 func (d *Dispatcher) Post(ev interface{}) error {
93         event := &TypeMuxEvent{
94                 Time: time.Now(),
95                 Data: ev,
96         }
97         rtyp := reflect.TypeOf(ev)
98         d.mutex.RLock()
99         if d.stopped {
100                 d.mutex.RUnlock()
101                 return ErrMuxClosed
102         }
103
104         subs := d.subm[rtyp]
105         d.mutex.RUnlock()
106         for _, sub := range subs {
107                 sub.deliver(event)
108         }
109         return nil
110 }
111
112 // Stop closes a mux. The mux can no longer be used.
113 // Future Post calls will fail with ErrMuxClosed.
114 // Stop blocks until all current deliveries have finished.
115 func (d *Dispatcher) Stop() {
116         d.mutex.Lock()
117         for _, subs := range d.subm {
118                 for _, sub := range subs {
119                         sub.closewait()
120                 }
121         }
122         d.subm = nil
123         d.stopped = true
124         d.mutex.Unlock()
125 }
126
127 func (d *Dispatcher) del(s *Subscription) {
128         d.mutex.Lock()
129         for typ, subs := range d.subm {
130                 if pos := find(subs, s); pos >= 0 {
131                         if len(subs) == 1 {
132                                 delete(d.subm, typ)
133                         } else {
134                                 d.subm[typ] = posdelete(subs, pos)
135                         }
136                 }
137         }
138         d.mutex.Unlock()
139 }
140
141 func find(slice []*Subscription, item *Subscription) int {
142         for i, v := range slice {
143                 if v == item {
144                         return i
145                 }
146         }
147         return -1
148 }
149
150 func posdelete(slice []*Subscription, pos int) []*Subscription {
151         news := make([]*Subscription, len(slice)-1)
152         copy(news[:pos], slice[:pos])
153         copy(news[pos:], slice[pos+1:])
154         return news
155 }
156
157 // Subscription is a subscription established through TypeMux.
158 type Subscription struct {
159         dispatcher *Dispatcher
160         created    time.Time
161         closeMu    sync.Mutex
162         closing    chan struct{}
163         closed     bool
164
165         // these two are the same channel. they are stored separately so
166         // postC can be set to nil without affecting the return value of
167         // Chan.
168         postMu sync.RWMutex
169         readC  <-chan *TypeMuxEvent
170         postC  chan<- *TypeMuxEvent
171 }
172
173 func newSubscription(dispatcher *Dispatcher) *Subscription {
174         c := make(chan *TypeMuxEvent, maxEventChSize)
175         return &Subscription{
176                 dispatcher: dispatcher,
177                 created:    time.Now(),
178                 readC:      c,
179                 postC:      c,
180                 closing:    make(chan struct{}),
181         }
182 }
183
184 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
185         return s.readC
186 }
187
188 func (s *Subscription) Unsubscribe() {
189         s.dispatcher.del(s)
190         s.closewait()
191 }
192
193 func (s *Subscription) Closed() bool {
194         s.closeMu.Lock()
195         defer s.closeMu.Unlock()
196         return s.closed
197 }
198
199 func (s *Subscription) closewait() {
200         s.closeMu.Lock()
201         defer s.closeMu.Unlock()
202         if s.closed {
203                 return
204         }
205         close(s.closing)
206         s.closed = true
207
208         s.postMu.Lock()
209         close(s.postC)
210         s.postC = nil
211         s.postMu.Unlock()
212 }
213
214 func (s *Subscription) deliver(event *TypeMuxEvent) {
215         // Short circuit delivery if stale event
216         if s.created.After(event.Time) {
217                 return
218         }
219         // Otherwise deliver the event
220         s.postMu.RLock()
221         defer s.postMu.RUnlock()
222
223         select {
224         case s.postC <- event:
225         case <-s.closing:
226         default:
227                 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))
228         }
229 }