6 "github.com/sirupsen/logrus"
8 "github.com/vapor/event"
9 "github.com/vapor/netsync/peers"
10 "github.com/vapor/p2p"
11 "github.com/vapor/p2p/security"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
16 // Switch is the interface of the p2p switch that the consensus Manager uses.
17 type Switch interface {
// AddReactor registers reactor on the switch under the given name.
// NewManager uses it to attach the consensus reactor as "CONSENSUS".
18 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
21 // Chain is the interface of the Bytom core chain that the consensus Manager uses.
22 type Chain interface {
// BestBlockHeight reports a block height; presumably the height of the
// current best block — TODO confirm against the implementation.
23 BestBlockHeight() uint64
// GetHeaderByHash returns the block header for the given hash, or an error.
24 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
// ProcessBlock hands a block to the chain.
// NOTE(review): the meaning of the bool result is not visible here — confirm.
25 ProcessBlock(*types.Block) (bool, error)
// ProcessBlockSignature hands a block signature, the signer's public key and
// the hash of the signed block to the chain. handleBlockSignatureMsg reports
// the sending peer as illegal when this returns a non-nil error.
26 ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
// Peers is the peer-set interface that the consensus Manager uses to track
// peers and to send messages to them.
29 type Peers interface {
// AddPeer adds peer to the set.
30 AddPeer(peer peers.BasePeer)
// BroadcastMsg sends bm to the peers; the broadcast loops log any error it returns.
31 BroadcastMsg(bm peers.BroadcastMsg) error
// GetPeer returns the peer with the given id.
// NOTE(review): presumably returns nil for an unknown id — confirm.
32 GetPeer(id string) *peers.Peer
// MarkBlock records hash against peerID; called when that peer proposes a block.
33 MarkBlock(peerID string, hash *bc.Hash)
// MarkBlockSignature records signature against peerID; called when that peer
// sends a block signature.
34 MarkBlockSignature(peerID string, signature []byte)
// ProcessIllegal reports misbehaviour of peerID with a severity level and a
// human-readable reason.
35 ProcessIllegal(peerID string, level byte, reason string)
// RemovePeer removes the peer with the given ID from the set.
36 RemovePeer(peerID string)
// SetStatus updates the height and block hash recorded for peerID.
37 SetStatus(peerID string, height uint64, hash *bc.Hash)
// blockMsg pairs a received block with the ID of the peer that sent it; it is
// the unit handed to blockFetcher.processNewBlock.
40 type blockMsg struct {
45 // Manager is the consensus message network synchronization manager.
// blockFetcher receives the blocks proposed by peers (see handleBlockProposeMsg).
50 blockFetcher *blockFetcher
// eventDispatcher is where the broadcast loops subscribe for local
// proposed-block and block-signature events.
51 eventDispatcher *event.Dispatcher
56 // NewManager creates a Manager wired to the given switch, chain, peer set and
// event dispatcher, and registers a consensus reactor for it on sw under the
// name "CONSENSUS".
57 func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
// The block fetcher shares the manager's chain and peer set.
62 blockFetcher: newBlockFetcher(chain, peers),
63 eventDispatcher: dispatcher,
// quit is created here as an unbuffered signalling channel.
64 quit: make(chan struct{}),
// Hook the manager into the p2p layer so it receives consensus messages.
66 protocolReactor := NewConsensusReactor(manager)
67 manager.sw.AddReactor("CONSENSUS", protocolReactor)
// addPeer takes a peer for the manager to track.
// NOTE(review): presumably forwards to m.peers.AddPeer — confirm against the body.
71 func (m *Manager) addPeer(peer peers.BasePeer) {
// processMsg handles one consensus message received from the peer peerID.
// It looks the peer up, logs the message at debug level, and dispatches on the
// concrete message type: *BlockProposeMsg and *BlockSignatureMsg go to their
// handlers, and an "unhandled message type" error is logged otherwise.
// msgType is not referenced in the statements shown here.
75 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
76 peer := m.peers.GetPeer(peerID)
81 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer")
// The type switch rebinds msg to its concrete type for the handlers.
83 switch msg := msg.(type) {
84 case *BlockProposeMsg:
85 m.handleBlockProposeMsg(peerID, msg)
87 case *BlockSignatureMsg:
88 m.handleBlockSignatureMsg(peerID, msg)
91 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
// handleBlockProposeMsg processes a block proposal received from peerID.
// It extracts the block from msg (logging a warning on failure), marks the
// block against the peer, passes it to the block fetcher, and updates the
// peer's status with the block's height and hash.
95 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
96 block, err := msg.GetProposeBlock()
98 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
// NOTE(review): hash is presumably the hash of block — confirm where it is assigned.
103 m.peers.MarkBlock(peerID, &hash)
104 m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
105 m.peers.SetStatus(peerID, block.Height, &hash)
// handleBlockSignatureMsg processes a block signature received from peerID.
// It marks the signature against the peer and passes the signature, public
// key and block hash to the chain. If the chain returns an error, the peer is
// reported as illegal at security.LevelMsgIllegal with the error text as reason.
108 func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
// The signature is marked before it is validated by the chain.
109 m.peers.MarkBlockSignature(peerID, msg.Signature)
110 blockHash := bc.NewHash(msg.BlockHash)
111 if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey, &blockHash); err != nil {
112 m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error())
// blockProposeMsgBroadcastLoop subscribes to event.NewProposedBlockEvent and
// relays each received event to the peers as a BlockProposeMsg on
// consensusChannel. Subscription, conversion and broadcast failures are
// logged. Start runs it in its own goroutine.
117 func (m *Manager) blockProposeMsgBroadcastLoop() {
118 blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewProposedBlockEvent{})
// NOTE(review): the message names "NewBlockProposeEvent" while the subscribed
// type is event.NewProposedBlockEvent — confirm which name is intended.
120 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
// Release the subscription when the loop exits.
123 defer blockProposeMsgSub.Unsubscribe()
127 case obj, ok := <-blockProposeMsgSub.Chan():
129 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
// Check the payload type before use, since obj.Data is asserted dynamically.
133 ev, ok := obj.Data.(event.NewProposedBlockEvent)
135 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
138 proposeMsg, err := NewBlockProposeMsg(&ev.Block)
140 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
144 if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
145 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
// blockSignatureMsgBroadcastLoop subscribes to event.BlockSignatureEvent and
// relays each received event to the peers as a BlockSignatureMsg on
// consensusChannel. Subscription and broadcast failures are logged. Start
// runs it in its own goroutine.
155 func (m *Manager) blockSignatureMsgBroadcastLoop() {
156 blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
158 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
// Release the subscription when the loop exits.
161 defer blockSignatureMsgSub.Unsubscribe()
164 case obj, ok := <-blockSignatureMsgSub.Chan():
// Name the subscription this loop owns, so the warning is not mistaken for
// one from blockProposeMsgBroadcastLoop.
166 logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockSignatureMsgSub channel closed")
// Check the payload type before use, since obj.Data is asserted dynamically.
170 ev, ok := obj.Data.(event.BlockSignatureEvent)
172 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
// Wrap the signature in a broadcast message for the consensus channel.
176 blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, ev.Signature, ev.XPub), consensusChannel)
177 if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
178 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
// removePeer removes the peer with the given ID from the manager's peer set.
188 func (m *Manager) removePeer(peerID string) {
189 m.peers.RemovePeer(peerID)
192 // Start starts the consensus manager service by launching the block-propose
// and block-signature broadcast loops, each in its own goroutine.
193 func (m *Manager) Start() error {
194 go m.blockProposeMsgBroadcastLoop()
195 go m.blockSignatureMsgBroadcastLoop()
199 // Stop stops the consensus manager service.
// NOTE(review): presumably signals the broadcast loops through m.quit — confirm against the body.
200 func (m *Manager) Stop() {