OSDN Git Service

Add consensus messages transfer (#90)
[bytom/vapor.git] / netsync / chainmgr / handle.go
index 8f7d679..9dea086 100644 (file)
@@ -45,8 +45,8 @@ type Switch interface {
        Peers() *p2p.PeerSet
 }
 
-//ChainManager is responsible for the business layer information synchronization
-type ChainManager struct {
+//Manager is responsible for the business layer information synchronization
+type Manager struct {
        sw          Switch
        chain       Chain
        txPool      *core.TxPool
@@ -62,8 +62,8 @@ type ChainManager struct {
 }
 
 //NewChainManager create a chain sync manager.
-func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
-       manager := &ChainManager{
+func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+       manager := &Manager{
                sw:              sw,
                txPool:          txPool,
                chain:           chain,
@@ -82,53 +82,53 @@ func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.Tx
        return manager, nil
 }
 
-func (cm *ChainManager) AddPeer(peer peers.BasePeer) {
-       cm.peers.AddPeer(peer)
+func (m *Manager) AddPeer(peer peers.BasePeer) {
+       m.peers.AddPeer(peer)
 }
 
 //IsCaughtUp check wheather the peer finish the sync
-func (cm *ChainManager) IsCaughtUp() bool {
-       peer := cm.peers.BestPeer(consensus.SFFullNode)
-       return peer == nil || peer.Height() <= cm.chain.BestBlockHeight()
+func (m *Manager) IsCaughtUp() bool {
+       peer := m.peers.BestPeer(consensus.SFFullNode)
+       return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
 }
 
-func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
+func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
        block, err := msg.GetBlock()
        if err != nil {
                return
        }
-       cm.blockKeeper.processBlock(peer.ID(), block)
+       m.blockKeeper.processBlock(peer.ID(), block)
 }
 
-func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
+func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
        blocks, err := msg.GetBlocks()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
                return
        }
 
-       cm.blockKeeper.processBlocks(peer.ID(), blocks)
+       m.blockKeeper.processBlocks(peer.ID(), blocks)
 }
 
-func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
+func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
        peer.AddFilterAddress(msg.Address)
 }
 
-func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) {
+func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
        peer.FilterClear()
 }
 
-func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
+func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
        peer.AddFilterAddresses(msg.Addresses)
 }
 
-func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
+func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
        var block *types.Block
        var err error
        if msg.Height != 0 {
-               block, err = cm.chain.GetBlockByHeight(msg.Height)
+               block, err = m.chain.GetBlockByHeight(msg.Height)
        } else {
-               block, err = cm.chain.GetBlockByHash(msg.GetHash())
+               block, err = m.chain.GetBlockByHash(msg.GetHash())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
@@ -137,15 +137,15 @@ func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMe
 
        ok, err := peer.SendBlock(block)
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
        }
 }
 
-func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
-       blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
+       blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(blocks) == 0 {
                return
        }
@@ -168,15 +168,15 @@ func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocks
 
        ok, err := peer.SendBlocks(sendBlocks)
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
        }
 }
 
-func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
-       headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
+       headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(headers) == 0 {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
                return
@@ -184,20 +184,20 @@ func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeade
 
        ok, err := peer.SendHeaders(headers)
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
        }
 }
 
-func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
+func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
        var err error
        var block *types.Block
        if msg.Height != 0 {
-               block, err = cm.chain.GetBlockByHeight(msg.Height)
+               block, err = m.chain.GetBlockByHeight(msg.Height)
        } else {
-               block, err = cm.chain.GetBlockByHash(msg.GetHash())
+               block, err = m.chain.GetBlockByHash(msg.GetHash())
        }
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
@@ -205,7 +205,7 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM
        }
 
        blockHash := block.Hash()
-       txStatus, err := cm.chain.GetTransactionStatus(&blockHash)
+       txStatus, err := m.chain.GetTransactionStatus(&blockHash)
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
                return
@@ -218,63 +218,63 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM
        }
 
        if !ok {
-               cm.peers.RemovePeer(peer.ID())
+               m.peers.RemovePeer(peer.ID())
        }
 }
 
-func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
+func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
                return
        }
 
-       cm.blockKeeper.processHeaders(peer.ID(), headers)
+       m.blockKeeper.processHeaders(peer.ID(), headers)
 }
 
-func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
-       if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil {
+func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
+       if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
                peer.SetStatus(msg.Height, msg.GetHash())
                return
        }
 }
 
-func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
+func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
        tx, err := msg.GetTransaction()
        if err != nil {
-               cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
                return
        }
 
