6 "github.com/sirupsen/logrus"
8 "github.com/vapor/event"
9 "github.com/vapor/netsync/peers"
10 "github.com/vapor/p2p"
11 "github.com/vapor/protocol/bc"
12 "github.com/vapor/protocol/bc/types"
15 // Switch is the interface for the p2p switch as consumed by Manager.
16 type Switch interface {
// AddReactor registers reactor under name and returns a p2p.Reactor.
// NewManager uses it to attach the consensus reactor as "CONSENSUS".
17 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
// AddBannedPeer takes a single string and reports failure via error.
// NOTE(review): the argument is presumably a peer address or ID — confirm
// against the concrete switch implementation.
18 AddBannedPeer(string) error
22 // Chain is the interface for Bytom core.
23 type Chain interface {
// BestBlockHeight returns a block height as uint64.
// NOTE(review): presumably the height of the current best chain tip — confirm.
24 BestBlockHeight() uint64
// GetHeaderByHash looks up a block header by hash; the signature broadcast
// loop uses it to obtain the Height of the block that was signed.
25 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
// ProcessBlock submits a block to the chain.
// NOTE(review): the meaning of the bool result is not visible here — confirm.
26 ProcessBlock(*types.Block) (bool, error)
// ProcessBlockSignature submits a block signature together with the signer's
// public key, the block height and the block hash. A non-nil error causes
// handleBlockSignatureMsg to raise the sending peer's ban score.
27 ProcessBlockSignature(signature []byte, pubkey [64]byte, blockHeight uint64, blockHash *bc.Hash) error
// blockMsg is the value handed to blockFetcher.processNewBlock. It is built in
// handleBlockProposeMsg with a peerID field and a block field.
// NOTE(review): presumably peerID identifies the peer that proposed block — confirm.
30 type blockMsg struct {
35 // Manager is the consensus message network synchronization manager.
// blockFetcher is created by newBlockFetcher(chain, peers) in NewManager and
// receives proposed blocks through processNewBlock.
40 blockFetcher *blockFetcher
// eventDispatcher is the event bus the broadcast loops subscribe to for
// NewBlockProposeEvent and BlockSignatureEvent.
41 eventDispatcher *event.Dispatcher
46 // NewManager creates a new Manager from the given switch, chain, event
// dispatcher and peer set, and registers a consensus reactor for it on the
// switch under the name "CONSENSUS".
47 func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
// The block fetcher shares the same chain and peer set as the manager.
52 blockFetcher: newBlockFetcher(chain, peers),
53 eventDispatcher: dispatcher,
// quit is an unbuffered signalling channel.
// NOTE(review): presumably closed by Stop to end the broadcast loops — confirm.
54 quit: make(chan struct{}),
// The reactor is constructed around the manager itself.
56 protocolReactor := NewConsensusReactor(manager)
57 manager.sw.AddReactor("CONSENSUS", protocolReactor)
// addPeer takes a peers.BasePeer.
// NOTE(review): the body is not visible here; presumably it registers the peer
// with m.peers as the counterpart of removePeer — confirm.
61 func (m *Manager) addPeer(peer peers.BasePeer) {
// processMsg handles one consensus message received from the peer identified
// by peerID: it looks the peer up in m.peers, logs the message, and dispatches
// on the message's concrete type. msgType is not referenced in the visible
// lines.
// NOTE(review): the handling of a failed peer lookup is not visible here — confirm.
65 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
66 peer := m.peers.GetPeer(peerID)
// Log the incoming message with its dynamic type and string form at Info level.
71 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
// Inside each case msg is rebound to the matched concrete type.
73 switch msg := msg.(type) {
74 case *BlockProposeMsg:
75 m.handleBlockProposeMsg(peerID, msg)
77 case *BlockSignatureMsg:
78 m.handleBlockSignatureMsg(peerID, msg)
// Logged for message types that have no handler.
// NOTE(review): presumably this sits in the default case — confirm.
81 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
// handleBlockProposeMsg processes a block proposal received from peerID: it
// extracts the proposed block from msg, records a block hash against the peer
// via m.peers.MarkBlock, and passes the block to the block fetcher.
85 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
86 block, err := msg.GetProposeBlock()
// Extraction failures are logged as warnings.
// NOTE(review): presumably the handler returns after this log — confirm.
88 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
// NOTE(review): hash is defined on a line not visible here; presumably it is
// the hash of block — confirm.
93 m.peers.MarkBlock(peerID, &hash)
// Hand the block and its source peer to the fetcher.
94 m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
// handleBlockSignatureMsg processes a block signature received from peerID by
// forwarding the signature, public key, height and block hash from msg to
// m.chain.ProcessBlockSignature.
97 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
// Convert the hash carried in the message into a bc.Hash.
98 blockHash := bc.NewHash(msg.BlockHash)
99 if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, msg.Height, &blockHash); err != nil {
// A rejected signature raises the peer's ban score, with the error text as the
// reason. NOTE(review): the arguments 20 and 0 are presumably the persistent
// and transient score increments — confirm against PeerSet.AddBanScore.
100 m.peers.AddBanScore(peerID, 20, 0, err.Error())
// blockProposeMsgBroadcastLoop subscribes to NewBlockProposeEvent on the event
// dispatcher and, for each event received, builds a BlockProposeMsg from the
// event's block and broadcasts it to peers on consensusChannel. Start runs it
// in its own goroutine.
// NOTE(review): the control flow after each error log (return vs. continue)
// and the loop's exit condition are not visible here — confirm.
105 func (m *Manager) blockProposeMsgBroadcastLoop() {
106 blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
108 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
// Release the subscription when the loop function returns.
111 defer blockProposeMsgSub.Unsubscribe()
// ok reports whether the subscription channel is still open.
115 case obj, ok := <-blockProposeMsgSub.Chan():
117 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
// The event payload must be a NewBlockProposeEvent value.
121 ev, ok := obj.Data.(event.NewBlockProposeEvent)
123 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
126 proposeMsg, err := NewBlockProposeMsg(&ev.Block)
128 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
// Wrap the message for the consensus channel and broadcast it via m.peers.
132 if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
133 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
// blockSignatureMsgBroadcastLoop subscribes to BlockSignatureEvent on the event
// dispatcher and, for each event received, looks up the signed block's header
// to obtain its height, builds a BlockSignatureMsg and broadcasts it to peers
// on consensusChannel. Start runs it in its own goroutine.
// NOTE(review): the control flow after each error log (return vs. continue)
// and the loop's exit condition are not visible here — confirm.
143 func (m *Manager) blockSignatureMsgBroadcastLoop() {
144 blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
146 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
// Release the subscription when the loop function returns.
149 defer blockSignatureMsgSub.Unsubscribe()
// ok reports whether the subscription channel is still open.
152 case obj, ok := <-blockSignatureMsgSub.Chan():
// The message names this loop's own subscription; it previously said
// "blockProposeMsgSub", copied from the propose loop, which made the two
// shutdown warnings indistinguishable in logs.
154 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockSignatureMsgSub channel closed")
// The event payload must be a BlockSignatureEvent value.
158 ev, ok := obj.Data.(event.BlockSignatureEvent)
160 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
// The event carries only the block hash; the height comes from the chain.
164 blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
166 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
// Wrap the signature message for the consensus channel and broadcast it.
170 blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, ev.XPub), consensusChannel)
171 if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
172 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
// removePeer removes the peer identified by peerID from the manager's peer set.
182 func (m *Manager) removePeer(peerID string) {
183 m.peers.RemovePeer(peerID)
186 // Start starts the consensus manager service by launching the block-propose
// and block-signature broadcast loops, each in its own goroutine.
// NOTE(review): the return statement is not visible here; presumably the error
// result is always nil — confirm.
187 func (m *Manager) Start() error {
188 go m.blockProposeMsgBroadcastLoop()
189 go m.blockSignatureMsgBroadcastLoop()
193 // Stop stops the consensus manager service.
194 func (m *Manager) Stop() {