OSDN Git Service

Add consensus messages transfer (#90)
[bytom/vapor.git] / event / event.go
1 // Package event deals with subscriptions to real-time events.
2 package event
3
4 import (
5         "errors"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 const (
17         logModule      = "event"
18         maxEventChSize = 65536
19 )
20
21 var (
22         // ErrMuxClosed is returned when Posting on a closed TypeMux.
23         ErrMuxClosed = errors.New("event: mux closed")
24         //ErrDuplicateSubscribe is returned when subscribe duplicate type
25         ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
26 )
27
28 type NewProposedBlockEvent struct{ Block types.Block }
29
30 type BlockSignatureEvent struct {
31         BlockHash bc.Hash
32         Signature []byte
33 }
34
35 //NewBlockProposeEvent block propose event which needs to broadcast.
36 type NewBlockProposeEvent struct{ Block types.Block }
37
38 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
39 type TypeMuxEvent struct {
40         Time time.Time
41         Data interface{}
42 }
43
44 // A Dispatcher dispatches events to registered receivers. Receivers can be
45 // registered to handle events of certain type. Any operation
46 // called after mux is stopped will return ErrMuxClosed.
47 //
48 // The zero value is ready to use.
49 type Dispatcher struct {
50         mutex   sync.RWMutex
51         subm    map[reflect.Type][]*Subscription
52         stopped bool
53 }
54
55 func NewDispatcher() *Dispatcher {
56         return &Dispatcher{
57                 subm: make(map[reflect.Type][]*Subscription),
58         }
59 }
60
61 // Subscribe creates a subscription for events of the given types. The
62 // subscription's channel is closed when it is unsubscribed
63 // or the mux is closed.
64 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
65         sub := newSubscription(d)
66         d.mutex.Lock()
67         defer d.mutex.Unlock()
68         if d.stopped {
69                 // set the status to closed so that calling Unsubscribe after this
70                 // call will short circuit.
71                 sub.closed = true
72                 close(sub.postC)
73                 return sub, nil
74         }
75
76         for _, t := range types {
77                 rtyp := reflect.TypeOf(t)
78                 oldsubs := d.subm[rtyp]
79                 if find(oldsubs, sub) != -1 {
80                         log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
81                         return nil, ErrDuplicateSubscribe
82                 }
83
84                 subs := make([]*Subscription, len(oldsubs)+1)
85                 copy(subs, oldsubs)
86                 subs[len(oldsubs)] = sub
87                 d.subm[rtyp] = subs
88         }
89         return sub, nil
90 }
91
92 // Post sends an event to all receivers registered for the given type.
93 // It returns ErrMuxClosed if the mux has been stopped.
94 func (d *Dispatcher) Post(ev interface{}) error {
95         event := &TypeMuxEvent{
96                 Time: time.Now(),
97                 Data: ev,
98         }
99         rtyp := reflect.TypeOf(ev)
100         d.mutex.RLock()
101         if d.stopped {
102                 d.mutex.RUnlock()
103                 return ErrMuxClosed
104         }
105
106         subs := d.subm[rtyp]
107         d.mutex.RUnlock()
108         for _, sub := range subs {
109                 sub.deliver(event)
110         }
111         return nil
112 }
113
114 // Stop closes a mux. The mux can no longer be used.
115 // Future Post calls will fail with ErrMuxClosed.
116 // Stop blocks until all current deliveries have finished.
117 func (d *Dispatcher) Stop() {
118         d.mutex.Lock()
119         for _, subs := range d.subm {
120                 for _, sub := range subs {
121                         sub.closewait()
122                 }
123         }
124         d.subm = nil
125         d.stopped = true
126         d.mutex.Unlock()
127 }
128
129 func (d *Dispatcher) del(s *Subscription) {
130         d.mutex.Lock()
131         for typ, subs := range d.subm {
132                 if pos := find(subs, s); pos >= 0 {
133                         if len(subs) == 1 {
134                                 delete(d.subm, typ)
135                         } else {
136                                 d.subm[typ] = posdelete(subs, pos)
137                         }
138                 }
139         }
140         d.mutex.Unlock()
141 }
142
143 func find(slice []*Subscription, item *Subscription) int {
144         for i, v := range slice {
145                 if v == item {
146                         return i
147                 }
148         }
149         return -1
150 }
151
152 func posdelete(slice []*Subscription, pos int) []*Subscription {
153         news := make([]*Subscription, len(slice)-1)
154         copy(news[:pos], slice[:pos])
155         copy(news[pos:], slice[pos+1:])
156         return news
157 }
158
159 // Subscription is a subscription established through TypeMux.
160 type Subscription struct {
161         dispatcher *Dispatcher
162         created    time.Time
163         closeMu    sync.Mutex
164         closing    chan struct{}
165         closed     bool
166
167         // these two are the same channel. they are stored separately so
168         // postC can be set to nil without affecting the return value of
169         // Chan.
170         postMu sync.RWMutex
171         readC  <-chan *TypeMuxEvent
172         postC  chan<- *TypeMuxEvent
173 }
174
175 func newSubscription(dispatcher *Dispatcher) *Subscription {
176         c := make(chan *TypeMuxEvent, maxEventChSize)
177         return &Subscription{
178                 dispatcher: dispatcher,
179                 created:    time.Now(),
180                 readC:      c,
181                 postC:      c,
182                 closing:    make(chan struct{}),
183         }
184 }
185
186 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
187         return s.readC
188 }
189
190 func (s *Subscription) Unsubscribe() {
191         s.dispatcher.del(s)
192         s.closewait()
193 }
194
195 func (s *Subscription) Closed() bool {
196         s.closeMu.Lock()
197         defer s.closeMu.Unlock()
198         return s.closed
199 }
200
201 func (s *Subscription) closewait() {
202         s.closeMu.Lock()
203         defer s.closeMu.Unlock()
204         if s.closed {
205                 return
206         }
207         close(s.closing)
208         s.closed = true
209
210         s.postMu.Lock()
211         close(s.postC)
212         s.postC = nil
213         s.postMu.Unlock()
214 }
215
216 func (s *Subscription) deliver(event *TypeMuxEvent) {
217         // Short circuit delivery if stale event
218         if s.created.After(event.Time) {
219                 return
220         }
221         // Otherwise deliver the event
222         s.postMu.RLock()
223         defer s.postMu.RUnlock()
224
225         select {
226         case s.postC <- event:
227         case <-s.closing:
228         default:
229                 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))
230         }
231 }