--- /dev/null
+package consensusmgr
+
+import (
+ "github.com/sirupsen/logrus"
+
+ "github.com/vapor/p2p"
+ "github.com/vapor/p2p/connection"
+)
+
+const (
+ logModule = "consensus"
+ consensusChannel = byte(0x50)
+ maxBlockchainResponseSize = 22020096 + 2
+)
+
+// ConsensusReactor handles new coming consensus message.
+type ConsensusReactor struct {
+ p2p.BaseReactor
+ manager *Manager
+}
+
+// NewConsensusReactor create consensus reactor.
+func NewConsensusReactor(manager *Manager) *ConsensusReactor {
+ cr := &ConsensusReactor{
+ manager: manager,
+ }
+ cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
+ return cr
+}
+
+// GetChannels implements Reactor
+func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
+ return []*connection.ChannelDescriptor{
+ {
+ ID: consensusChannel,
+ Priority: 10,
+ SendQueueCapacity: 100,
+ },
+ }
+}
+
+// OnStart implements BaseService
+func (cr *ConsensusReactor) OnStart() error {
+ return cr.BaseReactor.OnStart()
+}
+
+// OnStop implements BaseService
+func (cr *ConsensusReactor) OnStop() {
+ cr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
+ cr.manager.addPeer(peer)
+ return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+ cr.manager.removePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling messages.
+func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+ msgType, msg, err := decodeMessage(msgBytes)
+ if err != nil {
+ logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+ return
+ }
+
+ cr.manager.processMsg(src.ID(), msgType, msg)
+}