From: yahtoo Date: Tue, 28 May 2019 13:39:07 +0000 (+0800) Subject: Add consensus messages transfer (#90) X-Git-Tag: v1.0.5~208^2~99 X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=commitdiff_plain;h=c071c7f9648e8e8f39c1826ccb4e5d2ba5e8efd7 Add consensus messages transfer (#90) * Add consensus messages transfer * Abstract broadcast message processing * opz code format --- diff --git a/event/event.go b/event/event.go index 4e76fcb8..9ce4d413 100644 --- a/event/event.go +++ b/event/event.go @@ -27,11 +27,14 @@ var ( type NewProposedBlockEvent struct{ Block types.Block } -type BlockSignatureEvent struct { +type BlockSignatureEvent struct { BlockHash bc.Hash - Signature []byte + Signature []byte } +//NewBlockProposeEvent block propose event which needs to broadcast. +type NewBlockProposeEvent struct{ Block types.Block } + // TypeMuxEvent is a time-tagged notification pushed to subscribers. type TypeMuxEvent struct { Time time.Time diff --git a/netsync/chainmgr/block_keeper_test.go b/netsync/chainmgr/block_keeper_test.go index 2be129b6..2b1cc699 100644 --- a/netsync/chainmgr/block_keeper_test.go +++ b/netsync/chainmgr/block_keeper_test.go @@ -489,7 +489,7 @@ func TestRequireBlock(t *testing.T) { b.blockKeeper.syncPeer = b.peers.GetPeer("test node A") cases := []struct { syncTimeout time.Duration - testNode *ChainManager + testNode *Manager requireHeight uint64 want *types.Block err error diff --git a/netsync/chainmgr/handle.go b/netsync/chainmgr/handle.go index 8f7d6795..9dea086f 100644 --- a/netsync/chainmgr/handle.go +++ b/netsync/chainmgr/handle.go @@ -45,8 +45,8 @@ type Switch interface { Peers() *p2p.PeerSet } -//ChainManager is responsible for the business layer information synchronization -type ChainManager struct { +//Manager is responsible for the business layer information synchronization +type Manager struct { sw Switch chain Chain txPool *core.TxPool @@ -62,8 +62,8 @@ type ChainManager struct { } //NewChainManager create a chain sync manager. -func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) { - manager := &ChainManager{ +func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) { + manager := &Manager{ sw: sw, txPool: txPool, chain: chain, @@ -82,53 +82,53 @@ func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.Tx return manager, nil } -func (cm *ChainManager) AddPeer(peer peers.BasePeer) { - cm.peers.AddPeer(peer) +func (m *Manager) AddPeer(peer peers.BasePeer) { + m.peers.AddPeer(peer) } //IsCaughtUp check wheather the peer finish the sync -func (cm *ChainManager) IsCaughtUp() bool { - peer := cm.peers.BestPeer(consensus.SFFullNode) - return peer == nil || peer.Height() <= cm.chain.BestBlockHeight() +func (m *Manager) IsCaughtUp() bool { + peer := m.peers.BestPeer(consensus.SFFullNode) + return peer == nil || peer.Height() <= m.chain.BestBlockHeight() } -func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) { +func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) { block, err := msg.GetBlock() if err != nil { return } - cm.blockKeeper.processBlock(peer.ID(), block) + m.blockKeeper.processBlock(peer.ID(), block) } -func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) { +func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) { blocks, err := msg.GetBlocks() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks") return } - cm.blockKeeper.processBlocks(peer.ID(), blocks) + m.blockKeeper.processBlocks(peer.ID(), blocks) } -func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) { +func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) { peer.AddFilterAddress(msg.Address) } -func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) { +func (m *Manager) handleFilterClearMsg(peer *peers.Peer) { peer.FilterClear() } -func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) { +func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) { peer.AddFilterAddresses(msg.Addresses) } -func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) { +func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) { var block *types.Block var err error if msg.Height != 0 { - block, err = cm.chain.GetBlockByHeight(msg.Height) + block, err = m.chain.GetBlockByHeight(msg.Height) } else { - block, err = cm.chain.GetBlockByHash(msg.GetHash()) + block, err = m.chain.GetBlockByHash(msg.GetHash()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain") @@ -137,15 +137,15 @@ func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMe ok, err := peer.SendBlock(block) if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock") } } -func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) { - blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash()) +func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) { + blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash()) if err != nil || len(blocks) == 0 { return } @@ -168,15 +168,15 @@ func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocks ok, err := peer.SendBlocks(sendBlocks) if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock") } } -func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) { - headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash()) +func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) { + headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash()) if err != nil || len(headers) == 0 { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders") return @@ -184,20 +184,20 @@ func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeade ok, err := peer.SendHeaders(headers) if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock") } } -func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) { +func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) { var err error var block *types.Block if msg.Height != 0 { - block, err = cm.chain.GetBlockByHeight(msg.Height) + block, err = m.chain.GetBlockByHeight(msg.Height) } else { - block, err = cm.chain.GetBlockByHash(msg.GetHash()) + block, err = m.chain.GetBlockByHash(msg.GetHash()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain") @@ -205,7 +205,7 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM } blockHash := block.Hash() - txStatus, err := cm.chain.GetTransactionStatus(&blockHash) + txStatus, err := m.chain.GetTransactionStatus(&blockHash) if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status") return @@ -218,63 +218,63 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM } if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } } -func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) { +func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) { headers, err := msg.GetHeaders() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders") return } - cm.blockKeeper.processHeaders(peer.ID(), headers) + m.blockKeeper.processHeaders(peer.ID(), headers) } -func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) { - if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil { +func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) { + if peer := m.peers.GetPeer(basePeer.ID()); peer != nil { peer.SetStatus(msg.Height, msg.GetHash()) return } } -func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) { +func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) { tx, err := msg.GetTransaction() if err != nil { - cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message") + m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message") return } - if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan { - cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") + if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan { + m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") } - cm.peers.MarkTx(peer.ID(), tx.ID) + m.peers.MarkTx(peer.ID(), tx.ID) } -func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) { +func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) { txs, err := msg.GetTransactions() if err != nil { - cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message") + m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message") return } if len(txs) > msgs.TxsMsgMaxTxNum { - cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit") + m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit") return } for _, tx := range txs { - if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan { - cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") + if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan { + m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") return } - cm.peers.MarkTx(peer.ID(), tx.ID) + m.peers.MarkTx(peer.ID(), tx.ID) } } -func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) { - peer := cm.peers.GetPeer(basePeer.ID()) +func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) { + peer := m.peers.GetPeer(basePeer.ID()) if peer == nil { return } @@ -288,43 +288,43 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms switch msg := msg.(type) { case *msgs.GetBlockMessage: - cm.handleGetBlockMsg(peer, msg) + m.handleGetBlockMsg(peer, msg) case *msgs.BlockMessage: - cm.handleBlockMsg(peer, msg) + m.handleBlockMsg(peer, msg) case *msgs.StatusMessage: - cm.handleStatusMsg(basePeer, msg) + m.handleStatusMsg(basePeer, msg) case *msgs.TransactionMessage: - cm.handleTransactionMsg(peer, msg) + m.handleTransactionMsg(peer, msg) case *msgs.TransactionsMessage: - cm.handleTransactionsMsg(peer, msg) + m.handleTransactionsMsg(peer, msg) case *msgs.GetHeadersMessage: - cm.handleGetHeadersMsg(peer, msg) + m.handleGetHeadersMsg(peer, msg) case *msgs.HeadersMessage: - cm.handleHeadersMsg(peer, msg) + m.handleHeadersMsg(peer, msg) case *msgs.GetBlocksMessage: - cm.handleGetBlocksMsg(peer, msg) + m.handleGetBlocksMsg(peer, msg) case *msgs.BlocksMessage: - cm.handleBlocksMsg(peer, msg) + m.handleBlocksMsg(peer, msg) case *msgs.FilterLoadMessage: - cm.handleFilterLoadMsg(peer, msg) + m.handleFilterLoadMsg(peer, msg) case *msgs.FilterAddMessage: - cm.handleFilterAddMsg(peer, msg) + m.handleFilterAddMsg(peer, msg) case *msgs.FilterClearMessage: - cm.handleFilterClearMsg(peer) + m.handleFilterClearMsg(peer) case *msgs.GetMerkleBlockMessage: - cm.handleGetMerkleBlockMsg(peer, msg) + m.handleGetMerkleBlockMsg(peer, msg) default: log.WithFields(log.Fields{ @@ -335,38 +335,38 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms } } -func (cm *ChainManager) RemovePeer(peerID string) { - cm.peers.RemovePeer(peerID) +func (m *Manager) RemovePeer(peerID string) { + m.peers.RemovePeer(peerID) } -func (cm *ChainManager) SendStatus(peer peers.BasePeer) error { - p := cm.peers.GetPeer(peer.ID()) +func (m *Manager) SendStatus(peer peers.BasePeer) error { + p := m.peers.GetPeer(peer.ID()) if p == nil { return errors.New("invalid peer") } - if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil { - cm.peers.RemovePeer(p.ID()) + if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil { + m.peers.RemovePeer(p.ID()) return err } return nil } -func (cm *ChainManager) Start() error { +func (m *Manager) Start() error { var err error - cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{}) + m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{}) if err != nil { return err } // broadcast transactions - go cm.txBroadcastLoop() - go cm.txSyncLoop() + go m.txBroadcastLoop() + go m.txSyncLoop() return nil } //Stop stop sync manager -func (cm *ChainManager) Stop() { - close(cm.quitSync) +func (m *Manager) Stop() { + close(m.quitSync) } diff --git a/netsync/chainmgr/protocol_reactor.go b/netsync/chainmgr/protocol_reactor.go index 86987fb7..85a5c259 100644 --- a/netsync/chainmgr/protocol_reactor.go +++ b/netsync/chainmgr/protocol_reactor.go @@ -16,13 +16,13 @@ import ( type ProtocolReactor struct { p2p.BaseReactor - cm *ChainManager + manager *Manager } // NewProtocolReactor returns the reactor of whole blockchain. -func NewProtocolReactor(cm *ChainManager) *ProtocolReactor { +func NewProtocolReactor(manager *Manager) *ProtocolReactor { pr := &ProtocolReactor{ - cm: cm, + manager: manager, } pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr) return pr @@ -52,17 +52,17 @@ func (pr *ProtocolReactor) OnStop() { // AddPeer implements Reactor by sending our state to peer. func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error { - pr.cm.AddPeer(peer) - if err := pr.cm.SendStatus(peer); err != nil { + pr.manager.AddPeer(peer) + if err := pr.manager.SendStatus(peer); err != nil { return err } - pr.cm.syncTransactions(peer.Key) + pr.manager.syncTransactions(peer.Key) return nil } // RemovePeer implements Reactor by removing peer from the pool. func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { - pr.cm.RemovePeer(peer.Key) + pr.manager.RemovePeer(peer.Key) } //decodeMessage decode msg @@ -85,5 +85,5 @@ func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { return } - pr.cm.processMsg(src, msgType, msg) + pr.manager.processMsg(src, msgType, msg) } diff --git a/netsync/chainmgr/tool_test.go b/netsync/chainmgr/tool_test.go index 34f0cc6e..dba3c89c 100644 --- a/netsync/chainmgr/tool_test.go +++ b/netsync/chainmgr/tool_test.go @@ -21,7 +21,7 @@ type P2PPeer struct { flag consensus.ServiceFlag srcPeer *P2PPeer - remoteNode *ChainManager + remoteNode *Manager msgCh chan []byte async bool } @@ -52,7 +52,7 @@ func (p *P2PPeer) ServiceFlag() consensus.ServiceFlag { return p.flag } -func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *ChainManager) { +func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *Manager) { p.srcPeer = srcPeer p.remoteNode = node } @@ -93,19 +93,19 @@ func (ps *PeerSet) AddBannedPeer(string) error { return nil } func (ps *PeerSet) StopPeerGracefully(string) {} type NetWork struct { - nodes map[*ChainManager]P2PPeer + nodes map[*Manager]P2PPeer } func NewNetWork() *NetWork { - return &NetWork{map[*ChainManager]P2PPeer{}} + return &NetWork{map[*Manager]P2PPeer{}} } -func (nw *NetWork) Register(node *ChainManager, addr, id string, flag consensus.ServiceFlag) { +func (nw *NetWork) Register(node *Manager, addr, id string, flag consensus.ServiceFlag) { peer := NewP2PPeer(addr, id, flag) nw.nodes[node] = *peer } -func (nw *NetWork) HandsShake(nodeA, nodeB *ChainManager) (*P2PPeer, *P2PPeer, error) { +func (nw *NetWork) HandsShake(nodeA, nodeB *Manager) (*P2PPeer, *P2PPeer, error) { B2A, ok := nw.nodes[nodeA] if !ok { return nil, nil, errors.New("can't find nodeA's p2p peer on network") @@ -150,7 +150,7 @@ func mockBlocks(startBlock *types.Block, height uint64) []*types.Block { return blocks } -func mockSync(blocks []*types.Block) *ChainManager { +func mockSync(blocks []*types.Block) *Manager { chain := mock.NewChain() peers := peers.NewPeerSet(NewPeerSet()) chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader) @@ -158,7 +158,7 @@ func mockSync(blocks []*types.Block) *ChainManager { chain.SetBlockByHeight(block.Height, block) } - return &ChainManager{ + return &Manager{ chain: chain, blockKeeper: newBlockKeeper(chain, peers), peers: peers, diff --git a/netsync/chainmgr/tx_keeper.go b/netsync/chainmgr/tx_keeper.go index 9f1a7cc3..50714031 100644 --- a/netsync/chainmgr/tx_keeper.go +++ b/netsync/chainmgr/tx_keeper.go @@ -21,8 +21,8 @@ type txSyncMsg struct { txs []*types.Tx } -func (cm *ChainManager) syncTransactions(peerID string) { - pending := cm.txPool.GetTransactions() +func (m *Manager) syncTransactions(peerID string) { + pending := m.txPool.GetTransactions() if len(pending) == 0 { return } @@ -31,13 +31,13 @@ func (cm *ChainManager) syncTransactions(peerID string) { for i, batch := range pending { txs[i] = batch.Tx } - cm.txSyncCh <- &txSyncMsg{peerID, txs} + m.txSyncCh <- &txSyncMsg{peerID, txs} } -func (cm *ChainManager) txBroadcastLoop() { +func (m *Manager) txBroadcastLoop() { for { select { - case obj, ok := <-cm.txMsgSub.Chan(): + case obj, ok := <-m.txMsgSub.Chan(): if !ok { log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed") return @@ -50,12 +50,12 @@ func (cm *ChainManager) txBroadcastLoop() { } if ev.TxMsg.MsgType == core.MsgNewTx { - if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil { + if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.") continue } } - case <-cm.quitSync: + case <-m.quitSync: return } } @@ -65,14 +65,14 @@ func (cm *ChainManager) txBroadcastLoop() { // connection. When a new peer appears, we relay all currently pending // transactions. In order to minimise egress bandwidth usage, we send // the transactions in small packs to one peer at a time. -func (cm *ChainManager) txSyncLoop() { +func (m *Manager) txSyncLoop() { pending := make(map[string]*txSyncMsg) sending := false // whether a send is active done := make(chan error, 1) // result of the send // send starts a sending a pack of transactions from the sync. send := func(msg *txSyncMsg) { - peer := cm.peers.GetPeer(msg.peerID) + peer := m.peers.GetPeer(msg.peerID) if peer == nil { delete(pending, msg.peerID) return @@ -102,7 +102,7 @@ func (cm *ChainManager) txSyncLoop() { go func() { err := peer.SendTransactions(sendTxs) if err != nil { - cm.peers.RemovePeer(msg.peerID) + m.peers.RemovePeer(msg.peerID) } done <- err }() @@ -125,7 +125,7 @@ func (cm *ChainManager) txSyncLoop() { for { select { - case msg := <-cm.txSyncCh: + case msg := <-m.txSyncCh: pending[msg.peerID] = msg if !sending { send(msg) diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go new file mode 100644 index 00000000..6278c618 --- /dev/null +++ b/netsync/consensusmgr/block_fetcher.go @@ -0,0 +1,108 @@ +package consensusmgr + +import ( + "github.com/sirupsen/logrus" + "gopkg.in/karalabe/cookiejar.v2/collections/prque" + + "github.com/vapor/netsync/peers" + "github.com/vapor/protocol/bc" +) + +const ( + maxBlockDistance = 64 + maxMsgSetSize = 128 + newBlockChSize = 64 +) + +// blockFetcher is responsible for accumulating block announcements from various peers +// and scheduling them for retrieval. +type blockFetcher struct { + chain Chain + peers *peers.PeerSet + + newBlockCh chan *blockMsg + queue *prque.Prque + msgSet map[bc.Hash]*blockMsg +} + +//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose. +func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher { + f := &blockFetcher{ + chain: chain, + peers: peers, + newBlockCh: make(chan *blockMsg, newBlockChSize), + queue: prque.New(), + msgSet: make(map[bc.Hash]*blockMsg), + } + go f.blockProcessor() + return f +} + +func (f *blockFetcher) blockProcessor() { + for { + height := f.chain.BestBlockHeight() + for !f.queue.Empty() { + msg := f.queue.PopItem().(*blockMsg) + if msg.block.Height > height+1 { + f.queue.Push(msg, -float32(msg.block.Height)) + break + } + + f.insert(msg) + delete(f.msgSet, msg.block.Hash()) + } + f.add(<-f.newBlockCh) + } +} + +func (f *blockFetcher) add(msg *blockMsg) { + bestHeight := f.chain.BestBlockHeight() + if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance { + return + } + + blockHash := msg.block.Hash() + if _, ok := f.msgSet[blockHash]; !ok { + f.msgSet[blockHash] = msg + f.queue.Push(msg, -float32(msg.block.Height)) + logrus.WithFields(logrus.Fields{ + "module": logModule, + "block height": msg.block.Height, + "block hash": blockHash.String(), + }).Debug("blockFetcher receive propose block") + } +} + +func (f *blockFetcher) insert(msg *blockMsg) { + isOrphan, err := f.chain.ProcessBlock(msg.block) + if err != nil { + peer := f.peers.GetPeer(msg.peerID) + if peer == nil { + return + } + + f.peers.AddBanScore(msg.peerID, 20, 0, err.Error()) + return + } + + if isOrphan { + return + } + + hash := msg.block.Hash() + f.peers.SetStatus(msg.peerID, msg.block.Height, &hash) + proposeMsg, err := NewBlockProposeMsg(msg.block) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg") + return + } + + if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block") + return + } +} + +func (f *blockFetcher) processNewBlock(msg *blockMsg) { + f.newBlockCh <- msg +} diff --git a/netsync/consensusmgr/broadcast_msg.go b/netsync/consensusmgr/broadcast_msg.go new file mode 100644 index 00000000..b11695a0 --- /dev/null +++ b/netsync/consensusmgr/broadcast_msg.go @@ -0,0 +1,45 @@ +package consensusmgr + +import ( + "github.com/vapor/netsync/peers" +) + +// BroadcastMsg the message that can be broadcast +// by peer set. +type BroadcastMsg struct { + msg ConsensusMessage + transChan byte +} + +// NewBroadcastMsg create concrete broadcast message, implement peers.BroadcastMsg interface. +func NewBroadcastMsg(msg ConsensusMessage, transChan byte) *BroadcastMsg { + return &BroadcastMsg{ + msg: msg, + transChan: transChan, + } +} + +// GetChan get message transfer channel. +func (b *BroadcastMsg) GetChan() byte { + return b.transChan +} + +// GetMsg get ConsensusMessage. +func (b *BroadcastMsg) GetMsg() interface{} { + return struct{ ConsensusMessage }{b.msg} +} + +// MsgString get a string representation of the message. +func (b *BroadcastMsg) MsgString() string { + return b.msg.String() +} + +// MarkSendRecord mark send message record to prevent messages from being sent repeatedly. +func (b *BroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) { + b.msg.BroadcastMarkSendRecord(ps, peers) +} + +// FilterTargetPeers filter target peers to filter the nodes that need to send messages. +func (b *BroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string { + return b.msg.BroadcastFilterTargetPeers(ps) +} diff --git a/netsync/consensusmgr/consensus_msg.go b/netsync/consensusmgr/consensus_msg.go new file mode 100644 index 00000000..cd6845cf --- /dev/null +++ b/netsync/consensusmgr/consensus_msg.go @@ -0,0 +1,131 @@ +package consensusmgr + +import ( + "bytes" + "encoding/hex" + "errors" + "fmt" + + "github.com/tendermint/go-wire" + + "github.com/vapor/netsync/peers" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +const ( + blockSignatureByte = byte(0x10) + blockProposeByte = byte(0x11) +) + +//ConsensusMessage is a generic message for consensus reactor. +type ConsensusMessage interface { + String() string + BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) + BroadcastFilterTargetPeers(ps *peers.PeerSet) []string +} + +var _ = wire.RegisterInterface( + struct{ ConsensusMessage }{}, + wire.ConcreteType{O: &BlockSignatureMsg{}, Byte: blockSignatureByte}, + wire.ConcreteType{O: &BlockProposeMsg{}, Byte: blockProposeByte}, +) + +//decodeMessage decode msg +func decodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { + msgType = bz[0] + n := int(0) + r := bytes.NewReader(bz) + msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ ConsensusMessage }).ConsensusMessage + if err != nil && n != len(bz) { + err = errors.New("DecodeMessage() had bytes left over") + } + return +} + +// BlockSignatureMsg block signature message transferred between nodes. +type BlockSignatureMsg struct { + BlockHash [32]byte + Height uint64 + Signature []byte + PubKey [32]byte +} + +//NewBlockSignatureMsg create new block signature msg. +func NewBlockSignatureMsg(blockHash bc.Hash, height uint64, signature []byte, pubKey [32]byte) ConsensusMessage { + hash := blockHash.Byte32() + return &BlockSignatureMsg{BlockHash: hash, Height: height, Signature: signature, PubKey: pubKey} +} + +func (bs *BlockSignatureMsg) String() string { + return fmt.Sprintf("{block_hash: %s,block_height:%d,signature:%s,pubkey:%s}", hex.EncodeToString(bs.BlockHash[:]), bs.Height, hex.EncodeToString(bs.Signature), hex.EncodeToString(bs.PubKey[:])) +} + +// BroadcastMarkSendRecord mark send message record to prevent messages from being sent repeatedly. +func (bs *BlockSignatureMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) { + for _, peer := range peers { + ps.MarkBlockSignature(peer, bs.Signature) + } +} + +// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages. +func (bs *BlockSignatureMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string { + return ps.PeersWithoutSign(bs.Signature) +} + +// BlockProposeMsg block propose message transferred between nodes. +type BlockProposeMsg struct { + RawBlock []byte +} + +//NewBlockProposeMsg create new block propose msg. +func NewBlockProposeMsg(block *types.Block) (ConsensusMessage, error) { + rawBlock, err := block.MarshalText() + if err != nil { + return nil, err + } + return &BlockProposeMsg{RawBlock: rawBlock}, nil +} + +//GetProposeBlock get propose block from msg. +func (bp *BlockProposeMsg) GetProposeBlock() (*types.Block, error) { + block := &types.Block{} + if err := block.UnmarshalText(bp.RawBlock); err != nil { + return nil, err + } + return block, nil +} + +func (bp *BlockProposeMsg) String() string { + block, err := bp.GetProposeBlock() + if err != nil { + return "{err: wrong message}" + } + blockHash := block.Hash() + return fmt.Sprintf("{block_height: %d, block_hash: %s}", block.Height, blockHash.String()) +} + +// BroadcastMarkSendRecord mark send message record to prevent messages from being sent repeatedly. +func (bp *BlockProposeMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) { + block, err := bp.GetProposeBlock() + if err != nil { + return + } + + hash := block.Hash() + height := block.Height + for _, peer := range peers { + ps.MarkBlock(peer, &hash) + ps.MarkStatus(peer, height) + } +} + +// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages. +func (bp *BlockProposeMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string { + block, err := bp.GetProposeBlock() + if err != nil { + return nil + } + + return ps.PeersWithoutBlock(block.Hash()) +} diff --git a/netsync/consensusmgr/consensus_msg_test.go b/netsync/consensusmgr/consensus_msg_test.go new file mode 100644 index 00000000..ea4e004d --- /dev/null +++ b/netsync/consensusmgr/consensus_msg_test.go @@ -0,0 +1,155 @@ +package consensusmgr + +import ( + "reflect" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/tendermint/go-wire" + + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +var _ = wire.RegisterInterface( + struct{ ConsensusMessage }{}, + wire.ConcreteType{O: &BlockSignatureMsg{}, Byte: blockSignatureByte}, + wire.ConcreteType{O: &BlockProposeMsg{}, Byte: blockProposeByte}, +) + +func TestDecodeMessage(t *testing.T) { + testCases := []struct { + msg ConsensusMessage + msgType byte + }{ + { + msg: &BlockSignatureMsg{ + BlockHash: [32]byte{0x01}, + Signature: []byte{0x00}, + PubKey: [32]byte{0x01}, + }, + msgType: blockSignatureByte, + }, + { + msg: &BlockProposeMsg{ + RawBlock: []byte{0x01, 0x02}, + }, + msgType: blockProposeByte, + }, + } + for i, c := range testCases { + binMsg := wire.BinaryBytes(struct{ ConsensusMessage }{c.msg}) + gotMsgType, gotMsg, err := decodeMessage(binMsg) + if err != nil { + t.Fatalf("index:%d decode Message err %s", i, err) + } + if gotMsgType != c.msgType { + t.Fatalf("index:%d decode Message type err. got:%d want:%d", i, gotMsgType, c.msg) + } + if !reflect.DeepEqual(gotMsg, c.msg) { + t.Fatalf("index:%d decode Message err. got:%s\n want:%s", i, spew.Sdump(gotMsg), spew.Sdump(c.msg)) + } + } +} + +func TestBlockSignBroadcastMsg(t *testing.T) { + blockSignMsg := &BlockSignatureMsg{ + BlockHash: [32]byte{0x01}, + Height: 100, + Signature: []byte{0x00}, + PubKey: [32]byte{0x01}, + } + signatureBroadcastMsg := NewBroadcastMsg(NewBlockSignatureMsg(bc.NewHash(blockSignMsg.BlockHash), blockSignMsg.Height, blockSignMsg.Signature, blockSignMsg.PubKey), consensusChannel) + + binMsg := wire.BinaryBytes(signatureBroadcastMsg.GetMsg()) + gotMsgType, gotMsg, err := decodeMessage(binMsg) + if err != nil { + t.Fatalf("decode Message err %s", err) + } + if gotMsgType != blockSignatureByte { + t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, blockSignatureByte) + } + if !reflect.DeepEqual(gotMsg, blockSignMsg) { + t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockSignMsg)) + } +} + +func TestBlockProposeBroadcastMsg(t *testing.T) { + blockProposeMsg, _ := NewBlockProposeMsg(testBlock) + + proposeBroadcastMsg := NewBroadcastMsg(blockProposeMsg, consensusChannel) + + binMsg := wire.BinaryBytes(proposeBroadcastMsg.GetMsg()) + gotMsgType, gotMsg, err := decodeMessage(binMsg) + if err != nil { + t.Fatalf("decode Message err %s", err) + } + if gotMsgType != blockProposeByte { + t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, blockProposeByte) + } + if !reflect.DeepEqual(gotMsg, blockProposeMsg) { + t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockProposeMsg)) + } +} + +var testBlock = &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 1, + Height: 0, + Timestamp: 1528945000, + BlockCommitment: types.BlockCommitment{ + TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)}, + TransactionStatusHash: bc.Hash{V0: uint64(0x55)}, + }, + }, +} + +func TestBlockProposeMsg(t *testing.T) { + blockMsg, err := NewBlockProposeMsg(testBlock) + if err != nil { + t.Fatalf("create new mine block msg err:%s", err) + } + + gotBlock, err := blockMsg.(*BlockProposeMsg).GetProposeBlock() + if err != nil { + t.Fatalf("got block err:%s", err) + } + + if !reflect.DeepEqual(gotBlock.BlockHeader, testBlock.BlockHeader) { + t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader)) + } + + wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}" + if blockMsg.String() != wantString { + t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString) + } + + blockMsg.(*BlockProposeMsg).RawBlock[1] = blockMsg.(*BlockProposeMsg).RawBlock[1] + 0x1 + _, err = blockMsg.(*BlockProposeMsg).GetProposeBlock() + if err == nil { + t.Fatalf("get mine block err") + } + + wantString = "{err: wrong message}" + if blockMsg.String() != wantString { + t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString) + } +} + +func TestBlockSignatureMsg(t *testing.T) { + msg := &BlockSignatureMsg{ + BlockHash: [32]byte{0x01}, + Height: 100, + Signature: []byte{0x00}, + PubKey: [32]byte{0x01}, + } + gotMsg := NewBlockSignatureMsg(bc.NewHash(msg.BlockHash), msg.Height, msg.Signature, msg.PubKey) + + if !reflect.DeepEqual(gotMsg, msg) { + t.Fatalf("test block signature message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg)) + } + wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height:100,signature:00,pubkey:0100000000000000000000000000000000000000000000000000000000000000}" + if gotMsg.String() != wantString { + t.Fatalf("test block signature message err. got string:%s\n want string:%s", gotMsg.String(), wantString) + } +} diff --git a/netsync/consensusmgr/handle.go b/netsync/consensusmgr/handle.go new file mode 100644 index 00000000..f3823e05 --- /dev/null +++ b/netsync/consensusmgr/handle.go @@ -0,0 +1,195 @@ +package consensusmgr + +import ( + "reflect" + + "github.com/sirupsen/logrus" + + "github.com/vapor/event" + "github.com/vapor/netsync/peers" + "github.com/vapor/p2p" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +// Switch is the interface for p2p switch. +type Switch interface { + AddReactor(name string, reactor p2p.Reactor) p2p.Reactor + AddBannedPeer(string) error + ID() [32]byte +} + +// Chain is the interface for Bytom core. +type Chain interface { + BestBlockHeight() uint64 + GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) + ProcessBlock(*types.Block) (bool, error) + ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error +} + +type blockMsg struct { + block *types.Block + peerID string +} + +// Manager is the consensus message network synchronization manager. +type Manager struct { + sw Switch + chain Chain + peers *peers.PeerSet + blockFetcher *blockFetcher + eventDispatcher *event.Dispatcher + + quit chan struct{} +} + +// NewManager create new manager. +func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager { + manager := &Manager{ + sw: sw, + peers: peers, + blockFetcher: newBlockFetcher(chain, peers), + eventDispatcher: dispatcher, + quit: make(chan struct{}), + } + protocolReactor := NewConsensusReactor(manager) + manager.sw.AddReactor("CONSENSUS", protocolReactor) + return manager +} + +func (m *Manager) addPeer(peer peers.BasePeer) { + m.peers.AddPeer(peer) +} + +func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) { + peer := m.peers.GetPeer(peerID) + if peer == nil { + return + } + + logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer") + + switch msg := msg.(type) { + case *BlockProposeMsg: + m.handleBlockProposeMsg(peerID, msg) + + case *BlockSignatureMsg: + m.handleBlockSignatureMsg(peerID, msg) + + default: + logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type") + } +} + +func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) { + block, err := msg.GetProposeBlock() + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block") + return + } + + hash := block.Hash() + m.peers.MarkBlock(peerID, &hash) + m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block}) +} + +func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) { + blockHash := bc.NewHash(msg.BlockHash) + if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil { + m.peers.AddBanScore(peerID, 20, 0, err.Error()) + return + } +} + +func (m *Manager) blockProposeMsgBroadcastLoop() { + blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{}) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent") + return + } + defer blockProposeMsgSub.Unsubscribe() + + for { + select { + case obj, ok := <-blockProposeMsgSub.Chan(): + if !ok { + logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed") + return + } + + ev, ok := obj.Data.(event.NewBlockProposeEvent) + if !ok { + logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error") + continue + } + proposeMsg, err := NewBlockProposeMsg(&ev.Block) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg") + return + } + + if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg") + continue + } + + case <-m.quit: + return + } + } +} + +func (m *Manager) blockSignatureMsgBroadcastLoop() { + blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{}) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent") + return + } + defer blockSignatureMsgSub.Unsubscribe() + for { + select { + case obj, ok := <-blockSignatureMsgSub.Chan(): + if !ok { + logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed") + return + } + + ev, ok := obj.Data.(event.BlockSignatureEvent) + if !ok { + logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error") + continue + } + + blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.") + return + } + + blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel) + if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.") + return + } + + case <-m.quit: + return + } + } +} + +func (m *Manager) removePeer(peerID string) { + m.peers.RemovePeer(peerID) +} + +//Start consensus manager service. +func (m *Manager) Start() error { + go m.blockProposeMsgBroadcastLoop() + go m.blockSignatureMsgBroadcastLoop() + return nil +} + +//Stop consensus manager service. +func (m *Manager) Stop() { + close(m.quit) +} diff --git a/netsync/consensusmgr/reactor.go b/netsync/consensusmgr/reactor.go new file mode 100644 index 00000000..10405fb9 --- /dev/null +++ b/netsync/consensusmgr/reactor.go @@ -0,0 +1,72 @@ +package consensusmgr + +import ( + "github.com/sirupsen/logrus" + + "github.com/vapor/p2p" + "github.com/vapor/p2p/connection" +) + +const ( + logModule = "consensus" + consensusChannel = byte(0x50) + maxBlockchainResponseSize = 22020096 + 2 +) + +// ConsensusReactor handles new coming consensus message. +type ConsensusReactor struct { + p2p.BaseReactor + manager *Manager +} + +// NewConsensusReactor create consensus reactor. +func NewConsensusReactor(manager *Manager) *ConsensusReactor { + cr := &ConsensusReactor{ + manager: manager, + } + cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr) + return cr +} + +// GetChannels implements Reactor +func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor { + return []*connection.ChannelDescriptor{ + { + ID: consensusChannel, + Priority: 10, + SendQueueCapacity: 100, + }, + } +} + +// OnStart implements BaseService +func (cr *ConsensusReactor) OnStart() error { + return cr.BaseReactor.OnStart() +} + +// OnStop implements BaseService +func (cr *ConsensusReactor) OnStop() { + cr.BaseReactor.OnStop() +} + +// AddPeer implements Reactor by sending our state to peer. +func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error { + cr.manager.addPeer(peer) + return nil +} + +// RemovePeer implements Reactor by removing peer from the pool. +func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + cr.manager.removePeer(peer.Key) +} + +// Receive implements Reactor by handling messages. +func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { + msgType, msg, err := decodeMessage(msgBytes) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message") + return + } + + cr.manager.processMsg(src.ID(), msgType, msg) +} diff --git a/netsync/sync_manager.go b/netsync/sync_manager.go index 18d5291e..61645bfc 100644 --- a/netsync/sync_manager.go +++ b/netsync/sync_manager.go @@ -3,15 +3,16 @@ package netsync import ( "errors" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" - cfg "github.com/vapor/config" + "github.com/vapor/config" "github.com/vapor/consensus" "github.com/vapor/event" "github.com/vapor/netsync/chainmgr" + "github.com/vapor/netsync/consensusmgr" "github.com/vapor/netsync/peers" "github.com/vapor/p2p" - core "github.com/vapor/protocol" + "github.com/vapor/protocol" ) const ( @@ -22,12 +23,20 @@ var ( errVaultModeDialPeer = errors.New("can't dial peer in vault mode") ) +// ChainMgr is the interface for p2p chain message sync manager. type ChainMgr interface { Start() error IsCaughtUp() bool Stop() } +// ConsensusMgr is the interface for consensus message sync manager. +type ConsensusMgr interface { + Start() error + Stop() +} + +// Switch is the interface for p2p switch. type Switch interface { Start() (bool, error) Stop() bool @@ -38,50 +47,60 @@ type Switch interface { //SyncManager Sync Manager is responsible for the business layer information synchronization type SyncManager struct { - config *cfg.Config - sw Switch - chainMgr ChainMgr - peers *peers.PeerSet + config *config.Config + sw Switch + chainMgr ChainMgr + consensusMgr ConsensusMgr + peers *peers.PeerSet } // NewSyncManager create sync manager and set switch. -func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) { +func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) { sw, err := p2p.NewSwitch(config) if err != nil { return nil, err } peers := peers.NewPeerSet(sw) - chainManger, err := chainmgr.NewChainManager(config, sw, chain, txPool, dispatcher, peers) + chainManger, err := chainmgr.NewManager(config, sw, chain, txPool, dispatcher, peers) if err != nil { return nil, err } - + consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers) return &SyncManager{ - config: config, - sw: sw, - chainMgr: chainManger, - peers: peers, + config: config, + sw: sw, + chainMgr: chainManger, + consensusMgr: consensusMgr, + peers: peers, }, nil } +// Start message sync manager service. func (sm *SyncManager) Start() error { if _, err := sm.sw.Start(); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed start switch") + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed start switch") + return err + } + + if err := sm.chainMgr.Start(); err != nil { return err } - return sm.chainMgr.Start() + return sm.consensusMgr.Start() } +// Stop message sync manager service. func (sm *SyncManager) Stop() { sm.chainMgr.Stop() + sm.consensusMgr.Stop() if !sm.config.VaultMode { sm.sw.Stop() } } +// IsListening check if the bytomd service port is open? func (sm *SyncManager) IsListening() bool { if sm.config.VaultMode { return false @@ -95,6 +114,7 @@ func (sm *SyncManager) IsCaughtUp() bool { return sm.chainMgr.IsCaughtUp() } +// PeerCount count the number of connected peers. func (sm *SyncManager) PeerCount() int { if sm.config.VaultMode { return 0 @@ -102,10 +122,12 @@ func (sm *SyncManager) PeerCount() int { return len(sm.sw.Peers().List()) } +// GetNetwork get the type of network. func (sm *SyncManager) GetNetwork() string { return sm.config.ChainID } +// BestPeer fine the peer with the highest height from the connected peers. func (sm *SyncManager) BestPeer() *peers.PeerInfo { bestPeer := sm.peers.BestPeer(consensus.SFFullNode) if bestPeer != nil { @@ -114,6 +136,7 @@ func (sm *SyncManager) BestPeer() *peers.PeerInfo { return nil } +// DialPeerWithAddress dial the peer and establish a connection. func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error { if sm.config.VaultMode { return errVaultModeDialPeer @@ -122,7 +145,7 @@ func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error { return sm.sw.DialPeerWithAddress(addr) } -//GetPeerInfos return peer info of all peers +//GetPeerInfos return peer info of all connected peers. func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo { return sm.peers.GetPeerInfos() } diff --git a/protocol/block.go b/protocol/block.go index 8ae535a0..dca8b4d9 100644 --- a/protocol/block.go +++ b/protocol/block.go @@ -302,7 +302,7 @@ func (c *Chain) processBlock(block *types.Block) (bool, error) { return false, nil } -func (c *Chain) processBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error { +func (c *Chain) ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error { isBestIrreversible, err := c.bbft.ProcessBlockSignature(signature, pubkey, blockHeight, blockHash) if err != nil { return err