OSDN Git Service

Add bbft consensus message
[bytom/vapor.git] / event / event.go
1 // Package event deals with subscriptions to real-time events.
2 package event
3
4 import (
5         "errors"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 const (
17         logModule      = "event"
18         maxEventChSize = 65536
19 )
20
21 var (
22         // ErrMuxClosed is returned when Posting on a closed TypeMux.
23         ErrMuxClosed = errors.New("event: mux closed")
24         //ErrDuplicateSubscribe is returned when subscribe duplicate type
25         ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
26 )
27
28 type NewMinedBlockEvent struct{ Block types.Block }
29
30 type BlockSignatureEvent struct {
31         BlockHash bc.Hash
32         Signature []byte
33 }
34
35 //NewProposedBlockEvent the proposed block event which needs to broadcast.
36 type NewProposedBlockEvent struct{ Block types.Block }
37
38 //BlockSignEvent the signature which got from net.
39 type BlockSignEvent struct {
40         PeerID  []byte
41         BlockID [32]byte
42         Height  uint64
43         Sign    []byte
44         Pubkey  []byte
45 }
46
47 //SendBlockSignEvent the signature event which needs to broadcast.
48 type SendBlockSignEvent struct {
49         BlockID [32]byte
50         Height  uint64
51         Sign    []byte
52         Pubkey  []byte
53 }
54
55 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
56 type TypeMuxEvent struct {
57         Time time.Time
58         Data interface{}
59 }
60
61 // A Dispatcher dispatches events to registered receivers. Receivers can be
62 // registered to handle events of certain type. Any operation
63 // called after mux is stopped will return ErrMuxClosed.
64 //
65 // The zero value is ready to use.
66 type Dispatcher struct {
67         mutex   sync.RWMutex
68         subm    map[reflect.Type][]*Subscription
69         stopped bool
70 }
71
72 func NewDispatcher() *Dispatcher {
73         return &Dispatcher{
74                 subm: make(map[reflect.Type][]*Subscription),
75         }
76 }
77
78 // Subscribe creates a subscription for events of the given types. The
79 // subscription's channel is closed when it is unsubscribed
80 // or the mux is closed.
81 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
82         sub := newSubscription(d)
83         d.mutex.Lock()
84         defer d.mutex.Unlock()
85         if d.stopped {
86                 // set the status to closed so that calling Unsubscribe after this
87                 // call will short circuit.
88                 sub.closed = true
89                 close(sub.postC)
90                 return sub, nil
91         }
92
93         for _, t := range types {
94                 rtyp := reflect.TypeOf(t)
95                 oldsubs := d.subm[rtyp]
96                 if find(oldsubs, sub) != -1 {
97                         log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
98                         return nil, ErrDuplicateSubscribe
99                 }
100
101                 subs := make([]*Subscription, len(oldsubs)+1)
102                 copy(subs, oldsubs)
103                 subs[len(oldsubs)] = sub
104                 d.subm[rtyp] = subs
105         }
106         return sub, nil
107 }
108
109 // Post sends an event to all receivers registered for the given type.
110 // It returns ErrMuxClosed if the mux has been stopped.
111 func (d *Dispatcher) Post(ev interface{}) error {
112         event := &TypeMuxEvent{
113                 Time: time.Now(),
114                 Data: ev,
115         }
116         rtyp := reflect.TypeOf(ev)
117         d.mutex.RLock()
118         if d.stopped {
119                 d.mutex.RUnlock()
120                 return ErrMuxClosed
121         }
122
123         subs := d.subm[rtyp]
124         d.mutex.RUnlock()
125         for _, sub := range subs {
126                 sub.deliver(event)
127         }
128         return nil
129 }
130
131 // Stop closes a mux. The mux can no longer be used.
132 // Future Post calls will fail with ErrMuxClosed.
133 // Stop blocks until all current deliveries have finished.
134 func (d *Dispatcher) Stop() {
135         d.mutex.Lock()
136         for _, subs := range d.subm {
137                 for _, sub := range subs {
138                         sub.closewait()
139                 }
140         }
141         d.subm = nil
142         d.stopped = true
143         d.mutex.Unlock()
144 }
145
146 func (d *Dispatcher) del(s *Subscription) {
147         d.mutex.Lock()
148         for typ, subs := range d.subm {
149                 if pos := find(subs, s); pos >= 0 {
150                         if len(subs) == 1 {
151                                 delete(d.subm, typ)
152                         } else {
153                                 d.subm[typ] = posdelete(subs, pos)
154                         }
155                 }
156         }
157         d.mutex.Unlock()
158 }
159
160 func find(slice []*Subscription, item *Subscription) int {
161         for i, v := range slice {
162                 if v == item {
163                         return i
164                 }
165         }
166         return -1
167 }
168
169 func posdelete(slice []*Subscription, pos int) []*Subscription {
170         news := make([]*Subscription, len(slice)-1)
171         copy(news[:pos], slice[:pos])
172         copy(news[pos:], slice[pos+1:])
173         return news
174 }
175
176 // Subscription is a subscription established through TypeMux.
177 type Subscription struct {
178         dispatcher *Dispatcher
179         created    time.Time
180         closeMu    sync.Mutex
181         closing    chan struct{}
182         closed     bool
183
184         // these two are the same channel. they are stored separately so
185         // postC can be set to nil without affecting the return value of
186         // Chan.
187         postMu sync.RWMutex
188         readC  <-chan *TypeMuxEvent
189         postC  chan<- *TypeMuxEvent
190 }
191
192 func newSubscription(dispatcher *Dispatcher) *Subscription {
193         c := make(chan *TypeMuxEvent, maxEventChSize)
194         return &Subscription{
195                 dispatcher: dispatcher,
196                 created:    time.Now(),
197                 readC:      c,
198                 postC:      c,
199                 closing:    make(chan struct{}),
200         }
201 }
202
203 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
204         return s.readC
205 }
206
207 func (s *Subscription) Unsubscribe() {
208         s.dispatcher.del(s)
209         s.closewait()
210 }
211
212 func (s *Subscription) Closed() bool {
213         s.closeMu.Lock()
214         defer s.closeMu.Unlock()
215         return s.closed
216 }
217
218 func (s *Subscription) closewait() {
219         s.closeMu.Lock()
220         defer s.closeMu.Unlock()
221         if s.closed {
222                 return
223         }
224         close(s.closing)
225         s.closed = true
226
227         s.postMu.Lock()
228         close(s.postC)
229         s.postC = nil
230         s.postMu.Unlock()
231 }
232
233 func (s *Subscription) deliver(event *TypeMuxEvent) {
234         // Short circuit delivery if stale event
235         if s.created.After(event.Time) {
236                 return
237         }
238         // Otherwise deliver the event
239         s.postMu.RLock()
240         defer s.postMu.RUnlock()
241
242         select {
243         case s.postC <- event:
244         case <-s.closing:
245         default:
246                 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))
247         }
248 }