OSDN Git Service

1e6bd992bcd153e97f9d9c7ff18313f5c9769fc8
[bytom/vapor.git] / event / event.go
1 // Package event deals with subscriptions to real-time events.
2 package event
3
4 import (
5         "errors"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/protocol/bc/types"
13 )
14
15 const (
16         logModule      = "event"
17         maxEventChSize = 65536
18 )
19
20 var (
21         // ErrMuxClosed is returned when Posting on a closed TypeMux.
22         ErrMuxClosed = errors.New("event: mux closed")
23         //ErrDuplicateSubscribe is returned when subscribe duplicate type
24         ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
25 )
26
27 type NewMinedBlockEvent struct{ Block types.Block }
28
29 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
30 type TypeMuxEvent struct {
31         Time time.Time
32         Data interface{}
33 }
34
35 // A Dispatcher dispatches events to registered receivers. Receivers can be
36 // registered to handle events of certain type. Any operation
37 // called after mux is stopped will return ErrMuxClosed.
38 //
39 // The zero value is ready to use.
40 type Dispatcher struct {
41         mutex   sync.RWMutex
42         subm    map[reflect.Type][]*Subscription
43         stopped bool
44 }
45
46 func NewDispatcher() *Dispatcher {
47         return &Dispatcher{
48                 subm: make(map[reflect.Type][]*Subscription),
49         }
50 }
51
52 // Subscribe creates a subscription for events of the given types. The
53 // subscription's channel is closed when it is unsubscribed
54 // or the mux is closed.
55 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
56         sub := newSubscription(d)
57         d.mutex.Lock()
58         defer d.mutex.Unlock()
59         if d.stopped {
60                 // set the status to closed so that calling Unsubscribe after this
61                 // call will short circuit.
62                 sub.closed = true
63                 close(sub.postC)
64                 return sub, nil
65         }
66
67         for _, t := range types {
68                 rtyp := reflect.TypeOf(t)
69                 oldsubs := d.subm[rtyp]
70                 if find(oldsubs, sub) != -1 {
71                         log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
72                         return nil, ErrDuplicateSubscribe
73                 }
74
75                 subs := make([]*Subscription, len(oldsubs)+1)
76                 copy(subs, oldsubs)
77                 subs[len(oldsubs)] = sub
78                 d.subm[rtyp] = subs
79         }
80         return sub, nil
81 }
82
83 // Post sends an event to all receivers registered for the given type.
84 // It returns ErrMuxClosed if the mux has been stopped.
85 func (d *Dispatcher) Post(ev interface{}) error {
86         event := &TypeMuxEvent{
87                 Time: time.Now(),
88                 Data: ev,
89         }
90         rtyp := reflect.TypeOf(ev)
91         d.mutex.RLock()
92         if d.stopped {
93                 d.mutex.RUnlock()
94                 return ErrMuxClosed
95         }
96
97         subs := d.subm[rtyp]
98         d.mutex.RUnlock()
99         for _, sub := range subs {
100                 sub.deliver(event)
101         }
102         return nil
103 }
104
105 // Stop closes a mux. The mux can no longer be used.
106 // Future Post calls will fail with ErrMuxClosed.
107 // Stop blocks until all current deliveries have finished.
108 func (d *Dispatcher) Stop() {
109         d.mutex.Lock()
110         for _, subs := range d.subm {
111                 for _, sub := range subs {
112                         sub.closewait()
113                 }
114         }
115         d.subm = nil
116         d.stopped = true
117         d.mutex.Unlock()
118 }
119
120 func (d *Dispatcher) del(s *Subscription) {
121         d.mutex.Lock()
122         for typ, subs := range d.subm {
123                 if pos := find(subs, s); pos >= 0 {
124                         if len(subs) == 1 {
125                                 delete(d.subm, typ)
126                         } else {
127                                 d.subm[typ] = posdelete(subs, pos)
128                         }
129                 }
130         }
131         d.mutex.Unlock()
132 }
133
134 func find(slice []*Subscription, item *Subscription) int {
135         for i, v := range slice {
136                 if v == item {
137                         return i
138                 }
139         }
140         return -1
141 }
142
143 func posdelete(slice []*Subscription, pos int) []*Subscription {
144         news := make([]*Subscription, len(slice)-1)
145         copy(news[:pos], slice[:pos])
146         copy(news[pos:], slice[pos+1:])
147         return news
148 }
149
150 // Subscription is a subscription established through TypeMux.
151 type Subscription struct {
152         dispatcher *Dispatcher
153         created    time.Time
154         closeMu    sync.Mutex
155         closing    chan struct{}
156         closed     bool
157
158         // these two are the same channel. they are stored separately so
159         // postC can be set to nil without affecting the return value of
160         // Chan.
161         postMu sync.RWMutex
162         readC  <-chan *TypeMuxEvent
163         postC  chan<- *TypeMuxEvent
164 }
165
166 func newSubscription(dispatcher *Dispatcher) *Subscription {
167         c := make(chan *TypeMuxEvent, maxEventChSize)
168         return &Subscription{
169                 dispatcher: dispatcher,
170                 created:    time.Now(),
171                 readC:      c,
172                 postC:      c,
173                 closing:    make(chan struct{}),
174         }
175 }
176
177 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
178         return s.readC
179 }
180
181 func (s *Subscription) Unsubscribe() {
182         s.dispatcher.del(s)
183         s.closewait()
184 }
185
186 func (s *Subscription) Closed() bool {
187         s.closeMu.Lock()
188         defer s.closeMu.Unlock()
189         return s.closed
190 }
191
192 func (s *Subscription) closewait() {
193         s.closeMu.Lock()
194         defer s.closeMu.Unlock()
195         if s.closed {
196                 return
197         }
198         close(s.closing)
199         s.closed = true
200
201         s.postMu.Lock()
202         close(s.postC)
203         s.postC = nil
204         s.postMu.Unlock()
205 }
206
207 func (s *Subscription) deliver(event *TypeMuxEvent) {
208         // Short circuit delivery if stale event
209         if s.created.After(event.Time) {
210                 return
211         }
212         // Otherwise deliver the event
213         s.postMu.RLock()
214         defer s.postMu.RUnlock()
215
216         select {
217         case s.postC <- event:
218         case <-s.closing:
219         default:
220                 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))
221         }
222 }