OSDN Git Service

Fix fast sync bug when the chain has fork (#282)
[bytom/vapor.git] / netsync / chainmgr / handle.go
index 9dea086..91f3949 100644 (file)
@@ -8,10 +8,12 @@ import (
 
        cfg "github.com/vapor/config"
        "github.com/vapor/consensus"
+       dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/event"
        msgs "github.com/vapor/netsync/messages"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
+       "github.com/vapor/p2p/security"
        core "github.com/vapor/protocol"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
@@ -24,6 +26,7 @@ const (
 // Chain is the interface for Bytom core
 type Chain interface {
        BestBlockHeader() *types.BlockHeader
+       LastIrreversibleHeader() *types.BlockHeader
        BestBlockHeight() uint64
        GetBlockByHash(*bc.Hash) (*types.Block, error)
        GetBlockByHeight(uint64) (*types.Block, error)
@@ -37,7 +40,6 @@ type Chain interface {
 
 type Switch interface {
        AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
-       AddBannedPeer(string) error
        Start() (bool, error)
        Stop() bool
        IsListening() bool
@@ -45,16 +47,21 @@ type Switch interface {
        Peers() *p2p.PeerSet
 }
 
+// Mempool is the interface for Bytom mempool
+type Mempool interface {
+       GetTransactions() []*core.TxDesc
+}
+
 //Manager is responsible for the business layer information synchronization
 type Manager struct {
        sw          Switch
        chain       Chain
-       txPool      *core.TxPool
+       mempool     Mempool
        blockKeeper *blockKeeper
        peers       *peers.PeerSet
 
        txSyncCh chan *txSyncMsg
-       quitSync chan struct{}
+       quit     chan struct{}
        config   *cfg.Config
 
        eventDispatcher *event.Dispatcher
@@ -62,15 +69,15 @@ type Manager struct {
 }
 
 //NewChainManager create a chain sync manager.
-func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
        manager := &Manager{
                sw:              sw,
-               txPool:          txPool,
+               mempool:         mempool,
                chain:           chain,
-               blockKeeper:     newBlockKeeper(chain, peers),
+               blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
                peers:           peers,
                txSyncCh:        make(chan *txSyncMsg),
-               quitSync:        make(chan struct{}),
+               quit:            make(chan struct{}),
                config:          config,
                eventDispatcher: dispatcher,
        }
@@ -156,7 +163,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag
                rawData, err := block.MarshalText()
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
-                       continue
+                       return
                }
 
                if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
@@ -176,7 +183,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag
 }
 
 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
-       headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+       headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
        if err != nil || len(headers) == 0 {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
                return
@@ -234,42 +241,42 @@ func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
 
 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
        if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
-               peer.SetStatus(msg.Height, msg.GetHash())
-               return
+               peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
+               peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
        }
 }
 
 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
        tx, err := msg.GetTransaction()
        if err != nil {
-               m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
                return
        }
 
+       m.peers.MarkTx(peer.ID(), tx.ID)
        if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
-               m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+               m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
        }
-       m.peers.MarkTx(peer.ID(), tx.ID)
 }
 
 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
        txs, err := msg.GetTransactions()
        if err != nil {
-               m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+               m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
                return
        }
 
        if len(txs) > msgs.TxsMsgMaxTxNum {
-               m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+               m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
                return
        }
 
        for _, tx := range txs {
+               m.peers.MarkTx(peer.ID(), tx.ID)
                if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
-                       m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+                       m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
                        return
                }
-               m.peers.MarkTx(peer.ID(), tx.ID)
        }
 }
 
@@ -284,7 +291,7 @@ func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.Blo
                "peer":    basePeer.Addr(),
                "type":    reflect.TypeOf(msg),
                "message": msg.String(),
-       }).Info("receive message from peer")
+       }).Debug("receive message from peer")
 
        switch msg := msg.(type) {
        case *msgs.GetBlockMessage:
@@ -345,7 +352,7 @@ func (m *Manager) SendStatus(peer peers.BasePeer) error {
                return errors.New("invalid peer")
        }
 
-       if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
+       if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
                m.peers.RemovePeer(p.ID())
                return err
        }
@@ -358,15 +365,15 @@ func (m *Manager) Start() error {
        if err != nil {
                return err
        }
-
-       // broadcast transactions
-       go m.txBroadcastLoop()
-       go m.txSyncLoop()
+       m.blockKeeper.start()
+       go m.broadcastTxsLoop()
+       go m.syncMempoolLoop()
 
        return nil
 }
 
 //Stop stop sync manager
 func (m *Manager) Stop() {
-       close(m.quitSync)
+       m.blockKeeper.stop()
+       close(m.quit)
 }