1 // Package event deals with subscriptions to real-time events.
10 log "github.com/sirupsen/logrus"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
18 maxEventChSize = 65536
// ErrMuxClosed is returned when posting on a Dispatcher that has been stopped.
23 ErrMuxClosed = errors.New("event: mux closed")
// ErrDuplicateSubscribe is returned when the same type is subscribed more than once.
25 ErrDuplicateSubscribe = errors.New("event: subscribe duplicate type")
28 type NewMinedBlockEvent struct{ Block types.Block }
30 type BlockSignatureEvent struct {
35 //NewProposedBlockEvent the proposed block event which needs to broadcast.
36 type NewProposedBlockEvent struct{ Block types.Block }
// BlockSignEvent is the block signature received from the network.
39 type BlockSignEvent struct {
// SendBlockSignEvent is the signature event that needs to be broadcast.
48 type SendBlockSignEvent struct {
55 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
56 type TypeMuxEvent struct {
// A Dispatcher dispatches events to registered receivers. Receivers can be
// registered to handle events of a certain type. Any operation
// called after the dispatcher is stopped will return ErrMuxClosed.
65 // The zero value is ready to use.
66 type Dispatcher struct {
68 subm map[reflect.Type][]*Subscription
72 func NewDispatcher() *Dispatcher {
74 subm: make(map[reflect.Type][]*Subscription),
78 // Subscribe creates a subscription for events of the given types. The
79 // subscription's channel is closed when it is unsubscribed
// or the dispatcher is stopped.
81 func (d *Dispatcher) Subscribe(types ...interface{}) (*Subscription, error) {
82 sub := newSubscription(d)
84 defer d.mutex.Unlock()
86 // set the status to closed so that calling Unsubscribe after this
87 // call will short circuit.
93 for _, t := range types {
94 rtyp := reflect.TypeOf(t)
95 oldsubs := d.subm[rtyp]
96 if find(oldsubs, sub) != -1 {
97 log.WithFields(log.Fields{"module": logModule}).Errorf("duplicate type %s in Subscribe", rtyp)
98 return nil, ErrDuplicateSubscribe
101 subs := make([]*Subscription, len(oldsubs)+1)
103 subs[len(oldsubs)] = sub
109 // Post sends an event to all receivers registered for the given type.
110 // It returns ErrMuxClosed if the mux has been stopped.
111 func (d *Dispatcher) Post(ev interface{}) error {
112 event := &TypeMuxEvent{
116 rtyp := reflect.TypeOf(ev)
125 for _, sub := range subs {
131 // Stop closes a mux. The mux can no longer be used.
132 // Future Post calls will fail with ErrMuxClosed.
133 // Stop blocks until all current deliveries have finished.
134 func (d *Dispatcher) Stop() {
136 for _, subs := range d.subm {
137 for _, sub := range subs {
146 func (d *Dispatcher) del(s *Subscription) {
148 for typ, subs := range d.subm {
149 if pos := find(subs, s); pos >= 0 {
153 d.subm[typ] = posdelete(subs, pos)
160 func find(slice []*Subscription, item *Subscription) int {
161 for i, v := range slice {
169 func posdelete(slice []*Subscription, pos int) []*Subscription {
170 news := make([]*Subscription, len(slice)-1)
171 copy(news[:pos], slice[:pos])
172 copy(news[pos:], slice[pos+1:])
// Subscription is a subscription established through a Dispatcher.
177 type Subscription struct {
178 dispatcher *Dispatcher
181 closing chan struct{}
184 // these two are the same channel. they are stored separately so
185 // postC can be set to nil without affecting the return value of
188 readC <-chan *TypeMuxEvent
189 postC chan<- *TypeMuxEvent
192 func newSubscription(dispatcher *Dispatcher) *Subscription {
193 c := make(chan *TypeMuxEvent, maxEventChSize)
194 return &Subscription{
195 dispatcher: dispatcher,
199 closing: make(chan struct{}),
203 func (s *Subscription) Chan() <-chan *TypeMuxEvent {
207 func (s *Subscription) Unsubscribe() {
212 func (s *Subscription) Closed() bool {
214 defer s.closeMu.Unlock()
218 func (s *Subscription) closewait() {
220 defer s.closeMu.Unlock()
233 func (s *Subscription) deliver(event *TypeMuxEvent) {
234 // Short circuit delivery if stale event
235 if s.created.After(event.Time) {
238 // Otherwise deliver the event
240 defer s.postMu.RUnlock()
243 case s.postC <- event:
246 log.WithFields(log.Fields{"module": logModule}).Errorf("deliver event err unread event size %d", len(s.postC))