-       if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
-               cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+       if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
+               m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
        }
-       cm.peers.MarkTx(peer.ID(), tx.ID)
+       m.peers.MarkTx(peer.ID(), tx.ID)
 }
 
-func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
+func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
        txs, err := msg.GetTransactions()
        if err != nil {
-               cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+               m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
                return
        }
 
        if len(txs) > msgs.TxsMsgMaxTxNum {
-               cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+               m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
                return
        }
 
        for _, tx := range txs {
-               if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan {
-                       cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+               if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
+                       m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
                        return
                }
-               cm.peers.MarkTx(peer.ID(), tx.ID)
+               m.peers.MarkTx(peer.ID(), tx.ID)
        }
 }
 
-func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
-       peer := cm.peers.GetPeer(basePeer.ID())
+func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
+       peer := m.peers.GetPeer(basePeer.ID())
        if peer == nil {
                return
        }
@@ -288,43 +288,43 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms
 
        switch msg := msg.(type) {
        case *msgs.GetBlockMessage:
-               cm.handleGetBlockMsg(peer, msg)
+               m.handleGetBlockMsg(peer, msg)
 
        case *msgs.BlockMessage:
-               cm.handleBlockMsg(peer, msg)
+               m.handleBlockMsg(peer, msg)
 
        case *msgs.StatusMessage:
-               cm.handleStatusMsg(basePeer, msg)
+               m.handleStatusMsg(basePeer, msg)
 
        case *msgs.TransactionMessage:
-               cm.handleTransactionMsg(peer, msg)
+               m.handleTransactionMsg(peer, msg)
 
        case *msgs.TransactionsMessage:
-               cm.handleTransactionsMsg(peer, msg)
+               m.handleTransactionsMsg(peer, msg)
 
        case *msgs.GetHeadersMessage:
-               cm.handleGetHeadersMsg(peer, msg)
+               m.handleGetHeadersMsg(peer, msg)
 
        case *msgs.HeadersMessage:
-               cm.handleHeadersMsg(peer, msg)
+               m.handleHeadersMsg(peer, msg)
 
        case *msgs.GetBlocksMessage:
-               cm.handleGetBlocksMsg(peer, msg)
+               m.handleGetBlocksMsg(peer, msg)
 
        case *msgs.BlocksMessage:
-               cm.handleBlocksMsg(peer, msg)
+               m.handleBlocksMsg(peer, msg)
 
        case *msgs.FilterLoadMessage:
-               cm.handleFilterLoadMsg(peer, msg)
+               m.handleFilterLoadMsg(peer, msg)
 
        case *msgs.FilterAddMessage:
-               cm.handleFilterAddMsg(peer, msg)
+               m.handleFilterAddMsg(peer, msg)
 
        case *msgs.FilterClearMessage:
-               cm.handleFilterClearMsg(peer)
+               m.handleFilterClearMsg(peer)
 
        case *msgs.GetMerkleBlockMessage:
-               cm.handleGetMerkleBlockMsg(peer, msg)
+               m.handleGetMerkleBlockMsg(peer, msg)
 
        default:
                log.WithFields(log.Fields{
@@ -335,38 +335,38 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms
        }
 }
 
-func (cm *ChainManager) RemovePeer(peerID string) {
-       cm.peers.RemovePeer(peerID)
+func (m *Manager) RemovePeer(peerID string) {
+       m.peers.RemovePeer(peerID)
 }
 
-func (cm *ChainManager) SendStatus(peer peers.BasePeer) error {
-       p := cm.peers.GetPeer(peer.ID())
+func (m *Manager) SendStatus(peer peers.BasePeer) error {
+       p := m.peers.GetPeer(peer.ID())
        if p == nil {
                return errors.New("invalid peer")
        }
 
-       if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil {
-               cm.peers.RemovePeer(p.ID())
+       if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
+               m.peers.RemovePeer(p.ID())
                return err
        }
        return nil
 }
 
-func (cm *ChainManager) Start() error {
+func (m *Manager) Start() error {
        var err error
-       cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
        if err != nil {
                return err
        }
 
        // broadcast transactions
-       go cm.txBroadcastLoop()
-       go cm.txSyncLoop()
+       go m.txBroadcastLoop()
+       go m.txSyncLoop()
 
        return nil
 }
 
 //Stop stop sync manager
-func (cm *ChainManager) Stop() {
-       close(cm.quitSync)
+func (m *Manager) Stop() {
+       close(m.quitSync)
 }