OSDN Git Service

77bbb9f6e9e6d278af4ae2103bb39432975b96cf
[bytom/vapor.git] / event / event.go
1 // Package event deals with subscriptions to real-time events.
2 package event
3
4 import (
5         "errors"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 const (
17         logModule      = "event"
18         maxEventChSize = 65536
19 )
20
21 var (
22         // ErrMuxClosed is returned when Posting on a closed TypeMux.
23         ErrMuxClosed = errors.New("event: mux closed")
24         //ErrDuplicateSubscribe is returned when subscribe duplicate type
25         ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
26 )
27
28 type NewMinedBlockEvent struct{ Block types.Block }
29
30 type BlockSignatureEvent struct { 
31         BlockHash bc.Hash
32         Signature []byte 
33 }
34
35 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
36 type TypeMuxEvent struct {
37         Time time.Time
38         Data interface{}
39 }
40
41 // A Dispatcher dispatches events to registered receivers. Receivers can be
42 // registered to handle events of certain type. Any operation
43 // called after mux is stopped will return ErrMuxClosed.
44 //
45 // The zero value is ready to use.
46 type Dispatcher struct {
47         mutex   sync.RWMutex
48         subm    map[reflect.Type][]*Subscription
49         stopped bool
50 }
51
52 func NewDispatcher() *Dispatcher {
53         return &Dispatcher{
54                 subm: make(map[reflect.Type][]*Subscription),
55         }
56 }
57
58 // Subscribe creates a subscription for events of the given types. The
59 // subscription's channel is closed when it is unsubscribed
60 // or the mux is closed.
61 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
62         sub := newSubscription(d)
63         d.mutex.Lock()
64         defer d.mutex.Unlock()
65         if d.stopped {
66                 // set the status to closed so that calling Unsubscribe after this
67                 // call will short circuit.
68                 sub.closed = true
69                 close(sub.postC)
70                 return sub, nil
71         }
72
73         for _, t := range types {
74                 rtyp := reflect.TypeOf(t)
75                 oldsubs := d.subm[rtyp]
76                 if find(oldsubs, sub) != -1 {
77                         log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
78                         return nil, ErrDuplicateSubscribe
79                 }
80
81                 subs := make([]*Subscription, len(oldsubs)+1)
82                 copy(subs, oldsubs)
83                 subs[len(oldsubs)] = sub
84                 d.subm[rtyp] = subs
85         }
86         return sub, nil
87 }
88
89 // Post sends an event to all receivers registered for the given type.
90 // It returns ErrMuxClosed if the mux has been stopped.
91 func (d *Dispatcher) Post(ev interface{}) error {
92         event := &TypeMuxEvent{
93                 Time: time.Now(),
94                 Data: ev,
95         }
96         rtyp := reflect.TypeOf(ev)
97         d.mutex.RLock()
98         if d.stopped {
99                 d.mutex.RUnlock()
100                 return ErrMuxClosed
101         }
102
103         subs := d.subm[rtyp]
104         d.mutex.RUnlock()
105         for _, sub := range subs {
106                 sub.deliver(event)
107         }
108         return nil
109 }
110
111 // Stop closes a mux. The mux can no longer be used.
112 // Future Post calls will fail with ErrMuxClosed.
113 // Stop blocks until all current deliveries have finished.
114 func (d *Dispatcher) Stop() {
115         d.mutex.Lock()
116         for _, subs := range d.subm {
117                 for _, sub := range subs {
118                         sub.closewait()
119                 }
120         }
121         d.subm = nil
122         d.stopped = true
123         d.mutex.Unlock()
124 }
125
126 func (d *Dispatcher) del(s *Subscription) {
127         d.mutex.Lock()
128         for typ, subs := range d.subm {
129                 if pos := find(subs, s); pos >= 0 {
130                         if len(subs) == 1 {
131                                 delete(d.subm, typ)
132                         } else {
133                                 d.subm[typ] = posdelete(subs, pos)
134                         }
135                 }
136         }
137         d.mutex.Unlock()
138 }
139
140 func find(slice []*Subscription, item *Subscription) int {
141         for i, v := range slice {
142                 if v == item {
143                         return i
144                 }
145         }
146         return -1
147 }
148
149 func posdelete(slice []*Subscription, pos int) []*Subscription {
150         news := make([]*Subscription, len(slice)-1)
151         copy(news[:pos], slice[:pos])
152         copy(news[pos:], slice[pos+1:])
153         return news
154 }
155
156 // Subscription is a subscription established through TypeMux.
157 type Subscription struct {
158         dispatcher *Dispatcher
159         created    time.Time
160         closeMu    sync.Mutex
161         closing    chan struct{}
162         closed     bool
163
164         // these two are the same channel. they are stored separately so
165         // postC can be set to nil without affecting the return value of
166         // Chan.
167         postMu sync.RWMutex
168         readC  <-chan *TypeMuxEvent
169         postC  chan<- *TypeMuxEvent
170 }
171
172 func newSubscription(dispatcher *Dispatcher) *Subscription {
173         c := make(chan *TypeMuxEvent, maxEventChSize)
174         return &Subscription{
175                 dispatcher: dispatcher,
176                 created:    time.Now(),
177                 readC:      c,
178                 postC:      c,
179                 closing:    make(chan struct{}),
180         }
181 }
182
183 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
184         return s.readC
185 }
186
187 func (s *Subscription) Unsubscribe() {
188         s.dispatcher.del(s)
189         s.closewait()
190 }
191
192 func (s *Subscription) Closed() bool {
193         s.closeMu.Lock()
194         defer s.closeMu.Unlock()
195         return s.closed
196 }
197
198 func (s *Subscription) closewait() {
199         s.closeMu.Lock()
200         defer s.closeMu.Unlock()
201         if s.closed {
202                 return
203         }
204         close(s.closing)
205         s.closed = true
206
207         s.postMu.Lock()
208         close(s.postC)
209         s.postC = nil
210         s.postMu.Unlock()
211 }
212
213 func (s *Subscription) deliver(event *TypeMuxEvent) {
214         // Short circuit delivery if stale event
215         if s.created.After(event.Time) {
216                 return
217         }
218         // Otherwise deliver the event
219         s.postMu.RLock()
220         defer s.postMu.RUnlock()
221
222         select {
223         case s.postC <- event:
224         case <-s.closing:
225         default:
226                 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))
227         }
228 }