OSDN Git Service

Add consensus messages transfer (#90)
authoryahtoo <yahtoo.ma@gmail.com>
Tue, 28 May 2019 13:39:07 +0000 (21:39 +0800)
committerPaladz <yzhu101@uottawa.ca>
Tue, 28 May 2019 13:39:07 +0000 (21:39 +0800)
* Add consensus messages transfer

* Abstract broadcast message processing

* opz code format

14 files changed:
event/event.go
netsync/chainmgr/block_keeper_test.go
netsync/chainmgr/handle.go
netsync/chainmgr/protocol_reactor.go
netsync/chainmgr/tool_test.go
netsync/chainmgr/tx_keeper.go
netsync/consensusmgr/block_fetcher.go [new file with mode: 0644]
netsync/consensusmgr/broadcast_msg.go [new file with mode: 0644]
netsync/consensusmgr/consensus_msg.go [new file with mode: 0644]
netsync/consensusmgr/consensus_msg_test.go [new file with mode: 0644]
netsync/consensusmgr/handle.go [new file with mode: 0644]
netsync/consensusmgr/reactor.go [new file with mode: 0644]
netsync/sync_manager.go
protocol/block.go

index 4e76fcb..9ce4d41 100644 (file)
@@ -27,11 +27,14 @@ var (
 
 type NewProposedBlockEvent struct{ Block types.Block }
 
-type BlockSignatureEvent struct { 
+type BlockSignatureEvent struct {
        BlockHash bc.Hash
-       Signature []byte 
+       Signature []byte
 }
 
+//NewBlockProposeEvent block propose event which needs to broadcast.
+type NewBlockProposeEvent struct{ Block types.Block }
+
 // TypeMuxEvent is a time-tagged notification pushed to subscribers.
 type TypeMuxEvent struct {
        Time time.Time
index 2be129b..2b1cc69 100644 (file)
@@ -489,7 +489,7 @@ func TestRequireBlock(t *testing.T) {
        b.blockKeeper.syncPeer = b.peers.GetPeer("test node A")
        cases := []struct {
                syncTimeout   time.Duration
-               testNode      *ChainManager
+               testNode      *Manager
                requireHeight uint64
                want          *types.Block
                err           error
index 8f7d679..9dea086 100644 (file)
@@ -45,8 +45,8 @@ type Switch interface {
        Peers() *p2p.PeerSet
 }
 
-//ChainManager is responsible for the business layer information synchronization
-type ChainManager struct {
+//Manager is responsible for the business layer information synchronization
+type Manager struct {
        sw          Switch
        chain       Chain
        txPool      *core.TxPool
@@ -62,8 +62,8 @@ type ChainManager struct {
 }
 
 //NewChainManager create a chain sync manager.
-func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
-       manager := &ChainManager{
+func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+       manager := &Manager{
                sw:              sw,
                txPool:          txPool,
                chain:           chain,
@@ -82,53 +82,53 @@ func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.Tx
        return manager, nil
 }
 
-func (cm *ChainManager) AddPeer(peer peers.BasePeer) {
-       cm.peers.AddPeer(peer)
+func (m *Manager) AddPeer(peer peers.BasePeer) {
+       m.peers.AddPeer(peer)
 }
 
 //IsCaughtUp check wheather the peer finish the sync
-func (cm *ChainManager) IsCaughtUp() bool {
-       peer := cm.peers.BestPeer(consensus.SFFullNode)
-       return peer == nil || peer.Height() <= cm.chain.BestBlockHeight()
+func (m *Manager) IsCaughtUp() bool {
+       peer := m.peers.BestPeer(consensus.SFFullNode)
+       return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
 }
 
-func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
+func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
        block, err := msg.GetBlock()
        if err != nil {
                return
        }
-       cm.blockKeeper.processBlock(peer.ID(), block)
+       m.blockKeeper.processBlock(peer.ID(), block)
 }
 
-func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
+func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
        blocks, err := msg.GetBlocks()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
                return
        }
 
-       cm.blockKeeper.processBlocks(peer.ID(), blocks)
+       m.blockKeeper.processBlocks(peer.ID(), blocks)
 }
 
-func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
+func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
        peer.AddFilterAddress(msg.Address)
 }
 
-func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) {
+func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
        peer.FilterClear()
 }
 
-func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
+func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
        peer.AddFilterAddresses(msg.Addresses)
 }
 
-func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
+func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
        var block *types.Block
        var err error
        if msg.Height != 0 {
-               block, err = cm.chain.GetBlockByHeight(msg.Height)
+               block, err = m.chain.GetBlockByHeight(msg.Height)
        } else {
-               block, err = cm.chain.GetBlockByHash(msg.GetHash())
+               block, err = m.chain.GetBlockByHash(msg.GetHash())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
@@ -137,15 +137,15 @@ func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMe
 
        ok, err := peer.SendBlock(block)
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
        }
 }
 
-func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
-       blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
+       blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(blocks) == 0 {
                return
        }
@@ -168,15 +168,15 @@ func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocks
 
        ok, err := peer.SendBlocks(sendBlocks)
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
        }
 }
 
-func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
-       headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
+       headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(headers) == 0 {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
                return
@@ -184,20 +184,20 @@ func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeade
 
        ok, err := peer.SendHeaders(headers)
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
        }
 }
 
-func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
+func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
        var err error
        var block *types.Block
        if msg.Height != 0 {
-               block, err = cm.chain.GetBlockByHeight(msg.Height)
+               block, err = m.chain.GetBlockByHeight(msg.Height)
        } else {
-               block, err = cm.chain.GetBlockByHash(msg.GetHash())
+               block, err = m.chain.GetBlockByHash(msg.GetHash())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
@@ -205,7 +205,7 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM
        }
 
        blockHash := block.Hash()
-       txStatus, err := cm.chain.GetTransactionStatus(&blockHash)
+       txStatus, err := m.chain.GetTransactionStatus(&blockHash)
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
                return
@@ -218,63 +218,63 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM
        }
 
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
 }
 
