4 "github.com/sirupsen/logrus"
7 "github.com/vapor/p2p/connection"
11 logModule = "consensus"
12 consensusChannel = byte(0x50)
13 maxBlockchainResponseSize = 22020096 + 2
16 // ConsensusReactor handles new coming consensus message.
17 type ConsensusReactor struct {
22 // NewConsensusReactor create consensus reactor.
23 func NewConsensusReactor(manager *Manager) *ConsensusReactor {
24 cr := &ConsensusReactor{
27 cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
31 // GetChannels implements Reactor
32 func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
33 return []*connection.ChannelDescriptor{
37 SendQueueCapacity: 100,
42 // OnStart implements BaseService
43 func (cr *ConsensusReactor) OnStart() error {
44 return cr.BaseReactor.OnStart()
47 // OnStop implements BaseService
48 func (cr *ConsensusReactor) OnStop() {
49 cr.BaseReactor.OnStop()
52 // AddPeer implements Reactor by sending our state to peer.
53 func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
54 cr.manager.addPeer(peer)
58 // RemovePeer implements Reactor by removing peer from the pool.
59 func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
60 cr.manager.removePeer(peer.Key)
63 // Receive implements Reactor by handling messages.
64 func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
65 msgType, msg, err := decodeMessage(msgBytes)
67 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
71 cr.manager.processMsg(src.ID(), msgType, msg)