OSDN Git Service

feat: add fed orm (#136)
[bytom/vapor.git] / event / event.go
1 // Package event deals with subscriptions to real-time events.
2 package event
3
4 import (
5         "errors"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 const (
17         logModule      = "event"
18         maxEventChSize = 65536
19 )
20
21 var (
22         // ErrMuxClosed is returned when Posting on a closed TypeMux.
23         ErrMuxClosed = errors.New("event: mux closed")
24         //ErrDuplicateSubscribe is returned when subscribe duplicate type
25         ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
26 )
27
28 type NewProposedBlockEvent struct{ Block types.Block }
29
30 type BlockSignatureEvent struct {
31         BlockHash bc.Hash
32         Signature []byte
33         XPub      [64]byte
34 }
35
36 //NewBlockProposeEvent block propose event which needs to broadcast.
37 type NewBlockProposeEvent struct{ Block types.Block }
38
39 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
40 type TypeMuxEvent struct {
41         Time time.Time
42         Data interface{}
43 }
44
45 // A Dispatcher dispatches events to registered receivers. Receivers can be
46 // registered to handle events of certain type. Any operation
47 // called after mux is stopped will return ErrMuxClosed.
48 //
49 // The zero value is ready to use.
50 type Dispatcher struct {
51         mutex   sync.RWMutex
52         subm    map[reflect.Type][]*Subscription
53         stopped bool
54 }
55
56 func NewDispatcher() *Dispatcher {
57         return &Dispatcher{
58                 subm: make(map[reflect.Type][]*Subscription),
59         }
60 }
61
62 // Subscribe creates a subscription for events of the given types. The
63 // subscription's channel is closed when it is unsubscribed
64 // or the mux is closed.
65 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
66         sub := newSubscription(d)
67         d.mutex.Lock()
68         defer d.mutex.Unlock()
69         if d.stopped {
70                 // set the status to closed so that calling Unsubscribe after this
71                 // call will short circuit.
72                 sub.closed = true
73                 close(sub.postC)
74                 return sub, nil
75         }
76
77         for _, t := range types {
78                 rtyp := reflect.TypeOf(t)
79                 oldsubs := d.subm[rtyp]
80                 if find(oldsubs, sub) != -1 {
81                         log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
82                         return nil, ErrDuplicateSubscribe
83                 }
84
85                 subs := make([]*Subscription, len(oldsubs)+1)
86                 copy(subs, oldsubs)
87                 subs[len(oldsubs)] = sub
88                 d.subm[rtyp] = subs
89         }
90         return sub, nil
91 }
92
93 // Post sends an event to all receivers registered for the given type.
94 // It returns ErrMuxClosed if the mux has been stopped.
95 func (d *Dispatcher) Post(ev interface{}) error {
96         event := &TypeMuxEvent{
97                 Time: time.Now(),
98                 Data: ev,
99         }
100         rtyp := reflect.TypeOf(ev)
101         d.mutex.RLock()
102         if d.stopped {
103                 d.mutex.RUnlock()
104                 return ErrMuxClosed
105         }
106
107         subs := d.subm[rtyp]
108         d.mutex.RUnlock()
109         for _, sub := range subs {
110                 sub.deliver(event)
111         }
112         return nil
113 }
114
115 // Stop closes a mux. The mux can no longer be used.
116 // Future Post calls will fail with ErrMuxClosed.
117 // Stop blocks until all current deliveries have finished.
118 func (d *Dispatcher) Stop() {
119         d.mutex.Lock()
120         for _, subs := range d.subm {
121                 for _, sub := range subs {
122                         sub.closewait()
123                 }
124         }
125         d.subm = nil
126         d.stopped = true
127         d.mutex.Unlock()
128 }
129
130 func (d *Dispatcher) del(s *Subscription) {
131         d.mutex.Lock()
132         for typ, subs := range d.subm {
133                 if pos := find(subs, s); pos >= 0 {
134                         if len(subs) == 1 {
135                                 delete(d.subm, typ)
136                         } else {
137                                 d.subm[typ] = posdelete(subs, pos)
138                         }
139                 }
140         }
141         d.mutex.Unlock()
142 }
143
144 func find(slice []*Subscription, item *Subscription) int {
145         for i, v := range slice {
146                 if v == item {
147                         return i
148                 }
149         }
150         return -1
151 }
152
153 func posdelete(slice []*Subscription, pos int) []*Subscription {
154         news := make([]*Subscription, len(slice)-1)
155         copy(news[:pos], slice[:pos])
156         copy(news[pos:], slice[pos+1:])
157         return news
158 }
159
160 // Subscription is a subscription established through TypeMux.
161 type Subscription struct {
162         dispatcher *Dispatcher
163         created    time.Time
164         closeMu    sync.Mutex
165         closing    chan struct{}
166         closed     bool
167
168         // these two are the same channel. they are stored separately so
169         // postC can be set to nil without affecting the return value of
170         // Chan.
171         postMu sync.RWMutex
172         readC  <-chan *TypeMuxEvent
173         postC  chan<- *TypeMuxEvent
174 }
175
176 func newSubscription(dispatcher *Dispatcher) *Subscription {
177         c := make(chan *TypeMuxEvent, maxEventChSize)
178         return &Subscription{
179                 dispatcher: dispatcher,
180                 created:    time.Now(),
181                 readC:      c,
182                 postC:      c,
183                 closing:    make(chan struct{}),
184         }
185 }
186
187 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
188         return s.readC
189 }
190
191 func (s *Subscription) Unsubscribe() {
192         s.dispatcher.del(s)
193         s.closewait()
194 }
195
196 func (s *Subscription) Closed() bool {
197         s.closeMu.Lock()
198         defer s.closeMu.Unlock()
199         return s.closed
200 }
201
202 func (s *Subscription) closewait() {
203         s.closeMu.Lock()
204         defer s.closeMu.Unlock()
205         if s.closed {
206                 return
207         }
208         close(s.closing)
209         s.closed = true
210
211         s.postMu.Lock()
212         close(s.postC)
213         s.postC = nil
214         s.postMu.Unlock()
215 }
216
217 func (s *Subscription) deliver(event *TypeMuxEvent) {
218         // Short circuit delivery if stale event
219         if s.created.After(event.Time) {
220                 return
221         }
222         // Otherwise deliver the event
223         s.postMu.RLock()
224         defer s.postMu.RUnlock()
225
226         select {
227         case s.postC <- event:
228         case <-s.closing:
229         default:
230                 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))
231         }
232 }