-func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
+func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
                return
        }
 
-       cm.blockKeeper.processHeaders(peer.ID(), headers)
+       m.blockKeeper.processHeaders(peer.ID(), headers)
 }
 
-func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
-       if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil {
+func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
+       if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
                peer.SetStatus(msg.Height, msg.GetHash())
                return
        }
 }
 
-func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
+func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
        tx, err := msg.GetTransaction()
        if err != nil {
-               cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
                return
        }
 
-       if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
-               cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+       if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
+               m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
        }
-       cm.peers.MarkTx(peer.ID(), tx.ID)
+       m.peers.MarkTx(peer.ID(), tx.ID)
 }
 
-func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
+func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
        txs, err := msg.GetTransactions()
        if err != nil {
-               cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+               m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
                return
        }
 
        if len(txs) > msgs.TxsMsgMaxTxNum {
-               cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+               m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
                return
        }
 
        for _, tx := range txs {
-               if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan {
-                       cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+               if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
+                       m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
                        return
                }
-               cm.peers.MarkTx(peer.ID(), tx.ID)
+               m.peers.MarkTx(peer.ID(), tx.ID)
        }
 }
 
-func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
-       peer := cm.peers.GetPeer(basePeer.ID())
+func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
+       peer := m.peers.GetPeer(basePeer.ID())
        if peer == nil {
                return
        }
