OSDN Git Service

10405fb97756f23bd85f4c3d66995bc8abde1bca
[bytom/vapor.git] / netsync / consensusmgr / reactor.go
1 package consensusmgr
2
3 import (
4         "github.com/sirupsen/logrus"
5
6         "github.com/vapor/p2p"
7         "github.com/vapor/p2p/connection"
8 )
9
10 const (
11         logModule                 = "consensus"
12         consensusChannel          = byte(0x50)
13         maxBlockchainResponseSize = 22020096 + 2
14 )
15
16 // ConsensusReactor handles new coming consensus message.
17 type ConsensusReactor struct {
18         p2p.BaseReactor
19         manager *Manager
20 }
21
22 // NewConsensusReactor create consensus reactor.
23 func NewConsensusReactor(manager *Manager) *ConsensusReactor {
24         cr := &ConsensusReactor{
25                 manager: manager,
26         }
27         cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
28         return cr
29 }
30
31 // GetChannels implements Reactor
32 func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
33         return []*connection.ChannelDescriptor{
34                 {
35                         ID:                consensusChannel,
36                         Priority:          10,
37                         SendQueueCapacity: 100,
38                 },
39         }
40 }
41
42 // OnStart implements BaseService
43 func (cr *ConsensusReactor) OnStart() error {
44         return cr.BaseReactor.OnStart()
45 }
46
47 // OnStop implements BaseService
48 func (cr *ConsensusReactor) OnStop() {
49         cr.BaseReactor.OnStop()
50 }
51
52 // AddPeer implements Reactor by sending our state to peer.
53 func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
54         cr.manager.addPeer(peer)
55         return nil
56 }
57
58 // RemovePeer implements Reactor by removing peer from the pool.
59 func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
60         cr.manager.removePeer(peer.Key)
61 }
62
63 // Receive implements Reactor by handling messages.
64 func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
65         msgType, msg, err := decodeMessage(msgBytes)
66         if err != nil {
67                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
68                 return
69         }
70
71         cr.manager.processMsg(src.ID(), msgType, msg)
72 }