From 0d7621dbacbf1672c9aa6df893e0d7b0c629df54 Mon Sep 17 00:00:00 2001 From: Paladz Date: Thu, 19 Mar 2020 20:28:13 +0800 Subject: [PATCH] small fix (#517) * small fix * prevent timeout * edit for golint Co-authored-by: paladz <453256728@qq.com> --- netsync/chainmgr/block_keeper.go | 13 ++++++------- netsync/chainmgr/block_keeper_test.go | 3 ++- netsync/chainmgr/block_process.go | 8 ++------ netsync/chainmgr/fast_sync.go | 4 ++-- netsync/chainmgr/handle.go | 13 ++++++++++++- netsync/chainmgr/msg_fetcher.go | 7 ++++--- netsync/chainmgr/storage.go | 8 +------- protocol/bbft.go | 2 +- 8 files changed, 30 insertions(+), 28 deletions(-) diff --git a/netsync/chainmgr/block_keeper.go b/netsync/chainmgr/block_keeper.go index 5d0522d9..9cdfe1b0 100644 --- a/netsync/chainmgr/block_keeper.go +++ b/netsync/chainmgr/block_keeper.go @@ -27,11 +27,7 @@ var ( maxNumOfBlocksRegularSync = uint64(128) ) -type FastSync interface { - process() error - setSyncPeer(peer *peers.Peer) -} - +// Fetcher is the interface for fetch struct type Fetcher interface { processBlock(peerID string, block *types.Block) processBlocks(peerID string, blocks []*types.Block) @@ -56,7 +52,7 @@ type headersMsg struct { type blockKeeper struct { chain Chain - fastSync FastSync + fastSync *fastSync msgFetcher Fetcher peers *peers.PeerSet syncPeer *peers.Peer @@ -76,7 +72,7 @@ func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *block } } -func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) { +func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash, isTimeout func() bool) ([]*types.Block, error) { headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg) if err != nil { return nil, err @@ -91,6 +87,9 @@ func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*t } blocks = append(blocks, block) + if isTimeout() { + break + } } return blocks, nil } diff --git a/netsync/chainmgr/block_keeper_test.go b/netsync/chainmgr/block_keeper_test.go index e34e48bd..52b206c7 100644 --- a/netsync/chainmgr/block_keeper_test.go +++ b/netsync/chainmgr/block_keeper_test.go @@ -448,7 +448,8 @@ func TestLocateBlocks(t *testing.T) { want = append(want, blocks[i]) } - got, err := bk.locateBlocks(locator, &c.stopHash) + mockTimeout := func() bool { return false } + got, err := bk.locateBlocks(locator, &c.stopHash, mockTimeout) if err != c.wantErr { t.Errorf("case %d: got %v want err = %v", i, err, c.wantErr) } diff --git a/netsync/chainmgr/block_process.go b/netsync/chainmgr/block_process.go index d1e45a65..50a110b6 100644 --- a/netsync/chainmgr/block_process.go +++ b/netsync/chainmgr/block_process.go @@ -12,17 +12,13 @@ import ( var errOrphanBlock = errors.New("fast sync inserting orphan block") -type BlockProcessor interface { - process(chan struct{}, chan struct{}, uint64, *sync.WaitGroup) -} - type blockProcessor struct { chain Chain - storage Storage + storage *storage peers *peers.PeerSet } -func newBlockProcessor(chain Chain, storage Storage, peers *peers.PeerSet) *blockProcessor { +func newBlockProcessor(chain Chain, storage *storage, peers *peers.PeerSet) *blockProcessor { return &blockProcessor{ chain: chain, peers: peers, diff --git a/netsync/chainmgr/fast_sync.go b/netsync/chainmgr/fast_sync.go index 10659ce7..a8022279 100644 --- a/netsync/chainmgr/fast_sync.go +++ b/netsync/chainmgr/fast_sync.go @@ -29,12 +29,12 @@ var ( type fastSync struct { chain Chain msgFetcher MsgFetcher - blockProcessor BlockProcessor + blockProcessor *blockProcessor peers *peers.PeerSet mainSyncPeer *peers.Peer } -func newFastSync(chain Chain, msgFetcher MsgFetcher, storage Storage, peers *peers.PeerSet) *fastSync { +func newFastSync(chain Chain, msgFetcher MsgFetcher, storage *storage, peers *peers.PeerSet) *fastSync { return &fastSync{ chain: chain, msgFetcher: msgFetcher, diff --git a/netsync/chainmgr/handle.go b/netsync/chainmgr/handle.go index 0e9c6201..35d1f9a5 100644 --- a/netsync/chainmgr/handle.go +++ b/netsync/chainmgr/handle.go @@ -3,6 +3,7 @@ package chainmgr import ( "errors" "reflect" + "time" log "github.com/sirupsen/logrus" @@ -38,6 +39,7 @@ type Chain interface { ValidateTx(*types.Tx) (bool, error) } +// Switch is the interface for network layer type Switch interface { AddReactor(name string, reactor p2p.Reactor) p2p.Reactor Start() (bool, error) @@ -90,6 +92,7 @@ func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dis return manager, nil } +// AddPeer add the network layer peer to logic layer func (m *Manager) AddPeer(peer peers.BasePeer) { m.peers.AddPeer(peer) } @@ -154,7 +157,12 @@ func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) } func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) { - blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash()) + endTime := time.Now().Add(requireBlocksTimeout / 2) + isTimeout := func() bool { + return time.Now().After(endTime) + } + + blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout) if err != nil || len(blocks) == 0 { return } @@ -354,10 +362,12 @@ func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.Blo } } +// RemovePeer delete peer for peer set func (m *Manager) RemovePeer(peerID string) { m.peers.RemovePeer(peerID) } +// SendStatus sent the current self status to remote peer func (m *Manager) SendStatus(peer peers.BasePeer) error { p := m.peers.GetPeer(peer.ID()) if p == nil { @@ -371,6 +381,7 @@ func (m *Manager) SendStatus(peer peers.BasePeer) error { return nil } +// Start the network logic layer func (m *Manager) Start() error { var err error m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{}) diff --git a/netsync/chainmgr/msg_fetcher.go b/netsync/chainmgr/msg_fetcher.go index 457574bf..aa8d657b 100644 --- a/netsync/chainmgr/msg_fetcher.go +++ b/netsync/chainmgr/msg_fetcher.go @@ -24,7 +24,7 @@ const ( var ( requireBlockTimeout = 20 * time.Second requireHeadersTimeout = 30 * time.Second - requireBlocksTimeout = 50 * time.Second + requireBlocksTimeout = 90 * time.Second checkSyncPeerNumInterval = 5 * time.Second errRequestBlocksTimeout = errors.New("request blocks timeout") @@ -33,6 +33,7 @@ var ( errSendMsg = errors.New("send message error") ) +// MsgFetcher is the interface for msg fetch struct type MsgFetcher interface { resetParameter() addSyncPeer(peerID string) @@ -51,7 +52,7 @@ type fetchBlocksResult struct { } type msgFetcher struct { - storage Storage + storage *storage syncPeers *fastSyncPeers peers *peers.PeerSet blockProcessCh chan *blockMsg @@ -61,7 +62,7 @@ type msgFetcher struct { mux sync.RWMutex } -func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher { +func newMsgFetcher(storage *storage, peers *peers.PeerSet) *msgFetcher { return &msgFetcher{ storage: storage, syncPeers: newFastSyncPeers(), diff --git a/netsync/chainmgr/storage.go b/netsync/chainmgr/storage.go index ff6b758c..8cc8b12e 100644 --- a/netsync/chainmgr/storage.go +++ b/netsync/chainmgr/storage.go @@ -15,13 +15,7 @@ var ( errDBFindBlock = errors.New("can't find block from DB") ) -type Storage interface { - resetParameter() - writeBlocks(peerID string, blocks []*types.Block) error - readBlock(height uint64) (*blockStorage, error) - deleteBlock(height uint64) -} - +// LocalStore is the interface for persistent storage type LocalStore interface { writeBlock(block *types.Block) error readBlock(height uint64) (*types.Block, error) diff --git a/protocol/bbft.go b/protocol/bbft.go index 382f5ad5..3b3c317a 100644 --- a/protocol/bbft.go +++ b/protocol/bbft.go @@ -234,7 +234,7 @@ func (c *Chain) signBlockHeader(blockHeader *types.BlockHeader) ([]byte, error) node, err := c.getConsensusNode(&blockHeader.PreviousBlockHash, xpub.String()) blockHash := blockHeader.Hash() if err == errNotFoundConsensusNode { - log.WithFields(log.Fields{"module": logModule, "blockHash": blockHash.String()}).Warn("can't find consensus node of current node") + log.WithFields(log.Fields{"module": logModule, "blockHash": blockHash.String()}).Debug("can't find consensus node of current node") return nil, nil } else if err != nil { return nil, err -- 2.11.0