OSDN Git Service

Add consensus messages transfer (#90)
[bytom/vapor.git] / netsync / consensusmgr / reactor.go
diff --git a/netsync/consensusmgr/reactor.go b/netsync/consensusmgr/reactor.go
new file mode 100644 (file)
index 0000000..10405fb
--- /dev/null
@@ -0,0 +1,72 @@
+package consensusmgr
+
+import (
+       "github.com/sirupsen/logrus"
+
+       "github.com/vapor/p2p"
+       "github.com/vapor/p2p/connection"
+)
+
+const (
+       logModule                 = "consensus"
+       consensusChannel          = byte(0x50)
+       maxBlockchainResponseSize = 22020096 + 2
+)
+
+// ConsensusReactor handles new coming consensus message.
+type ConsensusReactor struct {
+       p2p.BaseReactor
+       manager *Manager
+}
+
+// NewConsensusReactor create consensus reactor.
+func NewConsensusReactor(manager *Manager) *ConsensusReactor {
+       cr := &ConsensusReactor{
+               manager: manager,
+       }
+       cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
+       return cr
+}
+
+// GetChannels implements Reactor
+func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{
+               {
+                       ID:                consensusChannel,
+                       Priority:          10,
+                       SendQueueCapacity: 100,
+               },
+       }
+}
+
+// OnStart implements BaseService
+func (cr *ConsensusReactor) OnStart() error {
+       return cr.BaseReactor.OnStart()
+}
+
+// OnStop implements BaseService
+func (cr *ConsensusReactor) OnStop() {
+       cr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
+       cr.manager.addPeer(peer)
+       return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+       cr.manager.removePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling messages.
+func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+       msgType, msg, err := decodeMessage(msgBytes)
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+               return
+       }
+
+       cr.manager.processMsg(src.ID(), msgType, msg)
+}