From 4607cea3c6d3b1501ae1edb2c078b9ae0ee4bc79 Mon Sep 17 00:00:00 2001 From: yahtoo Date: Tue, 25 Jun 2019 18:30:38 +0800 Subject: [PATCH] add fast sync func (#204) * Modify fast sync function * Fix GetHeadersMessage error * Fix skeleton init error * Fix fetchHeaders error * Add fetch date finish check * Fix FetchBodiesTask error * Fix fetchBodies bug * Add multi-peer fetch support * Fix requireHeaders bug * add lit fast sync func * Fix fetchBlocks bug * opz code format * del test file * opz code * Add test case * change to singe peer fast sync * Fix bug * Add peer exception handle * del unused code * Add test case * Fix review bug * Fix locate headers error * Fix test file error * Add irreversible block fast sync * Fix test err * Fix review bug --- netsync/chainmgr/block_keeper.go | 381 ++++++++------------------------ netsync/chainmgr/block_keeper_test.go | 401 +--------------------------------- netsync/chainmgr/fast_sync.go | 192 ++++++++++++++++ netsync/chainmgr/fast_sync_test.go | 275 +++++++++++++++++++++++ netsync/chainmgr/handle.go | 14 +- netsync/chainmgr/msg_fetcher.go | 128 +++++++++++ netsync/chainmgr/protocol_reactor.go | 1 + netsync/chainmgr/tool_test.go | 5 +- netsync/messages/chain_msg.go | 36 ++- netsync/messages/chain_msg_test.go | 25 ++- netsync/peers/peer.go | 85 ++++--- protocol/protocol.go | 8 +- test/mock/chain.go | 4 + 13 files changed, 821 insertions(+), 734 deletions(-) create mode 100644 netsync/chainmgr/fast_sync.go create mode 100644 netsync/chainmgr/fast_sync_test.go create mode 100644 netsync/chainmgr/msg_fetcher.go diff --git a/netsync/chainmgr/block_keeper.go b/netsync/chainmgr/block_keeper.go index 112fd50e..d1407e95 100644 --- a/netsync/chainmgr/block_keeper.go +++ b/netsync/chainmgr/block_keeper.go @@ -1,7 +1,6 @@ package chainmgr import ( - "container/list" "time" log "github.com/sirupsen/logrus" @@ -15,22 +14,34 @@ import ( ) const ( - syncCycle = 5 * time.Second - blockProcessChSize = 1024 - blocksProcessChSize = 128 - headersProcessChSize = 1024 + syncCycle = 5 * time.Second + + noNeedSync = iota + fastSyncType + regularSyncType ) var ( - maxBlockPerMsg = uint64(128) - maxBlockHeadersPerMsg = uint64(2048) - syncTimeout = 30 * time.Second + syncTimeout = 30 * time.Second - errAppendHeaders = errors.New("fail to append list due to order dismatch") errRequestTimeout = errors.New("request timeout") errPeerDropped = errors.New("Peer dropped") ) +type FastSync interface { + locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) + locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) + process() error + setSyncPeer(peer *peers.Peer) +} + +type Fetcher interface { + processBlock(peerID string, block *types.Block) + processBlocks(peerID string, blocks []*types.Block) + processHeaders(peerID string, headers []*types.BlockHeader) + requireBlock(peerID string, height uint64) (*types.Block, error) +} + type blockMsg struct { block *types.Block peerID string @@ -47,218 +58,60 @@ type headersMsg struct { } type blockKeeper struct { - chain Chain - peers *peers.PeerSet - - syncPeer *peers.Peer - blockProcessCh chan *blockMsg - blocksProcessCh chan *blocksMsg - headersProcessCh chan *headersMsg + chain Chain + fastSync FastSync + msgFetcher Fetcher + peers *peers.PeerSet + syncPeer *peers.Peer - headerList *list.List + quit chan struct{} } func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper { - bk := &blockKeeper{ - chain: chain, - peers: peers, - blockProcessCh: make(chan *blockMsg, blockProcessChSize), - blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize), - headersProcessCh: make(chan *headersMsg, headersProcessChSize), - headerList: list.New(), - } - bk.resetHeaderState() - go bk.syncWorker() - return bk -} - -func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error { - for _, header := range headers { - prevHeader := bk.headerList.Back().Value.(*types.BlockHeader) - if prevHeader.Hash() != header.PreviousBlockHash { - return errAppendHeaders - } - bk.headerList.PushBack(header) - } - return nil -} - -func (bk *blockKeeper) blockLocator() []*bc.Hash { - header := bk.chain.BestBlockHeader() - locator := []*bc.Hash{} - - step := uint64(1) - for header != nil { - headerHash := header.Hash() - locator = append(locator, &headerHash) - if header.Height == 0 { - break - } - - var err error - if header.Height < step { - header, err = bk.chain.GetHeaderByHeight(0) - } else { - header, err = bk.chain.GetHeaderByHeight(header.Height - step) - } - if err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator") - break - } - - if len(locator) >= 9 { - step *= 2 - } - } - return locator -} - -func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error { - bk.resetHeaderState() - lastHeader := bk.headerList.Back().Value.(*types.BlockHeader) - for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) { - if lastHeader.Height >= checkPoint.Height { - return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch") - } - - lastHash := lastHeader.Hash() - headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash) - if err != nil { - return err - } - - if len(headers) == 0 { - return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list") - } - - if err := bk.appendHeaderList(headers); err != nil { - return err - } + msgFetcher := newMsgFetcher(peers) + return &blockKeeper{ + chain: chain, + fastSync: newFastSync(chain, msgFetcher, peers), + msgFetcher: msgFetcher, + peers: peers, + quit: make(chan struct{}), } - - fastHeader := bk.headerList.Front() - for bk.chain.BestBlockHeight() < checkPoint.Height { - locator := bk.blockLocator() - blocks, err := bk.requireBlocks(locator, &checkPoint.Hash) - if err != nil { - return err - } - - if len(blocks) == 0 { - return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list") - } - - for _, block := range blocks { - if fastHeader = fastHeader.Next(); fastHeader == nil { - return errors.New("get block than is higher than checkpoint") - } - - if _, err = bk.chain.ProcessBlock(block); err != nil { - return errors.Wrap(err, "fail on fastBlockSync process block") - } - } - } - return nil } func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) { - headers, err := bk.locateHeaders(locator, stopHash) - if err != nil { - return nil, err - } - - blocks := []*types.Block{} - for i, header := range headers { - if uint64(i) >= maxBlockPerMsg { - break - } - - headerHash := header.Hash() - block, err := bk.chain.GetBlockByHash(&headerHash) - if err != nil { - return nil, err - } - - blocks = append(blocks, block) - } - return blocks, nil + return bk.fastSync.locateBlocks(locator, stopHash) } -func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) { - stopHeader, err := bk.chain.GetHeaderByHash(stopHash) - if err != nil { - return nil, err - } - - startHeader, err := bk.chain.GetHeaderByHeight(0) - if err != nil { - return nil, err - } - - for _, hash := range locator { - header, err := bk.chain.GetHeaderByHash(hash) - if err == nil && bk.chain.InMainChain(header.Hash()) { - startHeader = header - break - } - } - - totalHeaders := stopHeader.Height - startHeader.Height - if totalHeaders > maxBlockHeadersPerMsg { - totalHeaders = maxBlockHeadersPerMsg - } - - headers := []*types.BlockHeader{} - for i := uint64(1); i <= totalHeaders; i++ { - header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i) - if err != nil { - return nil, err - } - - headers = append(headers, header) - } - return headers, nil -} - -func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint { - height := bk.chain.BestBlockHeader().Height - checkpoints := consensus.ActiveNetParams.Checkpoints - if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height { - return nil - } - - nextCheckpoint := &checkpoints[len(checkpoints)-1] - for i := len(checkpoints) - 2; i >= 0; i-- { - if height >= checkpoints[i].Height { - break - } - nextCheckpoint = &checkpoints[i] - } - return nextCheckpoint +func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) { + return bk.fastSync.locateHeaders(locator, stopHash, skip, maxNum) } func (bk *blockKeeper) processBlock(peerID string, block *types.Block) { - bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID} + bk.msgFetcher.processBlock(peerID, block) } func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) { - bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID} + bk.msgFetcher.processBlocks(peerID, blocks) } func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) { - bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID} + bk.msgFetcher.processHeaders(peerID, headers) } -func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error { - i := bk.chain.BestBlockHeight() + 1 - for i <= wantHeight { - block, err := bk.requireBlock(i) +func (bk *blockKeeper) regularBlockSync() error { + peerHeight := bk.syncPeer.Height() + bestHeight := bk.chain.BestBlockHeight() + i := bestHeight + 1 + for i <= peerHeight { + block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i) if err != nil { + bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelConnException, err) return err } isOrphan, err := bk.chain.ProcessBlock(block) if err != nil { + bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelMsgIllegal, err) return err } @@ -268,115 +121,62 @@ func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error { } i = bk.chain.BestBlockHeight() + 1 } + log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success") return nil } -func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) { - if ok := bk.syncPeer.GetBlockByHeight(height); !ok { - return nil, errPeerDropped - } - - timeout := time.NewTimer(syncTimeout) - defer timeout.Stop() - - for { - select { - case msg := <-bk.blockProcessCh: - if msg.peerID != bk.syncPeer.ID() { - continue - } - if msg.block.Height != height { - continue - } - return msg.block, nil - case <-timeout.C: - return nil, errors.Wrap(errRequestTimeout, "requireBlock") - } - } +func (bk *blockKeeper) start() { + go bk.syncWorker() } -func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) { - if ok := bk.syncPeer.GetBlocks(locator, stopHash); !ok { - return nil, errPeerDropped +func (bk *blockKeeper) checkSyncType() int { + peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync) + if peer == nil { + log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer") + return noNeedSync } - timeout := time.NewTimer(syncTimeout) - defer timeout.Stop() + bestHeight := bk.chain.BestBlockHeight() - for { - select { - case msg := <-bk.blocksProcessCh: - if msg.peerID != bk.syncPeer.ID() { - continue - } - return msg.blocks, nil - case <-timeout.C: - return nil, errors.Wrap(errRequestTimeout, "requireBlocks") - } + if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync { + bk.fastSync.setSyncPeer(peer) + return fastSyncType } -} -func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) { - if ok := bk.syncPeer.GetHeaders(locator, stopHash); !ok { - return nil, errPeerDropped + peer = bk.peers.BestPeer(consensus.SFFullNode) + if peer == nil { + log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer") + return noNeedSync } - timeout := time.NewTimer(syncTimeout) - defer timeout.Stop() - - for { - select { - case msg := <-bk.headersProcessCh: - if msg.peerID != bk.syncPeer.ID() { - continue - } - return msg.headers, nil - case <-timeout.C: - return nil, errors.Wrap(errRequestTimeout, "requireHeaders") - } + peerHeight := peer.Height() + if peerHeight > bestHeight { + bk.syncPeer = peer + return regularSyncType } -} -// resetHeaderState sets the headers-first mode state to values appropriate for -// syncing from a new peer. -func (bk *blockKeeper) resetHeaderState() { - header := bk.chain.BestBlockHeader() - bk.headerList.Init() - if bk.nextCheckpoint() != nil { - bk.headerList.PushBack(header) - } + return noNeedSync } func (bk *blockKeeper) startSync() bool { - checkPoint := bk.nextCheckpoint() - peer := bk.peers.BestPeer(consensus.SFFastSync | consensus.SFFullNode) - if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height { - bk.syncPeer = peer - if err := bk.fastBlockSync(checkPoint); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync") - bk.peers.ErrorHandler(peer.ID(), security.LevelMsgIllegal, err) + switch bk.checkSyncType() { + case fastSyncType: + if err := bk.fastSync.process(); err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync") return false } - return true - } - - blockHeight := bk.chain.BestBlockHeight() - peer = bk.peers.BestPeer(consensus.SFFullNode) - if peer != nil && peer.Height() > blockHeight { - bk.syncPeer = peer - targetHeight := blockHeight + maxBlockPerMsg - if targetHeight > peer.Height() { - targetHeight = peer.Height() - } - - if err := bk.regularBlockSync(targetHeight); err != nil { + case regularSyncType: + if err := bk.regularBlockSync(); err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync") - bk.peers.ErrorHandler(peer.ID(),security.LevelMsgIllegal, err) return false } - return true } - return false + + return true +} + +func (bk *blockKeeper) stop() { + close(bk.quit) } func (bk *blockKeeper) syncWorker() { @@ -384,18 +184,17 @@ func (bk *blockKeeper) syncWorker() { defer syncTicker.Stop() for { - <-syncTicker.C - if update := bk.startSync(); !update { - continue - } - - block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight()) - if err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block") - } + select { + case <-syncTicker.C: + if update := bk.startSync(); !update { + continue + } - if err = bk.peers.BroadcastNewStatus(block); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status") + if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.BestIrreversibleHeader()); err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status") + } + case <-bk.quit: + return } } } diff --git a/netsync/chainmgr/block_keeper_test.go b/netsync/chainmgr/block_keeper_test.go index 43e6ec7a..d855ed06 100644 --- a/netsync/chainmgr/block_keeper_test.go +++ b/netsync/chainmgr/block_keeper_test.go @@ -1,7 +1,6 @@ package chainmgr import ( - "container/list" "encoding/json" "testing" "time" @@ -11,395 +10,19 @@ import ( msgs "github.com/vapor/netsync/messages" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" - "github.com/vapor/test/mock" "github.com/vapor/testutil" ) -func TestAppendHeaderList(t *testing.T) { - blocks := mockBlocks(nil, 7) - cases := []struct { - originalHeaders []*types.BlockHeader - inputHeaders []*types.BlockHeader - wantHeaders []*types.BlockHeader - err error - }{ - { - originalHeaders: []*types.BlockHeader{&blocks[0].BlockHeader}, - inputHeaders: []*types.BlockHeader{&blocks[1].BlockHeader, &blocks[2].BlockHeader}, - wantHeaders: []*types.BlockHeader{&blocks[0].BlockHeader, &blocks[1].BlockHeader, &blocks[2].BlockHeader}, - err: nil, - }, - { - originalHeaders: []*types.BlockHeader{&blocks[5].BlockHeader}, - inputHeaders: []*types.BlockHeader{&blocks[6].BlockHeader}, - wantHeaders: []*types.BlockHeader{&blocks[5].BlockHeader, &blocks[6].BlockHeader}, - err: nil, - }, - { - originalHeaders: []*types.BlockHeader{&blocks[5].BlockHeader}, - inputHeaders: []*types.BlockHeader{&blocks[7].BlockHeader}, - wantHeaders: []*types.BlockHeader{&blocks[5].BlockHeader}, - err: errAppendHeaders, - }, - { - originalHeaders: []*types.BlockHeader{&blocks[5].BlockHeader}, - inputHeaders: []*types.BlockHeader{&blocks[7].BlockHeader, &blocks[6].BlockHeader}, - wantHeaders: []*types.BlockHeader{&blocks[5].BlockHeader}, - err: errAppendHeaders, - }, - { - originalHeaders: []*types.BlockHeader{&blocks[2].BlockHeader}, - inputHeaders: []*types.BlockHeader{&blocks[3].BlockHeader, &blocks[4].BlockHeader, &blocks[6].BlockHeader}, - wantHeaders: []*types.BlockHeader{&blocks[2].BlockHeader, &blocks[3].BlockHeader, &blocks[4].BlockHeader}, - err: errAppendHeaders, - }, - } - - for i, c := range cases { - bk := &blockKeeper{headerList: list.New()} - for _, header := range c.originalHeaders { - bk.headerList.PushBack(header) - } - - if err := bk.appendHeaderList(c.inputHeaders); err != c.err { - t.Errorf("case %d: got error %v want error %v", i, err, c.err) - } - - gotHeaders := []*types.BlockHeader{} - for e := bk.headerList.Front(); e != nil; e = e.Next() { - gotHeaders = append(gotHeaders, e.Value.(*types.BlockHeader)) - } - - if !testutil.DeepEqual(gotHeaders, c.wantHeaders) { - t.Errorf("case %d: got %v want %v", i, gotHeaders, c.wantHeaders) - } - } -} - -func TestBlockLocator(t *testing.T) { - blocks := mockBlocks(nil, 500) - cases := []struct { - bestHeight uint64 - wantHeight []uint64 - }{ - { - bestHeight: 0, - wantHeight: []uint64{0}, - }, - { - bestHeight: 1, - wantHeight: []uint64{1, 0}, - }, - { - bestHeight: 7, - wantHeight: []uint64{7, 6, 5, 4, 3, 2, 1, 0}, - }, - { - bestHeight: 10, - wantHeight: []uint64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}, - }, - { - bestHeight: 100, - wantHeight: []uint64{100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 89, 85, 77, 61, 29, 0}, - }, - { - bestHeight: 500, - wantHeight: []uint64{500, 499, 498, 497, 496, 495, 494, 493, 492, 491, 489, 485, 477, 461, 429, 365, 237, 0}, - }, - } - - for i, c := range cases { - mockChain := mock.NewChain(nil) - bk := &blockKeeper{chain: mockChain} - mockChain.SetBestBlockHeader(&blocks[c.bestHeight].BlockHeader) - for i := uint64(0); i <= c.bestHeight; i++ { - mockChain.SetBlockByHeight(i, blocks[i]) - } - - want := []*bc.Hash{} - for _, i := range c.wantHeight { - hash := blocks[i].Hash() - want = append(want, &hash) - } - - if got := bk.blockLocator(); !testutil.DeepEqual(got, want) { - t.Errorf("case %d: got %v want %v", i, got, want) - } - } -} - -func TestFastBlockSync(t *testing.T) { - maxBlockPerMsg = 5 - maxBlockHeadersPerMsg = 10 - baseChain := mockBlocks(nil, 300) - - cases := []struct { - syncTimeout time.Duration - aBlocks []*types.Block - bBlocks []*types.Block - checkPoint *consensus.Checkpoint - want []*types.Block - err error - }{ - { - syncTimeout: 30 * time.Second, - aBlocks: baseChain[:100], - bBlocks: baseChain[:301], - checkPoint: &consensus.Checkpoint{ - Height: baseChain[250].Height, - Hash: baseChain[250].Hash(), - }, - want: baseChain[:251], - err: nil, - }, - { - syncTimeout: 30 * time.Second, - aBlocks: baseChain[:100], - bBlocks: baseChain[:301], - checkPoint: &consensus.Checkpoint{ - Height: baseChain[100].Height, - Hash: baseChain[100].Hash(), - }, - want: baseChain[:101], - err: nil, - }, - { - syncTimeout: 1 * time.Millisecond, - aBlocks: baseChain[:100], - bBlocks: baseChain[:100], - checkPoint: &consensus.Checkpoint{ - Height: baseChain[200].Height, - Hash: baseChain[200].Hash(), - }, - want: baseChain[:100], - err: errRequestTimeout, - }, - } - - for i, c := range cases { - syncTimeout = c.syncTimeout - a := mockSync(c.aBlocks, nil) - b := mockSync(c.bBlocks, nil) - netWork := NewNetWork() - netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode) - netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode) - if B2A, A2B, err := netWork.HandsShake(a, b); err != nil { - t.Errorf("fail on peer hands shake %v", err) - } else { - go B2A.postMan() - go A2B.postMan() - } - - a.blockKeeper.syncPeer = a.peers.GetPeer("test node B") - if err := a.blockKeeper.fastBlockSync(c.checkPoint); errors.Root(err) != c.err { - t.Errorf("case %d: got %v want %v", i, err, c.err) - } - - got := []*types.Block{} - for i := uint64(0); i <= a.chain.BestBlockHeight(); i++ { - block, err := a.chain.GetBlockByHeight(i) - if err != nil { - t.Errorf("case %d got err %v", i, err) - } - got = append(got, block) - } - - if !testutil.DeepEqual(got, c.want) { - t.Errorf("case %d: got %v want %v", i, got, c.want) - } - } -} - -func TestLocateBlocks(t *testing.T) { - maxBlockPerMsg = 5 - blocks := mockBlocks(nil, 100) - cases := []struct { - locator []uint64 - stopHash bc.Hash - wantHeight []uint64 - }{ - { - locator: []uint64{20}, - stopHash: blocks[100].Hash(), - wantHeight: []uint64{21, 22, 23, 24, 25}, - }, - } - - mockChain := mock.NewChain(nil) - bk := &blockKeeper{chain: mockChain} - for _, block := range blocks { - mockChain.SetBlockByHeight(block.Height, block) - } - - for i, c := range cases { - locator := []*bc.Hash{} - for _, i := range c.locator { - hash := blocks[i].Hash() - locator = append(locator, &hash) - } - - want := []*types.Block{} - for _, i := range c.wantHeight { - want = append(want, blocks[i]) - } - - got, _ := bk.locateBlocks(locator, &c.stopHash) - if !testutil.DeepEqual(got, want) { - t.Errorf("case %d: got %v want %v", i, got, want) - } - } -} - -func TestLocateHeaders(t *testing.T) { - maxBlockHeadersPerMsg = 10 - blocks := mockBlocks(nil, 150) - cases := []struct { - chainHeight uint64 - locator []uint64 - stopHash bc.Hash - wantHeight []uint64 - err bool - }{ - { - chainHeight: 100, - locator: []uint64{}, - stopHash: blocks[100].Hash(), - wantHeight: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - err: false, - }, - { - chainHeight: 100, - locator: []uint64{20}, - stopHash: blocks[100].Hash(), - wantHeight: []uint64{21, 22, 23, 24, 25, 26, 27, 28, 29, 30}, - err: false, - }, - { - chainHeight: 100, - locator: []uint64{20}, - stopHash: blocks[24].Hash(), - wantHeight: []uint64{21, 22, 23, 24}, - err: false, - }, - { - chainHeight: 100, - locator: []uint64{20}, - stopHash: blocks[20].Hash(), - wantHeight: []uint64{}, - err: false, - }, - { - chainHeight: 100, - locator: []uint64{20}, - stopHash: bc.Hash{}, - wantHeight: []uint64{}, - err: true, - }, - { - chainHeight: 100, - locator: []uint64{120, 70}, - stopHash: blocks[78].Hash(), - wantHeight: []uint64{71, 72, 73, 74, 75, 76, 77, 78}, - err: false, - }, - } - - for i, c := range cases { - mockChain := mock.NewChain(nil) - bk := &blockKeeper{chain: mockChain} - for i := uint64(0); i <= c.chainHeight; i++ { - mockChain.SetBlockByHeight(i, blocks[i]) - } - - locator := []*bc.Hash{} - for _, i := range c.locator { - hash := blocks[i].Hash() - locator = append(locator, &hash) - } - - want := []*types.BlockHeader{} - for _, i := range c.wantHeight { - want = append(want, &blocks[i].BlockHeader) - } - - got, err := bk.locateHeaders(locator, &c.stopHash) - if err != nil != c.err { - t.Errorf("case %d: got %v want err = %v", i, err, c.err) - } - if !testutil.DeepEqual(got, want) { - t.Errorf("case %d: got %v want %v", i, got, want) - } - } -} - -func TestNextCheckpoint(t *testing.T) { - cases := []struct { - checkPoints []consensus.Checkpoint - bestHeight uint64 - want *consensus.Checkpoint - }{ - { - checkPoints: []consensus.Checkpoint{}, - bestHeight: 5000, - want: nil, - }, - { - checkPoints: []consensus.Checkpoint{ - {Height: 10000, Hash: bc.Hash{V0: 1}}, - }, - bestHeight: 5000, - want: &consensus.Checkpoint{Height: 10000, Hash: bc.Hash{V0: 1}}, - }, - { - checkPoints: []consensus.Checkpoint{ - {Height: 10000, Hash: bc.Hash{V0: 1}}, - {Height: 20000, Hash: bc.Hash{V0: 2}}, - {Height: 30000, Hash: bc.Hash{V0: 3}}, - }, - bestHeight: 15000, - want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}}, - }, - { - checkPoints: []consensus.Checkpoint{ - {Height: 10000, Hash: bc.Hash{V0: 1}}, - {Height: 20000, Hash: bc.Hash{V0: 2}}, - {Height: 30000, Hash: bc.Hash{V0: 3}}, - }, - bestHeight: 10000, - want: &consensus.Checkpoint{Height: 20000, Hash: bc.Hash{V0: 2}}, - }, - { - checkPoints: []consensus.Checkpoint{ - {Height: 10000, Hash: bc.Hash{V0: 1}}, - {Height: 20000, Hash: bc.Hash{V0: 2}}, - {Height: 30000, Hash: bc.Hash{V0: 3}}, - }, - bestHeight: 35000, - want: nil, - }, - } - - mockChain := mock.NewChain(nil) - for i, c := range cases { - consensus.ActiveNetParams.Checkpoints = c.checkPoints - mockChain.SetBestBlockHeader(&types.BlockHeader{Height: c.bestHeight}) - bk := &blockKeeper{chain: mockChain} - - if got := bk.nextCheckpoint(); !testutil.DeepEqual(got, c.want) { - t.Errorf("case %d: got %v want %v", i, got, c.want) - } - } -} - func TestRegularBlockSync(t *testing.T) { baseChain := mockBlocks(nil, 50) chainX := append(baseChain, mockBlocks(baseChain[50], 60)...) chainY := append(baseChain, mockBlocks(baseChain[50], 70)...) + chainZ := append(baseChain, mockBlocks(baseChain[50], 200)...) + cases := []struct { syncTimeout time.Duration aBlocks []*types.Block bBlocks []*types.Block - syncHeight uint64 want []*types.Block err error }{ @@ -407,15 +30,13 @@ func TestRegularBlockSync(t *testing.T) { syncTimeout: 30 * time.Second, aBlocks: baseChain[:20], bBlocks: baseChain[:50], - syncHeight: 45, - want: baseChain[:46], + want: baseChain[:50], err: nil, }, { syncTimeout: 30 * time.Second, aBlocks: chainX, bBlocks: chainY, - syncHeight: 70, want: chainY, err: nil, }, @@ -423,17 +44,15 @@ func TestRegularBlockSync(t *testing.T) { syncTimeout: 30 * time.Second, aBlocks: chainX[:52], bBlocks: chainY[:53], - syncHeight: 52, want: chainY[:53], err: nil, }, { - syncTimeout: 1 * time.Millisecond, - aBlocks: baseChain, - bBlocks: baseChain, - syncHeight: 52, - want: baseChain, - err: errRequestTimeout, + syncTimeout: 30 * time.Second, + aBlocks: chainX[:52], + bBlocks: chainZ, + want: chainZ[:201], + err: nil, }, } @@ -452,7 +71,7 @@ func TestRegularBlockSync(t *testing.T) { } a.blockKeeper.syncPeer = a.peers.GetPeer("test node B") - if err := a.blockKeeper.regularBlockSync(c.syncHeight); errors.Root(err) != c.err { + if err := a.blockKeeper.regularBlockSync(); errors.Root(err) != c.err { t.Errorf("case %d: got %v want %v", i, err, c.err) } @@ -512,7 +131,7 @@ func TestRequireBlock(t *testing.T) { for i, c := range cases { syncTimeout = c.syncTimeout - got, err := c.testNode.blockKeeper.requireBlock(c.requireHeight) + got, err := c.testNode.blockKeeper.msgFetcher.requireBlock(c.testNode.blockKeeper.syncPeer.ID(), c.requireHeight) if !testutil.DeepEqual(got, c.want) { t.Errorf("case %d: got %v want %v", i, got, c.want) } diff --git a/netsync/chainmgr/fast_sync.go b/netsync/chainmgr/fast_sync.go new file mode 100644 index 00000000..6a522235 --- /dev/null +++ b/netsync/chainmgr/fast_sync.go @@ -0,0 +1,192 @@ +package chainmgr + +import ( + log "github.com/sirupsen/logrus" + + "github.com/vapor/errors" + "github.com/vapor/netsync/peers" + "github.com/vapor/p2p/security" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +var ( + maxBlocksPerMsg = uint64(1000) + maxHeadersPerMsg = uint64(1000) + fastSyncPivotGap = uint64(64) + minGapStartFastSync = uint64(128) + maxFastSyncBlocksNum = uint64(10000) + + errOrphanBlock = errors.New("fast sync block is orphan") +) + +type MsgFetcher interface { + requireBlock(peerID string, height uint64) (*types.Block, error) + requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) +} + +type fastSync struct { + chain Chain + msgFetcher MsgFetcher + peers *peers.PeerSet + syncPeer *peers.Peer + stopHeader *types.BlockHeader + length uint64 + + quite chan struct{} +} + +func newFastSync(chain Chain, msgFether MsgFetcher, peers *peers.PeerSet) *fastSync { + return &fastSync{ + chain: chain, + msgFetcher: msgFether, + peers: peers, + quite: make(chan struct{}), + } +} + +func (fs *fastSync) blockLocator() []*bc.Hash { + header := fs.chain.BestBlockHeader() + locator := []*bc.Hash{} + + step := uint64(1) + for header != nil { + headerHash := header.Hash() + locator = append(locator, &headerHash) + if header.Height == 0 { + break + } + + var err error + if header.Height < step { + header, err = fs.chain.GetHeaderByHeight(0) + } else { + header, err = fs.chain.GetHeaderByHeight(header.Height - step) + } + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator") + break + } + + if len(locator) >= 9 { + step *= 2 + } + } + return locator +} + +func (fs *fastSync) process() error { + if err := fs.findFastSyncRange(); err != nil { + return err + } + + stopHash := fs.stopHeader.Hash() + for fs.chain.BestBlockHeight() < fs.stopHeader.Height { + blocks, err := fs.msgFetcher.requireBlocks(fs.syncPeer.ID(), fs.blockLocator(), &stopHash) + if err != nil { + fs.peers.ErrorHandler(fs.syncPeer.ID(), security.LevelConnException, err) + return err + } + + if err := fs.verifyBlocks(blocks); err != nil { + fs.peers.ErrorHandler(fs.syncPeer.ID(), security.LevelMsgIllegal, err) + return err + } + } + + log.WithFields(log.Fields{"module": logModule, "height": fs.chain.BestBlockHeight()}).Info("fast sync success") + return nil +} + +func (fs *fastSync) findFastSyncRange() error { + bestHeight := fs.chain.BestBlockHeight() + fs.length = fs.syncPeer.IrreversibleHeight() - fastSyncPivotGap - bestHeight + if fs.length > maxFastSyncBlocksNum { + fs.length = maxFastSyncBlocksNum + } + + stopBlock, err := fs.msgFetcher.requireBlock(fs.syncPeer.ID(), bestHeight+fs.length) + if err != nil { + return err + } + + fs.stopHeader = &stopBlock.BlockHeader + return nil +} + +func (fs *fastSync) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) { + headers, err := fs.locateHeaders(locator, stopHash, 0, maxBlocksPerMsg) + if err != nil { + return nil, err + } + + blocks := []*types.Block{} + for _, header := range headers { + headerHash := header.Hash() + block, err := fs.chain.GetBlockByHash(&headerHash) + if err != nil { + return nil, err + } + + blocks = append(blocks, block) + } + return blocks, nil +} + +func (fs *fastSync) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) { + startHeader, err := fs.chain.GetHeaderByHeight(0) + if err != nil { + return nil, err + } + + for _, hash := range locator { + header, err := fs.chain.GetHeaderByHash(hash) + if err == nil && fs.chain.InMainChain(header.Hash()) { + startHeader = header + break + } + } + + headers := make([]*types.BlockHeader, 0) + stopHeader, err := fs.chain.GetHeaderByHash(stopHash) + if err != nil { + return headers, nil + } + + if !fs.chain.InMainChain(*stopHash) { + return headers, nil + } + + num := uint64(0) + for i := startHeader.Height; i <= stopHeader.Height && num < maxNum; i += skip + 1 { + header, err := fs.chain.GetHeaderByHeight(i) + if err != nil { + return nil, err + } + + headers = append(headers, header) + num++ + } + + return headers, nil +} + +func (fs *fastSync) setSyncPeer(peer *peers.Peer) { + fs.syncPeer = peer +} + +func (fs *fastSync) verifyBlocks(blocks []*types.Block) error { + for _, block := range blocks { + isOrphan, err := fs.chain.ProcessBlock(block) + if err != nil { + return err + } + + if isOrphan { + log.WithFields(log.Fields{"module": logModule, "height": block.Height, "hash": block.Hash()}).Error("fast sync block is orphan") + return errOrphanBlock + } + } + + return nil +} diff --git a/netsync/chainmgr/fast_sync_test.go b/netsync/chainmgr/fast_sync_test.go new file mode 100644 index 00000000..0ff37013 --- /dev/null +++ b/netsync/chainmgr/fast_sync_test.go @@ -0,0 +1,275 @@ +package chainmgr + +import ( + "testing" + "time" + + "github.com/vapor/consensus" + "github.com/vapor/errors" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" + "github.com/vapor/test/mock" + "github.com/vapor/testutil" +) + +func TestBlockLocator(t *testing.T) { + blocks := mockBlocks(nil, 500) + cases := []struct { + bestHeight uint64 + wantHeight []uint64 + }{ + { + bestHeight: 0, + wantHeight: []uint64{0}, + }, + { + bestHeight: 1, + wantHeight: []uint64{1, 0}, + }, + { + bestHeight: 7, + wantHeight: []uint64{7, 6, 5, 4, 3, 2, 1, 0}, + }, + { + bestHeight: 10, + wantHeight: []uint64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}, + }, + { + bestHeight: 100, + wantHeight: []uint64{100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 89, 85, 77, 61, 29, 0}, + }, + { + bestHeight: 500, + wantHeight: []uint64{500, 499, 498, 497, 496, 495, 494, 493, 492, 491, 489, 485, 477, 461, 429, 365, 237, 0}, + }, + } + + for i, c := range cases { + mockChain := mock.NewChain(nil) + fs := &fastSync{chain: mockChain} + mockChain.SetBestBlockHeader(&blocks[c.bestHeight].BlockHeader) + for i := uint64(0); i <= c.bestHeight; i++ { + mockChain.SetBlockByHeight(i, blocks[i]) + } + + want := []*bc.Hash{} + for _, i := range c.wantHeight { + hash := blocks[i].Hash() + want = append(want, &hash) + } + + if got := fs.blockLocator(); !testutil.DeepEqual(got, want) { + t.Errorf("case %d: got %v want %v", i, got, want) + } + } +} + +func TestFastBlockSync(t *testing.T) { + maxBlocksPerMsg = 10 + maxHeadersPerMsg = 10 + maxFastSyncBlocksNum = 200 + baseChain := mockBlocks(nil, 300) + + cases := []struct { + syncTimeout time.Duration + aBlocks []*types.Block + bBlocks []*types.Block + want []*types.Block + err error + }{ + { + syncTimeout: 30 * time.Second, + aBlocks: baseChain[:50], + bBlocks: baseChain[:301], + want: baseChain[:237], + err: nil, + }, + { + syncTimeout: 30 * time.Second, + aBlocks: baseChain[:2], + bBlocks: baseChain[:300], + want: baseChain[:202], + err: nil, + }, + } + + for i, c := range cases { + syncTimeout = c.syncTimeout + a := mockSync(c.aBlocks, nil) + b := mockSync(c.bBlocks, nil) + netWork := NewNetWork() + netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode|consensus.SFFastSync) + netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode|consensus.SFFastSync) + if B2A, A2B, err := netWork.HandsShake(a, b); err != nil { + t.Errorf("fail on peer hands shake %v", err) + } else { + go B2A.postMan() + go A2B.postMan() + } + a.blockKeeper.syncPeer = a.peers.GetPeer("test node B") + a.blockKeeper.fastSync.setSyncPeer(a.blockKeeper.syncPeer) + + if err := a.blockKeeper.fastSync.process(); errors.Root(err) != c.err { + t.Errorf("case %d: got %v want %v", i, err, c.err) + } + + got := []*types.Block{} + for i := uint64(0); i <= a.chain.BestBlockHeight(); i++ { + block, err := a.chain.GetBlockByHeight(i) + if err != nil { + t.Errorf("case %d got err %v", i, err) + } + got = append(got, block) + } + if !testutil.DeepEqual(got, c.want) { + t.Errorf("case %d: got %v want %v", i, got, c.want) + } + } +} + +func TestLocateBlocks(t *testing.T) { + maxBlocksPerMsg = 5 + blocks := mockBlocks(nil, 100) + cases := []struct { + locator []uint64 + stopHash bc.Hash + wantHeight []uint64 + }{ + { + locator: []uint64{20}, + stopHash: blocks[100].Hash(), + wantHeight: []uint64{20, 21, 22, 23, 24}, + }, + } + + mockChain := mock.NewChain(nil) + fs := &fastSync{chain: mockChain} + for _, block := range blocks { + mockChain.SetBlockByHeight(block.Height, block) + } + + for i, c := range cases { + locator := []*bc.Hash{} + for _, i := range c.locator { + hash := blocks[i].Hash() + locator = append(locator, &hash) + } + + want := []*types.Block{} + for _, i := range c.wantHeight { + want = append(want, blocks[i]) + } + + got, _ := fs.locateBlocks(locator, &c.stopHash) + if !testutil.DeepEqual(got, want) { + t.Errorf("case %d: got %v want %v", i, got, want) + } + } +} + +func TestLocateHeaders(t *testing.T) { + maxHeadersPerMsg = 10 + blocks := mockBlocks(nil, 150) + blocksHash := []bc.Hash{} + for _, block := range blocks { + blocksHash = append(blocksHash, block.Hash()) + } + + cases := []struct { + chainHeight uint64 + locator []uint64 + stopHash *bc.Hash + skip uint64 + wantHeight []uint64 + err bool + }{ + { + chainHeight: 100, + locator: []uint64{90}, + stopHash: &blocksHash[100], + skip: 0, + wantHeight: []uint64{90, 91, 92, 93, 94, 95, 96, 97, 98, 99}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{20}, + stopHash: &blocksHash[24], + skip: 0, + wantHeight: []uint64{20, 21, 22, 23, 24}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{20}, + stopHash: &blocksHash[20], + wantHeight: []uint64{20}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{20}, + stopHash: &blocksHash[120], + wantHeight: []uint64{}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{120, 70}, + stopHash: &blocksHash[78], + wantHeight: []uint64{70, 71, 72, 73, 74, 75, 76, 77, 78}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{15}, + stopHash: &blocksHash[10], + skip: 10, + wantHeight: []uint64{}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{15}, + stopHash: &blocksHash[80], + skip: 10, + wantHeight: []uint64{15, 26, 37, 48, 59, 70}, + err: false, + }, + { + chainHeight: 100, + locator: []uint64{0}, + stopHash: &blocksHash[100], + skip: 9, + wantHeight: []uint64{0, 10, 20, 30, 40, 50, 60, 70, 80, 90}, + err: false, + }, + } + + for i, c := range cases { + mockChain := mock.NewChain(nil) + fs := &fastSync{chain: mockChain} + for i := uint64(0); i <= c.chainHeight; i++ { + mockChain.SetBlockByHeight(i, blocks[i]) + } + + locator := []*bc.Hash{} + for _, i := range c.locator { + hash := blocks[i].Hash() + locator = append(locator, &hash) + } + + want := []*types.BlockHeader{} + for _, i := range c.wantHeight { + want = append(want, &blocks[i].BlockHeader) + } + + got, err := fs.locateHeaders(locator, c.stopHash, c.skip, maxHeadersPerMsg) + if err != nil != c.err { + t.Errorf("case %d: got %v want err = %v", i, err, c.err) + } + if !testutil.DeepEqual(got, want) { + t.Errorf("case %d: got %v want %v", i, got, want) + } + } +} diff --git a/netsync/chainmgr/handle.go b/netsync/chainmgr/handle.go index 992a76c9..2a77d013 100644 --- a/netsync/chainmgr/handle.go +++ b/netsync/chainmgr/handle.go @@ -25,6 +25,7 @@ const ( // Chain is the interface for Bytom core type Chain interface { BestBlockHeader() *types.BlockHeader + BestIrreversibleHeader() *types.BlockHeader BestBlockHeight() uint64 GetBlockByHash(*bc.Hash) (*types.Block, error) GetBlockByHeight(uint64) (*types.Block, error) @@ -161,7 +162,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag rawData, err := block.MarshalText() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block") - continue + return } if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 { @@ -181,7 +182,7 @@ func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessag } func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) { - headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash()) + headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxHeadersPerMsg) if err != nil || len(headers) == 0 { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders") return @@ -239,8 +240,8 @@ func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) { func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) { if peer := m.peers.GetPeer(basePeer.ID()); peer != nil { - peer.SetStatus(msg.Height, msg.GetHash()) - return + peer.SetBestStatus(msg.BestHeight, msg.GetBestHash()) + peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash()) } } @@ -350,7 +351,7 @@ func (m *Manager) SendStatus(peer peers.BasePeer) error { return errors.New("invalid peer") } - if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil { + if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.BestIrreversibleHeader()); err != nil { m.peers.RemovePeer(p.ID()) return err } @@ -363,7 +364,7 @@ func (m *Manager) Start() error { if err != nil { return err } - + m.blockKeeper.start() go m.broadcastTxsLoop() go m.syncMempoolLoop() @@ -372,5 +373,6 @@ func (m *Manager) Start() error { //Stop stop sync manager func (m *Manager) Stop() { + m.blockKeeper.stop() close(m.quit) } diff --git a/netsync/chainmgr/msg_fetcher.go b/netsync/chainmgr/msg_fetcher.go new file mode 100644 index 00000000..f635667c --- /dev/null +++ b/netsync/chainmgr/msg_fetcher.go @@ -0,0 +1,128 @@ +package chainmgr + +import ( + "time" + + "github.com/vapor/errors" + "github.com/vapor/netsync/peers" + "github.com/vapor/protocol/bc" + "github.com/vapor/protocol/bc/types" +) + +const ( + blockProcessChSize = 1024 + blocksProcessChSize = 128 + headersProcessChSize = 1024 +) + +type msgFetcher struct { + peers *peers.PeerSet + + blockProcessCh chan *blockMsg + blocksProcessCh chan *blocksMsg + headersProcessCh chan *headersMsg +} + +func newMsgFetcher(peers *peers.PeerSet) *msgFetcher { + return &msgFetcher{ + peers: peers, + blockProcessCh: make(chan *blockMsg, blockProcessChSize), + blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize), + headersProcessCh: make(chan *headersMsg, headersProcessChSize), + } +} + +func (mf *msgFetcher) processBlock(peerID string, block *types.Block) { + mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID} +} + +func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) { + mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID} +} + +func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) { + mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID} +} + +func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) { + peer := mf.peers.GetPeer(peerID) + if peer == nil { + return nil, errPeerDropped + } + + if ok := peer.GetBlockByHeight(height); !ok { + return nil, errPeerDropped + } + + timeout := time.NewTimer(syncTimeout) + defer timeout.Stop() + + for { + select { + case msg := <-mf.blockProcessCh: + if msg.peerID != peerID { + continue + } + if msg.block.Height != height { + continue + } + return msg.block, nil + case <-timeout.C: + return nil, errors.Wrap(errRequestTimeout, "requireBlock") + } + } +} + +func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) { + peer := mf.peers.GetPeer(peerID) + if peer == nil { + return nil, errPeerDropped + } + + if ok := peer.GetBlocks(locator, stopHash); !ok { + return nil, errPeerDropped + } + + timeout := time.NewTimer(syncTimeout) + defer timeout.Stop() + + for { + select { + case msg := <-mf.blocksProcessCh: + if msg.peerID != peerID { + continue + } + + return msg.blocks, nil + case <-timeout.C: + return nil, errors.Wrap(errRequestTimeout, "requireBlocks") + } + } +} + +func (mf *msgFetcher) requireHeaders(peerID string, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) ([]*types.BlockHeader, error) { + peer := mf.peers.GetPeer(peerID) + if peer == nil { + return nil, errPeerDropped + } + + if ok := peer.GetHeaders(locator, stopHash, skip); !ok { + return nil, errPeerDropped + } + + timeout := time.NewTimer(syncTimeout) + defer timeout.Stop() + + for { + select { + case msg := <-mf.headersProcessCh: + if msg.peerID != peerID { + continue + } + + return msg.headers, nil + case <-timeout.C: + return nil, errors.Wrap(errRequestTimeout, "requireHeaders") + } + } +} diff --git a/netsync/chainmgr/protocol_reactor.go b/netsync/chainmgr/protocol_reactor.go index 7cf0909a..0b3c5a7c 100644 --- a/netsync/chainmgr/protocol_reactor.go +++ b/netsync/chainmgr/protocol_reactor.go @@ -56,6 +56,7 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error { if err := pr.manager.SendStatus(peer); err != nil { return err } + pr.manager.syncMempool(peer.Key) return nil } diff --git a/netsync/chainmgr/tool_test.go b/netsync/chainmgr/tool_test.go index db17b4ff..63816417 100644 --- a/netsync/chainmgr/tool_test.go +++ b/netsync/chainmgr/tool_test.go @@ -5,7 +5,7 @@ import ( "math/rand" "net" - wire "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire" "github.com/tendermint/tmlibs/flowrate" "github.com/vapor/consensus" @@ -123,7 +123,8 @@ func (nw *NetWork) HandsShake(nodeA, nodeB *Manager) (*P2PPeer, *P2PPeer, error) nodeA.AddPeer(&A2B) nodeB.AddPeer(&B2A) - + nodeA.SendStatus(B2A.srcPeer) + nodeB.SendStatus(A2B.srcPeer) A2B.setAsync(true) B2A.setAsync(true) return &B2A, &A2B, nil diff --git a/netsync/messages/chain_msg.go b/netsync/messages/chain_msg.go index 84bc0311..3ef82dce 100644 --- a/netsync/messages/chain_msg.go +++ b/netsync/messages/chain_msg.go @@ -117,12 +117,14 @@ func (m *BlockMessage) String() string { type GetHeadersMessage struct { RawBlockLocator [][32]byte RawStopHash [32]byte + Skip uint64 } //NewGetHeadersMessage return a new GetHeadersMessage -func NewGetHeadersMessage(blockLocator []*bc.Hash, stopHash *bc.Hash) *GetHeadersMessage { +func NewGetHeadersMessage(blockLocator []*bc.Hash, stopHash *bc.Hash, skip uint64) *GetHeadersMessage { msg := &GetHeadersMessage{ RawStopHash: stopHash.Byte32(), + Skip: skip, } for _, hash := range blockLocator { msg.RawBlockLocator = append(msg.RawBlockLocator, hash.Byte32()) @@ -141,7 +143,8 @@ func (m *GetHeadersMessage) GetBlockLocator() []*bc.Hash { } func (m *GetHeadersMessage) String() string { - return fmt.Sprintf("{stop_hash: %s}", hex.EncodeToString(m.RawStopHash[:])) + stopHash := bc.NewHash(m.RawStopHash) + return fmt.Sprintf("{skip:%d,stopHash:%s}", m.Skip, stopHash.String()) } //GetStopHash return the stop hash of the msg @@ -150,6 +153,10 @@ func (m *GetHeadersMessage) GetStopHash() *bc.Hash { return &hash } +func (m *GetHeadersMessage) GetSkip() uint64 { + return m.Skip +} + //HeadersMessage is one of the bytom msg type type HeadersMessage struct { RawHeaders [][]byte @@ -263,26 +270,35 @@ func (m *BlocksMessage) String() string { //StatusResponseMessage get status response msg type StatusMessage struct { - Height uint64 - RawHash [32]byte + BestHeight uint64 + BestHash [32]byte + IrreversibleHeight uint64 + IrreversibleHash [32]byte } //NewStatusResponseMessage construct get status response msg -func NewStatusMessage(blockHeader *types.BlockHeader) *StatusMessage { +func NewStatusMessage(bestHeader, irreversibleHeader *types.BlockHeader) *StatusMessage { return &StatusMessage{ - Height: blockHeader.Height, - RawHash: blockHeader.Hash().Byte32(), + BestHeight: bestHeader.Height, + BestHash: bestHeader.Hash().Byte32(), + IrreversibleHeight: irreversibleHeader.Height, + IrreversibleHash: irreversibleHeader.Hash().Byte32(), } } //GetHash get hash from msg -func (m *StatusMessage) GetHash() *bc.Hash { - hash := bc.NewHash(m.RawHash) +func (m *StatusMessage) GetBestHash() *bc.Hash { + hash := bc.NewHash(m.BestHash) + return &hash +} + +func (m *StatusMessage) GetIrreversibleHash() *bc.Hash { + hash := bc.NewHash(m.IrreversibleHash) return &hash } func (m *StatusMessage) String() string { - return fmt.Sprintf("{height: %d, hash: %s}", m.Height, hex.EncodeToString(m.RawHash[:])) + return fmt.Sprintf("{best hash: %s, irreversible hash: %s}", hex.EncodeToString(m.BestHash[:]), hex.EncodeToString(m.IrreversibleHash[:])) } //TransactionMessage notify new tx msg diff --git a/netsync/messages/chain_msg_test.go b/netsync/messages/chain_msg_test.go index c87d6bcf..19900b74 100644 --- a/netsync/messages/chain_msg_test.go +++ b/netsync/messages/chain_msg_test.go @@ -170,23 +170,30 @@ func TestGetBlockMessage(t *testing.T) { type testGetHeadersMessage struct { blockLocator []*bc.Hash stopHash *bc.Hash + skip uint64 } func TestGetHeadersMessage(t *testing.T) { testMsg := testGetHeadersMessage{ blockLocator: []*bc.Hash{{V0: 0x01}, {V0: 0x02}, {V0: 0x03}}, - stopHash: &bc.Hash{V0: 0xaa, V2: 0x55}, + stopHash: &bc.Hash{V0: 0x01}, + skip: 888, } - getHeadersMsg := NewGetHeadersMessage(testMsg.blockLocator, testMsg.stopHash) + getHeadersMsg := NewGetHeadersMessage(testMsg.blockLocator, testMsg.stopHash, testMsg.skip) gotBlockLocator := getHeadersMsg.GetBlockLocator() gotStopHash := getHeadersMsg.GetStopHash() + gotSkip := getHeadersMsg.GetSkip() if !reflect.DeepEqual(testMsg.blockLocator, gotBlockLocator) { t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotBlockLocator), spew.Sdump(testMsg.blockLocator)) } if !reflect.DeepEqual(testMsg.stopHash, gotStopHash) { - t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotStopHash), spew.Sdump(testMsg.stopHash)) + t.Errorf("get headers msg test err: amount:got %d\nwant %d", gotStopHash, testMsg.stopHash) + } + + if !reflect.DeepEqual(testMsg.skip, gotSkip) { + t.Errorf("get headers msg test err: skip:got %d\nwant %d", gotSkip, testMsg.skip) } } @@ -233,9 +240,13 @@ func TestBlocksMessage(t *testing.T) { } func TestStatusMessage(t *testing.T) { - statusResponseMsg := NewStatusMessage(&testBlock.BlockHeader) - gotHash := statusResponseMsg.GetHash() - if !reflect.DeepEqual(*gotHash, testBlock.Hash()) { - t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotHash), spew.Sdump(testBlock.Hash())) + statusResponseMsg := NewStatusMessage(&testBlock.BlockHeader, &testBlock.BlockHeader) + gotBestHash := statusResponseMsg.GetBestHash() + if !reflect.DeepEqual(*gotBestHash, testBlock.Hash()) { + t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotBestHash), spew.Sdump(testBlock.Hash())) + } + gotIrreversibleHash := statusResponseMsg.GetIrreversibleHash() + if !reflect.DeepEqual(*gotIrreversibleHash, testBlock.Hash()) { + t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotIrreversibleHash), spew.Sdump(testBlock.Hash())) } } diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go index 1f0ac249..d0eb7eeb 100644 --- a/netsync/peers/peer.go +++ b/netsync/peers/peer.go @@ -73,15 +73,17 @@ type PeerInfo struct { type Peer struct { BasePeer - mtx sync.RWMutex - services consensus.ServiceFlag - height uint64 - hash *bc.Hash - knownTxs *set.Set // Set of transaction hashes known to be known by this peer - knownBlocks *set.Set // Set of block hashes known to be known by this peer - knownSignatures *set.Set // Set of block signatures known to be known by this peer - knownStatus uint64 // Set of chain status known to be known by this peer - filterAdds *set.Set // Set of addresses that the spv node cares about. + mtx sync.RWMutex + services consensus.ServiceFlag + bestHeight uint64 + bestHash *bc.Hash + irreversibleHeight uint64 + irreversibleHash *bc.Hash + knownTxs *set.Set // Set of transaction hashes known to be known by this peer + knownBlocks *set.Set // Set of block hashes known to be known by this peer + knownSignatures *set.Set // Set of block signatures known to be known by this peer + knownStatus uint64 // Set of chain status known to be known by this peer + filterAdds *set.Set // Set of addresses that the spv node cares about. } func newPeer(basePeer BasePeer) *Peer { @@ -98,7 +100,15 @@ func newPeer(basePeer BasePeer) *Peer { func (p *Peer) Height() uint64 { p.mtx.RLock() defer p.mtx.RUnlock() - return p.height + + return p.bestHeight +} + +func (p *Peer) IrreversibleHeight() uint64 { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return p.irreversibleHeight } func (p *Peer) AddFilterAddress(address []byte) { @@ -140,8 +150,8 @@ func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool { return p.TrySend(msgs.BlockchainChannel, msg) } -func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool { - msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)} +func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool { + msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)} return p.TrySend(msgs.BlockchainChannel, msg) } @@ -158,7 +168,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo { return &PeerInfo{ ID: p.ID(), RemoteAddr: p.Addr().String(), - Height: p.height, + Height: p.bestHeight, Ping: ping.String(), Duration: sentStatus.Duration.String(), TotalSent: sentStatus.Bytes, @@ -360,20 +370,29 @@ func (p *Peer) SendTransactions(txs []*types.Tx) error { return nil } -func (p *Peer) SendStatus(header *types.BlockHeader) error { - msg := msgs.NewStatusMessage(header) +func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error { + msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader) if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok { return errSendStatusMsg } - p.markNewStatus(header.Height) + p.markNewStatus(bestHeader.Height) return nil } -func (p *Peer) SetStatus(height uint64, hash *bc.Hash) { +func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) { p.mtx.Lock() defer p.mtx.Unlock() - p.height = height - p.hash = hash + + p.bestHeight = bestHeight + p.bestHash = bestHash +} + +func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.irreversibleHeight = irreversibleHeight + p.irreversibleHash = irreversibleHash } type PeerSet struct { @@ -424,7 +443,23 @@ func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer { if !p.services.IsEnable(flag) { continue } - if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) { + if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) { + bestPeer = p + } + } + return bestPeer +} + +func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + var bestPeer *Peer + for _, p := range ps.peers { + if !p.services.IsEnable(flag) { + continue + } + if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) { bestPeer = p } } @@ -466,16 +501,16 @@ func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error { return nil } -func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error { - msg := msgs.NewStatusMessage(&bestBlock.BlockHeader) - peers := ps.peersWithoutNewStatus(bestBlock.Height) +func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error { + msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader) + peers := ps.peersWithoutNewStatus(bestHeader.Height) for _, peer := range peers { if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok { ps.RemovePeer(peer.ID()) continue } - peer.markNewStatus(bestBlock.Height) + peer.markNewStatus(bestHeader.Height) } return nil } @@ -619,5 +654,5 @@ func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) { return } - peer.SetStatus(height, hash) + peer.SetBestStatus(height, hash) } diff --git a/protocol/protocol.go b/protocol/protocol.go index 98b285b0..cb31fe39 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -5,12 +5,12 @@ import ( log "github.com/sirupsen/logrus" + "github.com/vapor/common" "github.com/vapor/config" "github.com/vapor/event" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" "github.com/vapor/protocol/state" - "github.com/vapor/common" ) const maxProcessBlockChSize = 1024 @@ -112,7 +112,11 @@ func (c *Chain) BestBlockHash() *bc.Hash { return &c.bestNode.Hash } -// BestBlockHeader returns the chain tail block +// BestIrreversibleHeader returns the chain best irreversible block +func (c *Chain) BestIrreversibleHeader() *types.BlockHeader { + return c.bestIrreversibleNode.BlockHeader() +} + func (c *Chain) BestBlockHeader() *types.BlockHeader { node := c.index.BestNode() return node.BlockHeader() diff --git a/test/mock/chain.go b/test/mock/chain.go index 0b93ce9e..2b85ff03 100644 --- a/test/mock/chain.go +++ b/test/mock/chain.go @@ -37,6 +37,10 @@ func (c *Chain) BestBlockHeight() uint64 { return c.bestBlockHeader.Height } +func (c *Chain) BestIrreversibleHeader() *types.BlockHeader { + return c.bestBlockHeader +} + func (c *Chain) CalcNextSeed(hash *bc.Hash) (*bc.Hash, error) { return &bc.Hash{V0: hash.V1, V1: hash.V2, V2: hash.V3, V3: hash.V0}, nil } -- 2.11.0