@@ -288,43 +288,43 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms
 
        switch msg := msg.(type) {
        case *msgs.GetBlockMessage:
-               cm.handleGetBlockMsg(peer, msg)
+               m.handleGetBlockMsg(peer, msg)
 
        case *msgs.BlockMessage:
-               cm.handleBlockMsg(peer, msg)
+               m.handleBlockMsg(peer, msg)
 
        case *msgs.StatusMessage:
-               cm.handleStatusMsg(basePeer, msg)
+               m.handleStatusMsg(basePeer, msg)
 
        case *msgs.TransactionMessage:
-               cm.handleTransactionMsg(peer, msg)
+               m.handleTransactionMsg(peer, msg)
 
        case *msgs.TransactionsMessage:
-               cm.handleTransactionsMsg(peer, msg)
+               m.handleTransactionsMsg(peer, msg)
 
        case *msgs.GetHeadersMessage:
-               cm.handleGetHeadersMsg(peer, msg)
+               m.handleGetHeadersMsg(peer, msg)
 
        case *msgs.HeadersMessage:
-               cm.handleHeadersMsg(peer, msg)
+               m.handleHeadersMsg(peer, msg)
 
        case *msgs.GetBlocksMessage:
-               cm.handleGetBlocksMsg(peer, msg)
+               m.handleGetBlocksMsg(peer, msg)
 
        case *msgs.BlocksMessage:
-               cm.handleBlocksMsg(peer, msg)
+               m.handleBlocksMsg(peer, msg)
 
        case *msgs.FilterLoadMessage:
-               cm.handleFilterLoadMsg(peer, msg)
+               m.handleFilterLoadMsg(peer, msg)
 
        case *msgs.FilterAddMessage:
-               cm.handleFilterAddMsg(peer, msg)
+               m.handleFilterAddMsg(peer, msg)
 
        case *msgs.FilterClearMessage:
-               cm.handleFilterClearMsg(peer)
+               m.handleFilterClearMsg(peer)
 
        case *msgs.GetMerkleBlockMessage:
-               cm.handleGetMerkleBlockMsg(peer, msg)
+               m.handleGetMerkleBlockMsg(peer, msg)
 
        default:
                log.WithFields(log.Fields{
@@ -335,38 +335,38 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms
        }
 }
 
-func (cm *ChainManager) RemovePeer(peerID string) {
-       cm.peers.RemovePeer(peerID)
+func (m *Manager) RemovePeer(peerID string) {
+       m.peers.RemovePeer(peerID)
 }
 
-func (cm *ChainManager) SendStatus(peer peers.BasePeer) error {
-       p := cm.peers.GetPeer(peer.ID())
+func (m *Manager) SendStatus(peer peers.BasePeer) error {
+       p := m.peers.GetPeer(peer.ID())
        if p == nil {
                return errors.New("invalid peer")
        }
 
-       if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil {
-               cm.peers.RemovePeer(p.ID())
+       if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
+               m.peers.RemovePeer(p.ID())
                return err
        }
        return nil
 }
 
-func (cm *ChainManager) Start() error {
+func (m *Manager) Start() error {
        var err error
-       cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
        if err != nil {
                return err
        }
 
        // broadcast transactions
-       go cm.txBroadcastLoop()
-       go cm.txSyncLoop()
+       go m.txBroadcastLoop()
+       go m.txSyncLoop()
 
        return nil
 }
 
 //Stop stop sync manager
-func (cm *ChainManager) Stop() {
-       close(cm.quitSync)
+func (m *Manager) Stop() {
+       close(m.quitSync)
 }
index 86987fb..85a5c25 100644 (file)
@@ -16,13 +16,13 @@ import (
 type ProtocolReactor struct {
        p2p.BaseReactor
 
-       cm *ChainManager
+       manager *Manager
 }
 
 // NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(cm *ChainManager) *ProtocolReactor {
+func NewProtocolReactor(manager *Manager) *ProtocolReactor {
        pr := &ProtocolReactor{
-               cm: cm,
+               manager: manager,
        }
        pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
        return pr
@@ -52,17 +52,17 @@ func (pr *ProtocolReactor) OnStop() {
 
 // AddPeer implements Reactor by sending our state to peer.
 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
-       pr.cm.AddPeer(peer)
-       if err := pr.cm.SendStatus(peer); err != nil {
+       pr.manager.AddPeer(peer)
+       if err := pr.manager.SendStatus(peer); err != nil {
                return err
        }
-       pr.cm.syncTransactions(peer.Key)
+       pr.manager.syncTransactions(peer.Key)
        return nil
 }
 
 // RemovePeer implements Reactor by removing peer from the pool.
 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       pr.cm.RemovePeer(peer.Key)
+       pr.manager.RemovePeer(peer.Key)
 }
 
 //decodeMessage decode msg
@@ -85,5 +85,5 @@ func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
                return
        }
 
-       pr.cm.processMsg(src, msgType, msg)
+       pr.manager.processMsg(src, msgType, msg)
 }
index 34f0cc6..dba3c89 100644 (file)
@@ -21,7 +21,7 @@ type P2PPeer struct {
        flag consensus.ServiceFlag
 
        srcPeer    *P2PPeer
-       remoteNode *ChainManager
+       remoteNode *Manager
        msgCh      chan []byte
        async      bool
 }
@@ -52,7 +52,7 @@ func (p *P2PPeer) ServiceFlag() consensus.ServiceFlag {
        return p.flag
 }
 
-func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *ChainManager) {
+func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *Manager) {
        p.srcPeer = srcPeer
        p.remoteNode = node
 }
@@ -93,19 +93,19 @@ func (ps *PeerSet) AddBannedPeer(string) error { return nil }
 func (ps *PeerSet) StopPeerGracefully(string)  {}
 
 type NetWork struct {
-       nodes map[*ChainManager]P2PPeer
+       nodes map[*Manager]P2PPeer
 }
 
 func NewNetWork() *NetWork {
-       return &NetWork{map[*ChainManager]P2PPeer{}}
+       return &NetWork{map[*Manager]P2PPeer{}}
 }
 
-func (nw *NetWork) Register(node *ChainManager, addr, id string, flag consensus.ServiceFlag) {
+func (nw *NetWork) Register(node *Manager, addr, id string, flag consensus.ServiceFlag) {
        peer := NewP2PPeer(addr, id, flag)
        nw.nodes[node] = *peer
 }
 
-func (nw *NetWork) HandsShake(nodeA, nodeB *ChainManager) (*P2PPeer, *P2PPeer, error) {
+func (nw *NetWork) HandsShake(nodeA, nodeB *Manager) (*P2PPeer, *P2PPeer, error) {
        B2A, ok := nw.nodes[nodeA]
        if !ok {
                return nil, nil, errors.New("can't find nodeA's p2p peer on network")
@@ -150,7 +150,7 @@ func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
        return blocks
 }
 
-func mockSync(blocks []*types.Block) *ChainManager {
+func mockSync(blocks []*types.Block) *Manager {
        chain := mock.NewChain()
        peers := peers.NewPeerSet(NewPeerSet())
        chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
@@ -158,7 +158,7 @@ func mockSync(blocks []*types.Block) *ChainManager {
                chain.SetBlockByHeight(block.Height, block)
        }
 
-       return &ChainManager{
+       return &Manager{
                chain:       chain,
                blockKeeper: newBlockKeeper(chain, peers),
                peers:       peers,
index 9f1a7cc..5071403 100644 (file)
@@ -21,8 +21,8 @@ type txSyncMsg struct {
        txs    []*types.Tx
 }
 
-func (cm *ChainManager) syncTransactions(peerID string) {
-       pending := cm.txPool.GetTransactions()
+func (m *Manager) syncTransactions(peerID string) {
+       pending := m.txPool.GetTransactions()
        if len(pending) == 0 {
                return
        }
@@ -31,13 +31,13 @@ func (cm *ChainManager) syncTransactions(peerID string) {
        for i, batch := range pending {
                txs[i] = batch.Tx
        }
-       cm.txSyncCh <- &txSyncMsg{peerID, txs}
+       m.txSyncCh <- &txSyncMsg{peerID, txs}
 }
 
-func (cm *ChainManager) txBroadcastLoop() {
+func (m *Manager) txBroadcastLoop() {
        for {
                select {
-               case obj, ok := <-cm.txMsgSub.Chan():
+               case obj, ok := <-m.txMsgSub.Chan():
                        if !ok {
                                log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
                                return
@@ -50,12 +50,12 @@ func (cm *ChainManager) txBroadcastLoop() {
                        }
 
                        if ev.TxMsg.MsgType == core.MsgNewTx {
-                               if err := cm.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+                               if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
                                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
                                        continue
                                }
                        }
-               case <-cm.quitSync:
+               case <-m.quitSync:
                        return
                }
        }
@@ -65,14 +65,14 @@ func (cm *ChainManager) txBroadcastLoop() {
 // connection. When a new peer appears, we relay all currently pending
 // transactions. In order to minimise egress bandwidth usage, we send
 // the transactions in small packs to one peer at a time.
-func (cm *ChainManager) txSyncLoop() {
+func (m *Manager) txSyncLoop() {
        pending := make(map[string]*txSyncMsg)
        sending := false            // whether a send is active
        done := make(chan error, 1) // result of the send
 
        // send starts a sending a pack of transactions from the sync.
        send := func(msg *txSyncMsg) {
-               peer := cm.peers.GetPeer(msg.peerID)
+               peer := m.peers.GetPeer(msg.peerID)
                if peer == nil {
                        delete(pending, msg.peerID)
                        return
@@ -102,7 +102,7 @@ func (cm *ChainManager) txSyncLoop() {
                go func() {
                        err := peer.SendTransactions(sendTxs)
                        if err != nil {
-                               cm.peers.RemovePeer(msg.peerID)
+                               m.peers.RemovePeer(msg.peerID)
                        }
                        done <- err
                }()
@@ -125,7 +125,7 @@ func (cm *ChainManager) txSyncLoop() {
 
        for {
                select {
-               case msg := <-cm.txSyncCh:
+               case msg := <-m.txSyncCh:
                        pending[msg.peerID] = msg
                        if !sending {
                                send(msg)
diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go
new file mode 100644 (file)
index 0000000..6278c61
--- /dev/null
@@ -0,0 +1,108 @@
+package consensusmgr
+
+import (
+       "github.com/sirupsen/logrus"
+       "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/protocol/bc"
+)
+
+const (
+       maxBlockDistance = 64
+       maxMsgSetSize    = 128
+       newBlockChSize   = 64
+)
+
+// blockFetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type blockFetcher struct {
+       chain Chain
+       peers *peers.PeerSet
+
+       newBlockCh chan *blockMsg
+       queue      *prque.Prque
+       msgSet     map[bc.Hash]*blockMsg
+}
+
+//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
+func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
+       f := &blockFetcher{
+               chain:      chain,
+               peers:      peers,
+               newBlockCh: make(chan *blockMsg, newBlockChSize),
+               queue:      prque.New(),
+               msgSet:     make(map[bc.Hash]*blockMsg),
+       }
+       go f.blockProcessor()
+       return f
+}
+
+func (f *blockFetcher) blockProcessor() {
+       for {
+               height := f.chain.BestBlockHeight()
+               for !f.queue.Empty() {
+                       msg := f.queue.PopItem().(*blockMsg)
+                       if msg.block.Height > height+1 {
+                               f.queue.Push(msg, -float32(msg.block.Height))
+                               break
+                       }
+
+                       f.insert(msg)
+                       delete(f.msgSet, msg.block.Hash())
+               }
+               f.add(<-f.newBlockCh)
+       }
+}
+
+func (f *blockFetcher) add(msg *blockMsg) {
+       bestHeight := f.chain.BestBlockHeight()
+       if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+               return
+       }
+
+       blockHash := msg.block.Hash()
+       if _, ok := f.msgSet[blockHash]; !ok {
+               f.msgSet[blockHash] = msg
+               f.queue.Push(msg, -float32(msg.block.Height))
+               logrus.WithFields(logrus.Fields{
+                       "module":       logModule,
+                       "block height": msg.block.Height,
+                       "block hash":   blockHash.String(),
+               }).Debug("blockFetcher receive propose block")
+       }
+}
+
+func (f *blockFetcher) insert(msg *blockMsg) {
+       isOrphan, err := f.chain.ProcessBlock(msg.block)
+       if err != nil {
+               peer := f.peers.GetPeer(msg.peerID)
+               if peer == nil {
+                       return
+               }
+
+               f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
+               return
+       }
+
+       if isOrphan {
+               return
+       }
+
+       hash := msg.block.Hash()
+       f.peers.SetStatus(msg.peerID, msg.block.Height, &hash)
+       proposeMsg, err := NewBlockProposeMsg(msg.block)
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+               return
+       }
+
+       if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
+               return
+       }
+}
+
+func (f *blockFetcher) processNewBlock(msg *blockMsg) {
+       f.newBlockCh <- msg
+}
diff --git a/netsync/consensusmgr/broadcast_msg.go b/netsync/consensusmgr/broadcast_msg.go
new file mode 100644 (file)
index 0000000..b11695a
--- /dev/null
@@ -0,0 +1,45 @@
+package consensusmgr
+
+import (
+       "github.com/vapor/netsync/peers"
+)
+
+// BroadcastMsg the message that can be broadcast
+// by peer set.
+type BroadcastMsg struct {
+       msg       ConsensusMessage
+       transChan byte
+}
+
+// NewBroadcastMsg create concrete broadcast message, implement peers.BroadcastMsg interface.
+func NewBroadcastMsg(msg ConsensusMessage, transChan byte) *BroadcastMsg {
+       return &BroadcastMsg{
+               msg:       msg,
+               transChan: transChan,
+       }
+}
+
+// GetChan get message transfer channel.
+func (b *BroadcastMsg) GetChan() byte {
+       return b.transChan
+}
+
+// GetMsg get ConsensusMessage.
+func (b *BroadcastMsg) GetMsg() interface{} {
+       return struct{ ConsensusMessage }{b.msg}
+}
+
+// MsgString get a string representation of the message.
+func (b *BroadcastMsg) MsgString() string {
+       return b.msg.String()
+}
+
+// MarkSendRecord mark send message record to prevent messages from being sent repeatedly.
+func (b *BroadcastMsg) MarkSendRecord(ps *peers.PeerSet, peers []string) {
+       b.msg.BroadcastMarkSendRecord(ps, peers)
+}
+
+// FilterTargetPeers filter target peers to filter the nodes that need to send messages.
+func (b *BroadcastMsg) FilterTargetPeers(ps *peers.PeerSet) []string {
+       return b.msg.BroadcastFilterTargetPeers(ps)
+}
diff --git a/netsync/consensusmgr/consensus_msg.go b/netsync/consensusmgr/consensus_msg.go
new file mode 100644 (file)
index 0000000..cd6845c
--- /dev/null
@@ -0,0 +1,131 @@
+package consensusmgr
+
+import (
+       "bytes"
+       "encoding/hex"
+       "errors"
+       "fmt"
+
+       "github.com/tendermint/go-wire"
+
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+const (
+       blockSignatureByte = byte(0x10)
+       blockProposeByte   = byte(0x11)
+)
+
+//ConsensusMessage is a generic message for consensus reactor.
+type ConsensusMessage interface {
+       String() string
+       BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string)
+       BroadcastFilterTargetPeers(ps *peers.PeerSet) []string
+}
+
+var _ = wire.RegisterInterface(
+       struct{ ConsensusMessage }{},
+       wire.ConcreteType{O: &BlockSignatureMsg{}, Byte: blockSignatureByte},
+       wire.ConcreteType{O: &BlockProposeMsg{}, Byte: blockProposeByte},
+)
+
+//decodeMessage decode msg
+func decodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
+       msgType = bz[0]
+       n := int(0)
+       r := bytes.NewReader(bz)
+       msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ ConsensusMessage }).ConsensusMessage
+       if err != nil && n != len(bz) {
+               err = errors.New("DecodeMessage() had bytes left over")
+       }
+       return
+}
+
+// BlockSignatureMsg block signature message transferred between nodes.
+type BlockSignatureMsg struct {
+       BlockHash [32]byte
+       Height    uint64
+       Signature []byte
+       PubKey    [32]byte
+}
+
+//NewBlockSignatureMsg create new block signature msg.
+func NewBlockSignatureMsg(blockHash bc.Hash, height uint64, signature []byte, pubKey [32]byte) ConsensusMessage {
+       hash := blockHash.Byte32()
+       return &BlockSignatureMsg{BlockHash: hash, Height: height, Signature: signature, PubKey: pubKey}
+}
+
+func (bs *BlockSignatureMsg) String() string {
+       return fmt.Sprintf("{block_hash: %s,block_height:%d,signature:%s,pubkey:%s}", hex.EncodeToString(bs.BlockHash[:]), bs.Height, hex.EncodeToString(bs.Signature), hex.EncodeToString(bs.PubKey[:]))
+}
+
+// BroadcastMarkSendRecord mark send message record to prevent messages from being sent repeatedly.
+func (bs *BlockSignatureMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) {
+       for _, peer := range peers {
+               ps.MarkBlockSignature(peer, bs.Signature)
+       }
+}
+
+// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages.
+func (bs *BlockSignatureMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string {
+       return ps.PeersWithoutSign(bs.Signature)
+}
+
+// BlockProposeMsg block propose message transferred between nodes.
+type BlockProposeMsg struct {
+       RawBlock []byte
+}
+
+//NewBlockProposeMsg create new block propose msg.
+func NewBlockProposeMsg(block *types.Block) (ConsensusMessage, error) {
+       rawBlock, err := block.MarshalText()
+       if err != nil {
+               return nil, err
+       }
+       return &BlockProposeMsg{RawBlock: rawBlock}, nil
+}
+
+//GetProposeBlock get propose block from msg.
+func (bp *BlockProposeMsg) GetProposeBlock() (*types.Block, error) {
+       block := &types.Block{}
+       if err := block.UnmarshalText(bp.RawBlock); err != nil {
+               return nil, err
+       }
+       return block, nil
+}
+
+func (bp *BlockProposeMsg) String() string {
+       block, err := bp.GetProposeBlock()
+       if err != nil {
+               return "{err: wrong message}"
+       }
+       blockHash := block.Hash()
+       return fmt.Sprintf("{block_height: %d, block_hash: %s}", block.Height, blockHash.String())
+}
+
+// BroadcastMarkSendRecord mark send message record to prevent messages from being sent repeatedly.
+func (bp *BlockProposeMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []string) {
+       block, err := bp.GetProposeBlock()
+       if err != nil {
+               return
+       }
+
+       hash := block.Hash()
+       height := block.Height
+       for _, peer := range peers {
+               ps.MarkBlock(peer, &hash)
+               ps.MarkStatus(peer, height)
+       }
+}
+
+// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages.
+func (bp *BlockProposeMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string {
+       block, err := bp.GetProposeBlock()
+       if err != nil {
+               return nil
+       }
+
+       return ps.PeersWithoutBlock(block.Hash())
+}
diff --git a/netsync/consensusmgr/consensus_msg_test.go b/netsync/consensusmgr/consensus_msg_test.go
new file mode 100644 (file)
index 0000000..ea4e004
--- /dev/null
@@ -0,0 +1,155 @@
+package consensusmgr
+
+import (
+       "reflect"
+       "testing"
+
+       "github.com/davecgh/go-spew/spew"
+       "github.com/tendermint/go-wire"
+
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+var _ = wire.RegisterInterface(
+       struct{ ConsensusMessage }{},
+       wire.ConcreteType{O: &BlockSignatureMsg{}, Byte: blockSignatureByte},
+       wire.ConcreteType{O: &BlockProposeMsg{}, Byte: blockProposeByte},
+)
+
+func TestDecodeMessage(t *testing.T) {
+       testCases := []struct {
+               msg     ConsensusMessage
+               msgType byte
+       }{
+               {
+                       msg: &BlockSignatureMsg{
+                               BlockHash: [32]byte{0x01},
+                               Signature: []byte{0x00},
+                               PubKey:    [32]byte{0x01},
+                       },
+                       msgType: blockSignatureByte,
+               },
+               {
+                       msg: &BlockProposeMsg{
+                               RawBlock: []byte{0x01, 0x02},
+                       },
+                       msgType: blockProposeByte,
+               },
+       }
+       for i, c := range testCases {
+               binMsg := wire.BinaryBytes(struct{ ConsensusMessage }{c.msg})
+               gotMsgType, gotMsg, err := decodeMessage(binMsg)
+               if err != nil {
+                       t.Fatalf("index:%d decode Message err %s", i, err)
+               }
+               if gotMsgType != c.msgType {
+                       t.Fatalf("index:%d decode Message type err. got:%d want:%d", i, gotMsgType, c.msg)
+               }
+               if !reflect.DeepEqual(gotMsg, c.msg) {
+                       t.Fatalf("index:%d decode Message err. got:%s\n want:%s", i, spew.Sdump(gotMsg), spew.Sdump(c.msg))
+               }
+       }
+}
+
+func TestBlockSignBroadcastMsg(t *testing.T) {
+       blockSignMsg := &BlockSignatureMsg{
+               BlockHash: [32]byte{0x01},
+               Height:    100,
+               Signature: []byte{0x00},
+               PubKey:    [32]byte{0x01},
+       }
+       signatureBroadcastMsg := NewBroadcastMsg(NewBlockSignatureMsg(bc.NewHash(blockSignMsg.BlockHash), blockSignMsg.Height, blockSignMsg.Signature, blockSignMsg.PubKey), consensusChannel)
+
+       binMsg := wire.BinaryBytes(signatureBroadcastMsg.GetMsg())
+       gotMsgType, gotMsg, err := decodeMessage(binMsg)
+       if err != nil {
+               t.Fatalf("decode Message err %s", err)
+       }
+       if gotMsgType != blockSignatureByte {
+               t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, blockSignatureByte)
+       }
+       if !reflect.DeepEqual(gotMsg, blockSignMsg) {
+               t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockSignMsg))
+       }
+}
+
+func TestBlockProposeBroadcastMsg(t *testing.T) {
+       blockProposeMsg, _ := NewBlockProposeMsg(testBlock)
+
+       proposeBroadcastMsg := NewBroadcastMsg(blockProposeMsg, consensusChannel)
+
+       binMsg := wire.BinaryBytes(proposeBroadcastMsg.GetMsg())
+       gotMsgType, gotMsg, err := decodeMessage(binMsg)
+       if err != nil {
+               t.Fatalf("decode Message err %s", err)
+       }
+       if gotMsgType != blockProposeByte {
+               t.Fatalf("decode Message type err. got:%d want:%d", gotMsgType, blockProposeByte)
+       }
+       if !reflect.DeepEqual(gotMsg, blockProposeMsg) {
+               t.Fatalf("decode Message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(blockProposeMsg))
+       }
+}
+
+var testBlock = &types.Block{
+       BlockHeader: types.BlockHeader{
+               Version:   1,
+               Height:    0,
+               Timestamp: 1528945000,
+               BlockCommitment: types.BlockCommitment{
+                       TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
+                       TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
+               },
+       },
+}
+
+func TestBlockProposeMsg(t *testing.T) {
+       blockMsg, err := NewBlockProposeMsg(testBlock)
+       if err != nil {
+               t.Fatalf("create new mine block msg err:%s", err)
+       }
+
+       gotBlock, err := blockMsg.(*BlockProposeMsg).GetProposeBlock()
+       if err != nil {
+               t.Fatalf("got block err:%s", err)
+       }
+
+       if !reflect.DeepEqual(gotBlock.BlockHeader, testBlock.BlockHeader) {
+               t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
+       }
+
+       wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+       if blockMsg.String() != wantString {
+               t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+       }
+
+       blockMsg.(*BlockProposeMsg).RawBlock[1] = blockMsg.(*BlockProposeMsg).RawBlock[1] + 0x1
+       _, err = blockMsg.(*BlockProposeMsg).GetProposeBlock()
+       if err == nil {
+               t.Fatalf("get mine block err")
+       }
+
+       wantString = "{err: wrong message}"
+       if blockMsg.String() != wantString {
+               t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+       }
+}
+
+func TestBlockSignatureMsg(t *testing.T) {
+       msg := &BlockSignatureMsg{
+               BlockHash: [32]byte{0x01},
+               Height:    100,
+               Signature: []byte{0x00},
+               PubKey:    [32]byte{0x01},
+       }
+       gotMsg := NewBlockSignatureMsg(bc.NewHash(msg.BlockHash), msg.Height, msg.Signature, msg.PubKey)
+
+       if !reflect.DeepEqual(gotMsg, msg) {
+               t.Fatalf("test block signature message err. got:%s\n want:%s", spew.Sdump(gotMsg), spew.Sdump(msg))
+       }
+       wantString := "{block_hash: 0100000000000000000000000000000000000000000000000000000000000000,block_height:100,signature:00,pubkey:0100000000000000000000000000000000000000000000000000000000000000}"
+       if gotMsg.String() != wantString {
+               t.Fatalf("test block signature message err. got string:%s\n want string:%s", gotMsg.String(), wantString)
+       }
+}
diff --git a/netsync/consensusmgr/handle.go b/netsync/consensusmgr/handle.go
new file mode 100644 (file)
index 0000000..f3823e0
--- /dev/null
@@ -0,0 +1,195 @@
+package consensusmgr
+
+import (
+       "reflect"
+
+       "github.com/sirupsen/logrus"
+
+       "github.com/vapor/event"
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+// Switch is the interface for p2p switch.
+type Switch interface {
+       AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+       AddBannedPeer(string) error
+       ID() [32]byte
+}
+
+// Chain is the interface for Bytom core.
+type Chain interface {
+       BestBlockHeight() uint64
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+       ProcessBlock(*types.Block) (bool, error)
+       ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error
+}
+
+type blockMsg struct {
+       block  *types.Block
+       peerID string
+}
+
+// Manager is the consensus message network synchronization manager.
+type Manager struct {
+       sw              Switch
+       chain           Chain
+       peers           *peers.PeerSet
+       blockFetcher    *blockFetcher
+       eventDispatcher *event.Dispatcher
+
+       quit chan struct{}
+}
+
+// NewManager create new manager.
+func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
+       manager := &Manager{
+               sw:              sw,
+               peers:           peers,
+               blockFetcher:    newBlockFetcher(chain, peers),
+               eventDispatcher: dispatcher,
+               quit:            make(chan struct{}),
+       }
+       protocolReactor := NewConsensusReactor(manager)
+       manager.sw.AddReactor("CONSENSUS", protocolReactor)
+       return manager
+}
+
+func (m *Manager) addPeer(peer peers.BasePeer) {
+       m.peers.AddPeer(peer)
+}
+
+func (m *Manager) processMsg(peerID string, msgType byte, msg ConsensusMessage) {
+       peer := m.peers.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+
+       logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "type": reflect.TypeOf(msg), "message": msg.String()}).Info("receive message from peer")
+
+       switch msg := msg.(type) {
+       case *BlockProposeMsg:
+               m.handleBlockProposeMsg(peerID, msg)
+
+       case *BlockSignatureMsg:
+               m.handleBlockSignatureMsg(peerID, msg)
+
+       default:
+               logrus.WithFields(logrus.Fields{"module": logModule, "peer": peerID, "message_type": reflect.TypeOf(msg)}).Error("unhandled message type")
+       }
+}
+
+func (m *Manager) handleBlockProposeMsg(peerID string, msg *BlockProposeMsg) {
+       block, err := msg.GetProposeBlock()
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Warning("failed on get propose block")
+               return
+       }
+
+       hash := block.Hash()
+       m.peers.MarkBlock(peerID, &hash)
+       m.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
+}
+
+func (m *Manager) handleBlockSignatureMsg(peerID string, msg *BlockSignatureMsg) {
+       blockHash := bc.NewHash(msg.BlockHash)
+       if err := m.chain.ProcessBlockSignature(msg.Signature, msg.PubKey[:], msg.Height, &blockHash); err != nil {
+               m.peers.AddBanScore(peerID, 20, 0, err.Error())
+               return
+       }
+}
+
+func (m *Manager) blockProposeMsgBroadcastLoop() {
+       blockProposeMsgSub, err := m.eventDispatcher.Subscribe(event.NewBlockProposeEvent{})
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe NewBlockProposeEvent")
+               return
+       }
+       defer blockProposeMsgSub.Unsubscribe()
+
+       for {
+               select {
+               case obj, ok := <-blockProposeMsgSub.Chan():
+                       if !ok {
+                               logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
+                               return
+                       }
+
+                       ev, ok := obj.Data.(event.NewBlockProposeEvent)
+                       if !ok {
+                               logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+                       proposeMsg, err := NewBlockProposeMsg(&ev.Block)
+                       if err != nil {
+                               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+                               return
+                       }
+
+                       if err := m.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
+                               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockProposeBroadcastMsg")
+                               continue
+                       }
+
+               case <-m.quit:
+                       return
+               }
+       }
+}
+
+func (m *Manager) blockSignatureMsgBroadcastLoop() {
+       blockSignatureMsgSub, err := m.eventDispatcher.Subscribe(event.BlockSignatureEvent{})
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on subscribe BlockSignatureEvent")
+               return
+       }
+       defer blockSignatureMsgSub.Unsubscribe()
+       for {
+               select {
+               case obj, ok := <-blockSignatureMsgSub.Chan():
+                       if !ok {
+                               logrus.WithFields(logrus.Fields{"module": logModule}).Warning("blockProposeMsgSub channel closed")
+                               return
+                       }
+
+                       ev, ok := obj.Data.(event.BlockSignatureEvent)
+                       if !ok {
+                               logrus.WithFields(logrus.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       blockHeader, err := m.chain.GetHeaderByHash(&ev.BlockHash)
+                       if err != nil {
+                               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on get header by hash from chain.")
+                               return
+                       }
+
+                       blockSignatureMsg := NewBroadcastMsg(NewBlockSignatureMsg(ev.BlockHash, blockHeader.Height, ev.Signature, m.sw.ID()), consensusChannel)
+                       if err := m.peers.BroadcastMsg(blockSignatureMsg); err != nil {
+                               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast BlockSignBroadcastMsg.")
+                               return
+                       }
+
+               case <-m.quit:
+                       return
+               }
+       }
+}
+
+func (m *Manager) removePeer(peerID string) {
+       m.peers.RemovePeer(peerID)
+}
+
+//Start consensus manager service.
+func (m *Manager) Start() error {
+       go m.blockProposeMsgBroadcastLoop()
+       go m.blockSignatureMsgBroadcastLoop()
+       return nil
+}
+
+//Stop consensus manager service.
+func (m *Manager) Stop() {
+       close(m.quit)
+}
diff --git a/netsync/consensusmgr/reactor.go b/netsync/consensusmgr/reactor.go
new file mode 100644 (file)
index 0000000..10405fb
--- /dev/null
@@ -0,0 +1,72 @@
+package consensusmgr
+
+import (
+       "github.com/sirupsen/logrus"
+
+       "github.com/vapor/p2p"
+       "github.com/vapor/p2p/connection"
+)
+
+const (
+       logModule                 = "consensus"
+       consensusChannel          = byte(0x50)
+       maxBlockchainResponseSize = 22020096 + 2
+)
+
+// ConsensusReactor handles new coming consensus message.
+type ConsensusReactor struct {
+       p2p.BaseReactor
+       manager *Manager
+}
+
+// NewConsensusReactor create consensus reactor.
+func NewConsensusReactor(manager *Manager) *ConsensusReactor {
+       cr := &ConsensusReactor{
+               manager: manager,
+       }
+       cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
+       return cr
+}
+
+// GetChannels implements Reactor
+func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{
+               {
+                       ID:                consensusChannel,
+                       Priority:          10,
+                       SendQueueCapacity: 100,
+               },
+       }
+}
+
+// OnStart implements BaseService
+func (cr *ConsensusReactor) OnStart() error {
+       return cr.BaseReactor.OnStart()
+}
+
+// OnStop implements BaseService
+func (cr *ConsensusReactor) OnStop() {
+       cr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
+       cr.manager.addPeer(peer)
+       return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+       cr.manager.removePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling messages.
+func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+       msgType, msg, err := decodeMessage(msgBytes)
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+               return
+       }
+
+       cr.manager.processMsg(src.ID(), msgType, msg)
+}
index 18d5291..61645bf 100644 (file)
@@ -3,15 +3,16 @@ package netsync
 import (
        "errors"
 
-       log "github.com/sirupsen/logrus"
+       "github.com/sirupsen/logrus"
 
-       cfg "github.com/vapor/config"
+       "github.com/vapor/config"
        "github.com/vapor/consensus"
        "github.com/vapor/event"
        "github.com/vapor/netsync/chainmgr"
+       "github.com/vapor/netsync/consensusmgr"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
-       core "github.com/vapor/protocol"
+       "github.com/vapor/protocol"
 )
 
 const (
@@ -22,12 +23,20 @@ var (
        errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
 )
 
+// ChainMgr is the interface for p2p chain message sync manager.
 type ChainMgr interface {
        Start() error
        IsCaughtUp() bool
        Stop()
 }
 
+// ConsensusMgr is the interface for consensus message sync manager.
+type ConsensusMgr interface {
+       Start() error
+       Stop()
+}
+
+// Switch is the interface for p2p switch.
 type Switch interface {
        Start() (bool, error)
        Stop() bool
@@ -38,50 +47,60 @@ type Switch interface {
 
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       config   *cfg.Config
-       sw       Switch
-       chainMgr ChainMgr
-       peers    *peers.PeerSet
+       config       *config.Config
+       sw           Switch
+       chainMgr     ChainMgr
+       consensusMgr ConsensusMgr
+       peers        *peers.PeerSet
 }
 
 // NewSyncManager create sync manager and set switch.
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
        sw, err := p2p.NewSwitch(config)
        if err != nil {
                return nil, err
        }
        peers := peers.NewPeerSet(sw)
 
-       chainManger, err := chainmgr.NewChainManager(config, sw, chain, txPool, dispatcher, peers)
+       chainManger, err := chainmgr.NewManager(config, sw, chain, txPool, dispatcher, peers)
        if err != nil {
                return nil, err
        }
-
+       consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers)
        return &SyncManager{
-               config:   config,
-               sw:       sw,
-               chainMgr: chainManger,
-               peers:    peers,
+               config:       config,
+               sw:           sw,
+               chainMgr:     chainManger,
+               consensusMgr: consensusMgr,
+               peers:        peers,
        }, nil
 }
 
+// Start message sync manager service.
 func (sm *SyncManager) Start() error {
        if _, err := sm.sw.Start(); err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed start switch")
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed start switch")
+               return err
+       }
+
+       if err := sm.chainMgr.Start(); err != nil {
                return err
        }
 
-       return sm.chainMgr.Start()
+       return sm.consensusMgr.Start()
 }
 
+// Stop message sync manager service.
 func (sm *SyncManager) Stop() {
        sm.chainMgr.Stop()
+       sm.consensusMgr.Stop()
        if !sm.config.VaultMode {
                sm.sw.Stop()
        }
 
 }
 
+// IsListening check if the bytomd service port is open?
 func (sm *SyncManager) IsListening() bool {
        if sm.config.VaultMode {
                return false
@@ -95,6 +114,7 @@ func (sm *SyncManager) IsCaughtUp() bool {
        return sm.chainMgr.IsCaughtUp()
 }
 
+// PeerCount count the number of connected peers.
 func (sm *SyncManager) PeerCount() int {
        if sm.config.VaultMode {
                return 0
@@ -102,10 +122,12 @@ func (sm *SyncManager) PeerCount() int {
        return len(sm.sw.Peers().List())
 }
 
+// GetNetwork get the type of network.
 func (sm *SyncManager) GetNetwork() string {
        return sm.config.ChainID
 }
 
+// BestPeer fine the peer with the highest height from the connected peers.
 func (sm *SyncManager) BestPeer() *peers.PeerInfo {
        bestPeer := sm.peers.BestPeer(consensus.SFFullNode)
        if bestPeer != nil {
@@ -114,6 +136,7 @@ func (sm *SyncManager) BestPeer() *peers.PeerInfo {
        return nil
 }
 
+// DialPeerWithAddress dial the peer and establish a connection.
 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
        if sm.config.VaultMode {
                return errVaultModeDialPeer
@@ -122,7 +145,7 @@ func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
        return sm.sw.DialPeerWithAddress(addr)
 }
 
-//GetPeerInfos return peer info of all peers
+//GetPeerInfos return peer info of all connected peers.
 func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo {
        return sm.peers.GetPeerInfos()
 }
index 8ae535a..dca8b4d 100644 (file)
@@ -302,7 +302,7 @@ func (c *Chain) processBlock(block *types.Block) (bool, error) {
        return false, nil
 }
 
-func (c *Chain) processBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error {
+func (c *Chain) ProcessBlockSignature(signature, pubkey []byte, blockHeight uint64, blockHash *bc.Hash) error {
        isBestIrreversible, err := c.bbft.ProcessBlockSignature(signature, pubkey, blockHeight, blockHash)
        if err != nil {
                return err