7 "github.com/sirupsen/logrus"
9 "github.com/bytom/bytom/event"
10 "github.com/bytom/bytom/netsync/peers"
11 "github.com/bytom/bytom/p2p"
12 "github.com/bytom/bytom/p2p/security"
13 "github.com/bytom/bytom/protocol"
14 "github.com/bytom/bytom/protocol/bc"
15 "github.com/bytom/bytom/protocol/bc/types"
18 // Switch is the interface for the p2p switch.
// The manager uses it only to register its consensus reactor (see NewManager).
19 type Switch interface {
// AddReactor registers reactor under name and returns a p2p.Reactor.
// NOTE(review): presumably the returned reactor is the one just registered —
// confirm against the p2p package.
20 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
23 // Chain is the interface for Bytom core.
24 type Chain interface {
// BestBlockHeight reports a block height; presumably that of the current
// best block — confirm against the implementation.
25 BestBlockHeight() uint64
// GetHeaderByHash looks up a block header by hash and may fail with an error.
26 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
// ProcessBlock submits a block to the chain.
// NOTE(review): the meaning of the bool result is not visible in this file.
27 ProcessBlock(*types.Block) (bool, error)
// ProcessBlockVerification submits a block verification to the chain. It is
// called from handleBlockVerificationMsg, which reports the sending peer via
// Peers.ProcessIllegal using the returned error's text.
28 ProcessBlockVerification(*protocol.Verification) error
// Peers is the interface for the peer set used by the consensus manager.
// The descriptions below follow the method names and signatures only; the
// implementation lives outside this file.
31 type Peers interface {
// AddPeer adds peer to the set.
32 AddPeer(peer peers.BasePeer)
// BroadcastMsg sends bm to peers and returns an error on failure.
// NOTE(review): which peers receive the message is decided by the
// implementation — confirm there.
33 BroadcastMsg(bm peers.BroadcastMsg) error
// GetPeer returns the peer registered under id.
// NOTE(review): presumably nil when the id is unknown — confirm.
34 GetPeer(id string) *peers.Peer
// MarkBlock records hash against the peer identified by peerID.
35 MarkBlock(peerID string, hash *bc.Hash)
// MarkBlockVerification records signature against the peer identified by peerID.
36 MarkBlockVerification(peerID string, signature []byte)
// ProcessIllegal reports misbehaviour of the peer identified by peerID with a
// severity level and a human-readable reason.
37 ProcessIllegal(peerID string, level byte, reason string)
// RemovePeer removes the peer identified by peerID from the set.
38 RemovePeer(peerID string)
// SetStatus records a height and block hash for the peer identified by peerID.
39 SetStatus(peerID string, height uint64, hash *bc.Hash)
// blockMsg carries a block together with the ID of the peer it came from.
// handleBlockProposeMsg builds it with the fields peerID and block and passes
// it to blockFetcher.processNewBlock.
42 type blockMsg struct {
47 // Manager is the consensus message network synchronization manager.
// blockFetcher is built by newBlockFetcher(chain, peers) in NewManager. It
// receives proposed blocks through processNewBlock, and its
// blockProcessorLoop is started as a goroutine by Start.
52 blockFetcher *blockFetcher
// eventDispatcher is the event source that msgBroadcastLoop subscribes to.
53 eventDispatcher *event.Dispatcher
58 // NewManager creates a new Manager.
// It builds a block fetcher from chain and peers, keeps dispatcher for the
// broadcast loops, allocates the quit channel, and registers a consensus
// reactor for the manager on sw under the name "CONSENSUS".
59 func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
64 blockFetcher: newBlockFetcher(chain, peers),
65 eventDispatcher: dispatcher,
// Unbuffered signalling channel.
// NOTE(review): presumably used to stop the background loops — confirm
// against Stop and the loop bodies.
66 quit: make(chan struct{}),
// The reactor is given the manager itself.
// NOTE(review): presumably so it can route incoming peer messages back into
// the manager — confirm in NewConsensusReactor.
68 protocolReactor := NewConsensusReactor(manager)
// The reactor returned by AddReactor is discarded here.
69 manager.sw.AddReactor("CONSENSUS", protocolReactor)
// addPeer accepts a newly connected peer.
// NOTE(review): the body is not visible in this listing; presumably it
// forwards to m.peers.AddPeer — confirm.
73 func (m *Manager) addPeer(peer peers.BasePeer) {
// processMsg handles one consensus message received from the peer identified
// by peerID. It looks the peer up, logs the message at debug level, and
// dispatches on the concrete message type.
// NOTE(review): msgType is not used on any line visible in this listing.
77 func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
78 peer := m.peers.GetPeer(peerID)
// peer is dereferenced below via peer.Addr().
// NOTE(review): presumably a nil guard sits on the lines not shown here —
// confirm.
// All field values, including msg.String(), are evaluated before logrus
// checks whether debug logging is enabled.
83 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Debug("receive message from peer")
// The type switch rebinds msg to the concrete message type in each case.
85 switch msg := msg.(type) {
86 case *BlockProposeMsg:
87 m.handleBlockProposeMsg(peerID, msg)
89 case *BlockVerificationMsg:
90 m.handleBlockVerificationMsg(peerID, msg)
// Any other message type is only logged.
// NOTE(review): presumably this line sits in the default branch — confirm.
// This log identifies the peer by ID, whereas the debug log above uses
// peer.Addr().
93 logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
// handleBlockProposeMsg handles a proposed block received from peerID.
// It decodes the block from msg, marks the block hash against the peer,
// queues the block on the block fetcher, and records the block height and
// hash as the peer's status.
97 func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
98 block, err := msg.GetProposeBlock()
// Decode failure is logged as a warning.
// NOTE(review): presumably the function returns afterwards — the guard lines
// are not visible here.
100 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
// NOTE(review): hash is declared on a line not shown here; presumably it is
// the hash of block — confirm.
105 m.peers.MarkBlock(peerID, &hash)
// The block is handed to the fetcher together with the ID of the sending peer.
106 m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
107 m.peers.SetStatus(peerID, block.Height, &hash)
// handleBlockVerificationMsg handles a block verification received from
// peerID. It marks the signature against the peer, then submits a
// protocol.Verification built from the message fields to the chain.
110 func (m *Manager) handleBlockVerificationMsg(peerID string, msg *BlockVerificationMsg) {
// The signature is marked before the chain is asked to process it.
111 m.peers.MarkBlockVerification(peerID, msg.Signature)
112 if err := m.chain.ProcessBlockVerification(&protocol.Verification{
113 SourceHash: msg.SourceHash,
114 TargetHash: msg.TargetHash,
115 SourceHeight: msg.SourceHeight,
116 TargetHeight: msg.TargetHeight,
117 Signature: msg.Signature,
// The public key is passed on as a hex string, not as raw bytes.
118 PubKey: hex.EncodeToString(msg.PubKey),
// The chain's error text is reported against the sending peer at
// security.LevelMsgIllegal.
// NOTE(review): the condition line is not visible here; presumably this runs
// only when err is non-nil.
120 m.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, err.Error())
// blockProposeMsgBroadcastLoop runs msgBroadcastLoop for
// event.NewProposedBlockEvent, turning each event into a block propose
// message built from the event's block. Start runs it as a goroutine.
124 func (m *Manager) blockProposeMsgBroadcastLoop() {
125 m.msgBroadcastLoop(event.NewProposedBlockEvent{}, func(data interface{}) (ConsensusMessage, error) {
// Unchecked type assertion: msgBroadcastLoop compares the event's dynamic
// type with the subscribed type before it calls this function.
126 ev := data.(event.NewProposedBlockEvent)
127 return NewBlockProposeMsg(&ev.Block)
// blockVerificationMsgBroadcastLoop runs msgBroadcastLoop for
// event.BlockVerificationEvent, turning each event into a block verification
// message. Start runs it as a goroutine.
131 func (m *Manager) blockVerificationMsgBroadcastLoop() {
132 m.msgBroadcastLoop(event.BlockVerificationEvent{}, func(data interface{}) (ConsensusMessage, error) {
// Unchecked type assertion: msgBroadcastLoop compares the event's dynamic
// type with the subscribed type before it calls this function.
133 ev := data.(event.BlockVerificationEvent)
// Building the message cannot fail here, so the error result is always nil.
134 return NewBlockVerificationMsg(ev.SourceHeight, ev.TargetHeight, ev.SourceHash, ev.TargetHash, ev.PubKey, ev.Signature), nil
// msgBroadcastLoop subscribes to events of msgType's dynamic type on the event
// dispatcher. For each event received, it converts the event data to a
// ConsensusMessage with newMsg and broadcasts it to peers on consensusChannel.
//
// msgType is a sample value whose dynamic type selects the subscription.
// newMsg receives the event's Data and returns the message to broadcast, or
// an error.
138 func (m *Manager) msgBroadcastLoop(msgType interface{}, newMsg func(event interface{}) (ConsensusMessage, error)) {
// Kept for the type check below and for the log messages.
139 subscribeType := reflect.TypeOf(msgType)
140 msgSub, err := m.eventDispatcher.Subscribe(msgType)
// Subscription failure is logged as an error.
// NOTE(review): presumably the loop is not entered afterwards — the guard
// lines are not visible here.
142 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Errorf("failed on subscribe %s", subscribeType)
// The subscription is released when this function returns.
145 defer msgSub.Unsubscribe()
// NOTE(review): the enclosing loop/select and any other cases are not
// visible in this listing.
148 case obj, ok := <-msgSub.Chan():
// Logged when the subscription channel has been closed.
// NOTE(review): presumably guarded by !ok and followed by a return — confirm.
150 logrus.WithFields(logrus.Fields{"module": logModule}).Warningf("%sSub channel closed", subscribeType)
// Check that the event carries the subscribed type; the newMsg callbacks use
// unchecked type assertions.
154 if reflect.TypeOf(obj.Data) != subscribeType {
155 logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
// This err is a new variable scoped to the case body; it shadows the
// subscription err declared above.
159 msg, err := newMsg(obj.Data)
161 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Errorf("failed on create %s message", subscribeType)
165 message := NewBroadcastMsg(msg, consensusChannel)
// A broadcast failure is logged.
// NOTE(review): what follows the log is not visible here.
166 if err := m.peers.BroadcastMsg(message); err != nil {
167 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Errorf("failed on broadcast %s message.", subscribeType)
// removePeer removes the peer identified by peerID from the manager's peer set.
177 func (m *Manager) removePeer(peerID string) {
178 m.peers.RemovePeer(peerID)
181 // Start starts the consensus manager service.
// It launches three goroutines: the block fetcher's processing loop and the
// two event-to-broadcast loops (proposed blocks and block verifications).
// NOTE(review): the return statement is not visible in this listing;
// presumably it returns nil — confirm.
182 func (m *Manager) Start() error {
183 go m.blockFetcher.blockProcessorLoop()
184 go m.blockProposeMsgBroadcastLoop()
185 go m.blockVerificationMsgBroadcastLoop()
189 // Stop stops the consensus manager service.
// NOTE(review): the body is not visible in this listing; presumably it signals
// the quit channel created in NewManager — confirm.
190 func (m *Manager) Stop() {