OSDN Git Service

mark tx before validation so it won't be sent to source again (#278)
[bytom/vapor.git] / netsync / chainmgr / handle.go
index 2a77d01..91f3949 100644 (file)
@@ -8,6 +8,7 @@ import (
 
        cfg "github.com/vapor/config"
        "github.com/vapor/consensus"
+       dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/event"
        msgs "github.com/vapor/netsync/messages"
        "github.com/vapor/netsync/peers"
@@ -25,7 +26,7 @@ const (
 // Chain is the interface for Bytom core
 type Chain interface {
        BestBlockHeader() *types.BlockHeader
-       BestIrreversibleHeader() *types.BlockHeader
+       LastIrreversibleHeader() *types.BlockHeader
        BestBlockHeight() uint64
        GetBlockByHash(*bc.Hash) (*types.Block, error)
        GetBlockByHeight(uint64) (*types.Block, error)
@@ -68,12 +69,12 @@ type Manager struct {
 }
 
 //NewChainManager create a chain sync manager.
-func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
        manager := &Manager{
                sw:              sw,
                mempool:         mempool,
                chain:           chain,
-               blockKeeper:     newBlockKeeper(chain, peers),
+               blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
                peers:           peers,
                txSyncCh:        make(chan *txSyncMsg),
                quit:            make(chan struct{}),
@@ -182,7 +183,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag
 }
 
 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
-       headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxHeadersPerMsg)
+       headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
        if err != nil || len(headers) == 0 {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
                return
@@ -252,10 +253,10 @@ func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMe
                return
        }
 
+       m.peers.MarkTx(peer.ID(), tx.ID)
        if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
                m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
        }
-       m.peers.MarkTx(peer.ID(), tx.ID)
 }
 
 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
@@ -271,11 +272,11 @@ func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.Transactions
        }
 
        for _, tx := range txs {
+               m.peers.MarkTx(peer.ID(), tx.ID)
                if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
                        m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
                        return
                }
-               m.peers.MarkTx(peer.ID(), tx.ID)
        }
 }
 
@@ -351,7 +352,7 @@ func (m *Manager) SendStatus(peer peers.BasePeer) error {
                return errors.New("invalid peer")
        }
 
-       if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.BestIrreversibleHeader()); err != nil {
+       if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
                m.peers.RemovePeer(p.ID())
                return err
        }