6 "github.com/sirupsen/logrus"
8 "github.com/vapor/event"
9 "github.com/vapor/netsync/peers"
10 "github.com/vapor/p2p"
11 "github.com/vapor/protocol/bc"
12 "github.com/vapor/protocol/bc/types"
15 // Switch is the interface the consensus manager requires from the p2p switch.
16 type Switch interface {
// AddReactor is used by NewManager to install the consensus reactor under
// the name "CONSENSUS"; the reactor it returns is ignored there.
17 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
// AddBannedPeer presumably bans the peer identified by the string argument
// - TODO confirm against the p2p switch implementation.
18 AddBannedPeer(string) error
22 // Chain is the interface the consensus manager requires from Bytom core.
23 type Chain interface {
// BestBlockHeight returns a block height; presumably that of the current
// best chain tip - TODO confirm.
24 BestBlockHeight() uint64
// GetHeaderByHash looks up a block header by block hash. The signature
// broadcast loop uses it to obtain the height of the signed block.
25 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
// ProcessBlock presumably submits a block to the chain.
// NOTE(review): the meaning of the bool result is not stated here - confirm
// against the implementation.
26 ProcessBlock(*types.Block) (bool, error)
// ProcessBlockSignature is called for every received BlockSignatureMsg; a
// non-nil error raises the sending peer's ban score.
27 ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error
// blockMsg pairs a proposed block with the ID of the peer that sent it;
// handleBlockProposeMsg builds one and passes it to the block fetcher.
30 type blockMsg struct {
35 // Manager is the consensus message network synchronization manager.
// blockFetcher receives proposed blocks from handleBlockProposeMsg; it is
// created by NewManager from the chain and the peer set.
40 blockFetcher *blockFetcher
// eventDispatcher is the source of the NewBlockProposeEvent and
// BlockSignatureEvent subscriptions used by the broadcast loops.
41 eventDispatcher *event.Dispatcher
46 // NewManager creates a new consensus Manager.
//
// It builds a block fetcher from chain and peers, stores dispatcher as the
// event source, allocates the quit channel, and registers a consensus
// reactor for the manager on sw under the name "CONSENSUS".
47 func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
51 blockFetcher: newBlockFetcher(chain, peers),
52 eventDispatcher: dispatcher,
53 quit: make(chan struct{}),
// The reactor needs the manager, so it is created and attached only after
// the manager value exists; the reactor returned by AddReactor is not used.
55 protocolReactor := NewConsensusReactor(manager)
56 manager.sw.AddReactor("CONSENSUS", protocolReactor)
// addPeer is the counterpart of removePeer; presumably it registers peer with
// the manager's peer set - TODO confirm.
60 func (m *Manager) addPeer(peer peers.BasePeer) {
// processMsg handles one consensus message received from the peer peerID.
//
// It looks the peer up in the peer set, logs the message at info level, and
// dispatches on the concrete message type: *BlockProposeMsg and
// *BlockSignatureMsg go to their dedicated handlers, while the
// "unhandled message type" error log covers types without a handler.
64 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
65 peer := m.peers.GetPeer(peerID)
70 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
// The type switch rebinds msg to its concrete type for the handlers below.
72 switch msg := msg.(type) {
73 case *BlockProposeMsg:
74 m.handleBlockProposeMsg(peerID, msg)
76 case *BlockSignatureMsg:
77 m.handleBlockSignatureMsg(peerID, msg)
80 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
// handleBlockProposeMsg processes a block proposal received from peerID.
//
// It extracts the proposed block from msg (logging a warning if that fails),
// records the block hash against the peer via MarkBlock, and hands the block
// to the block fetcher together with the sender's ID.
84 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
85 block, err := msg.GetProposeBlock()
87 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
92 m.peers.MarkBlock(peerID, &hash)
93 m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
// handleBlockSignatureMsg processes a block signature received from peerID.
//
// The signature, public key, height and block hash carried by msg are passed
// to the chain; if the chain rejects them, the peer's ban score is raised with
// the error text as the reason.
96 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
97 blockHash := bc.NewHash(msg.BlockHash)
98 if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil {
// NOTE(review): 20 and 0 are the two score arguments of AddBanScore -
// presumably persistent and transient scores; confirm against the peers package.
99 m.peers.AddBanScore(peerID, 20, 0, err.Error())
// blockProposeMsgBroadcastLoop relays locally proposed blocks to peers.
//
// It subscribes to NewBlockProposeEvent on the event dispatcher and, for each
// event, wraps the event's block in a BlockProposeMsg and broadcasts it on
// consensusChannel. Subscription, decoding and broadcast failures are logged.
// Start runs this method in its own goroutine.
104 func (m *Manager) blockProposeMsgBroadcastLoop() {
105 blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
107 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
// Release the subscription when the loop exits.
110 defer blockProposeMsgSub.Unsubscribe()
114 case obj, ok := <-blockProposeMsgSub.Chan():
116 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
// The subscription delivers untyped payloads; verify the concrete event type.
120 ev, ok := obj.Data.(event.NewBlockProposeEvent)
122 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
125 proposeMsg, err := NewBlockProposeMsg(&ev.Block)
127 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
131 if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
132 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
// blockSignatureMsgBroadcastLoop relays locally produced block signatures to
// peers.
//
// It subscribes to BlockSignatureEvent on the event dispatcher. For each event
// it looks up the header of the signed block to obtain its height, builds a
// BlockSignatureMsg from the block hash, height, signature and the switch ID,
// and broadcasts it on consensusChannel. Subscription, lookup and broadcast
// failures are logged. Start runs this method in its own goroutine.
142 func (m *Manager) blockSignatureMsgBroadcastLoop() {
143 blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
145 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
// Release the subscription when the loop exits.
148 defer blockSignatureMsgSub.Unsubscribe()
151 case obj, ok := <-blockSignatureMsgSub.Chan():
// Name the subscription that actually closed; this message previously
// reported blockProposeMsgSub, copied from the propose loop.
153 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockSignatureMsgSub channel closed")
// The subscription delivers untyped payloads; verify the concrete event type.
157 ev, ok := obj.Data.(event.BlockSignatureEvent)
159 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
// The event carries only the block hash; the height comes from the header.
163 blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
165 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
169 blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel)
170 if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
171 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
// removePeer removes the peer identified by peerID from the manager's peer set.
181 func (m *Manager) removePeer(peerID string) {
182 m.peers.RemovePeer(peerID)
185 // Start starts the consensus manager service by launching the block-propose
// and block-signature broadcast loops, each in its own goroutine.
186 func (m *Manager) Start() error {
187 go m.blockProposeMsgBroadcastLoop()
188 go m.blockSignatureMsgBroadcastLoop()
192 // Stop stops the consensus manager service.
// NOTE(review): presumably this signals the quit channel allocated in
// NewManager - confirm.
193 func (m *Manager) Stop() {