X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=netsync%2Fchainmgr%2Fhandle.go;h=91f39499a94d8d79eb79e957670e94f4e5ecb71f;hp=0d13847db5f6deb9ec2a508dee473374f577a261;hb=8adef3bf53dd78bcc2c550e810191dc2e50cc076;hpb=807d99726f6a0610fa9c835e2aabd983801d3510 diff --git a/netsync/chainmgr/handle.go b/netsync/chainmgr/handle.go index 0d13847d..91f39499 100644 --- a/netsync/chainmgr/handle.go +++ b/netsync/chainmgr/handle.go @@ -8,6 +8,7 @@ import ( cfg "github.com/vapor/config" "github.com/vapor/consensus" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/event" msgs "github.com/vapor/netsync/messages" "github.com/vapor/netsync/peers" @@ -25,6 +26,7 @@ const ( // Chain is the interface for Bytom core type Chain interface { BestBlockHeader() *types.BlockHeader + LastIrreversibleHeader() *types.BlockHeader BestBlockHeight() uint64 GetBlockByHash(*bc.Hash) (*types.Block, error) GetBlockByHeight(uint64) (*types.Block, error) @@ -67,12 +69,12 @@ type Manager struct { } //NewChainManager create a chain sync manager. -func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) { +func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) { manager := &Manager{ sw: sw, mempool: mempool, chain: chain, - blockKeeper: newBlockKeeper(chain, peers), + blockKeeper: newBlockKeeper(chain, peers, fastSyncDB), peers: peers, txSyncCh: make(chan *txSyncMsg), quit: make(chan struct{}), @@ -161,7 +163,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag rawData, err := block.MarshalText() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block") - continue + return } if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 { @@ -181,7 +183,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag } func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) { - headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash()) + headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg) if err != nil || len(headers) == 0 { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders") return @@ -239,8 +241,8 @@ func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) { func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) { if peer := m.peers.GetPeer(basePeer.ID()); peer != nil { - peer.SetStatus(msg.Height, msg.GetHash()) - return + peer.SetBestStatus(msg.BestHeight, msg.GetBestHash()) + peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash()) } } @@ -251,10 +253,10 @@ func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMe return } + m.peers.MarkTx(peer.ID(), tx.ID) if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan { m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction") } - m.peers.MarkTx(peer.ID(), tx.ID) } func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) { @@ -270,11 +272,11 @@ func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.Transactions } for _, tx := range txs { + m.peers.MarkTx(peer.ID(), tx.ID) if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan { m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction") return } - m.peers.MarkTx(peer.ID(), tx.ID) } } @@ -289,7 +291,7 @@ func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.Blo "peer": basePeer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String(), - }).Info("receive message from peer") + }).Debug("receive message from peer") switch msg := msg.(type) { case *msgs.GetBlockMessage: @@ -350,7 +352,7 @@ func (m *Manager) SendStatus(peer peers.BasePeer) error { return errors.New("invalid peer") } - if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil { + if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil { m.peers.RemovePeer(p.ID()) return err } @@ -363,7 +365,7 @@ func (m *Manager) Start() error { if err != nil { return err } - + m.blockKeeper.start() go m.broadcastTxsLoop() go m.syncMempoolLoop() @@ -372,5 +374,6 @@ func (m *Manager) Start() error { //Stop stop sync manager func (m *Manager) Stop() { + m.blockKeeper.stop() close(m.quit) }