OSDN Git Service

small fix (#517)
authorPaladz <yzhu101@uottawa.ca>
Thu, 19 Mar 2020 12:28:13 +0000 (20:28 +0800)
committerGitHub <noreply@github.com>
Thu, 19 Mar 2020 12:28:13 +0000 (20:28 +0800)
* small fix

* prevent timeout

* edit for golint

Co-authored-by: paladz <453256728@qq.com>
netsync/chainmgr/block_keeper.go
netsync/chainmgr/block_keeper_test.go
netsync/chainmgr/block_process.go
netsync/chainmgr/fast_sync.go
netsync/chainmgr/handle.go
netsync/chainmgr/msg_fetcher.go
netsync/chainmgr/storage.go
protocol/bbft.go

index 5d0522d..9cdfe1b 100644 (file)
@@ -27,11 +27,7 @@ var (
        maxNumOfBlocksRegularSync = uint64(128)
 )
 
-type FastSync interface {
-       process() error
-       setSyncPeer(peer *peers.Peer)
-}
-
+// Fetcher is the interface for fetch struct
 type Fetcher interface {
        processBlock(peerID string, block *types.Block)
        processBlocks(peerID string, blocks []*types.Block)
@@ -56,7 +52,7 @@ type headersMsg struct {
 
 type blockKeeper struct {
        chain      Chain
-       fastSync   FastSync
+       fastSync   *fastSync
        msgFetcher Fetcher
        peers      *peers.PeerSet
        syncPeer   *peers.Peer
@@ -76,7 +72,7 @@ func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *block
        }
 }
 
-func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash, isTimeout func() bool) ([]*types.Block, error) {
        headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
        if err != nil {
                return nil, err
@@ -91,6 +87,9 @@ func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*t
                }
 
                blocks = append(blocks, block)
+               if isTimeout() {
+                       break
+               }
        }
        return blocks, nil
 }
index e34e48b..52b206c 100644 (file)
@@ -448,7 +448,8 @@ func TestLocateBlocks(t *testing.T) {
                        want = append(want, blocks[i])
                }
 
-               got, err := bk.locateBlocks(locator, &c.stopHash)
+               mockTimeout := func() bool { return false }
+               got, err := bk.locateBlocks(locator, &c.stopHash, mockTimeout)
                if err != c.wantErr {
                        t.Errorf("case %d: got %v want err = %v", i, err, c.wantErr)
                }
index d1e45a6..50a110b 100644 (file)
@@ -12,17 +12,13 @@ import (
 
 var errOrphanBlock = errors.New("fast sync inserting orphan block")
 
-type BlockProcessor interface {
-       process(chan struct{}, chan struct{}, uint64, *sync.WaitGroup)
-}
-
 type blockProcessor struct {
        chain   Chain
-       storage Storage
+       storage *storage
        peers   *peers.PeerSet
 }
 
-func newBlockProcessor(chain Chain, storage Storage, peers *peers.PeerSet) *blockProcessor {
+func newBlockProcessor(chain Chain, storage *storage, peers *peers.PeerSet) *blockProcessor {
        return &blockProcessor{
                chain:   chain,
                peers:   peers,
index 10659ce..a802227 100644 (file)
@@ -29,12 +29,12 @@ var (
 type fastSync struct {
        chain          Chain
        msgFetcher     MsgFetcher
-       blockProcessor BlockProcessor
+       blockProcessor *blockProcessor
        peers          *peers.PeerSet
        mainSyncPeer   *peers.Peer
 }
 
-func newFastSync(chain Chain, msgFetcher MsgFetcher, storage Storage, peers *peers.PeerSet) *fastSync {
+func newFastSync(chain Chain, msgFetcher MsgFetcher, storage *storage, peers *peers.PeerSet) *fastSync {
        return &fastSync{
                chain:          chain,
                msgFetcher:     msgFetcher,
index 0e9c620..35d1f9a 100644 (file)
@@ -3,6 +3,7 @@ package chainmgr
 import (
        "errors"
        "reflect"
+       "time"
 
        log "github.com/sirupsen/logrus"
 
@@ -38,6 +39,7 @@ type Chain interface {
        ValidateTx(*types.Tx) (bool, error)
 }
 
+// Switch is the interface for network layer
 type Switch interface {
        AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
        Start() (bool, error)
@@ -90,6 +92,7 @@ func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dis
        return manager, nil
 }
 
+// AddPeer add the network layer peer to logic layer
 func (m *Manager) AddPeer(peer peers.BasePeer) {
        m.peers.AddPeer(peer)
 }
@@ -154,7 +157,12 @@ func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage)
 }
 
 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
-       blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+       endTime := time.Now().Add(requireBlocksTimeout / 2)
+       isTimeout := func() bool {
+               return time.Now().After(endTime)
+       }
+
+       blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout)
        if err != nil || len(blocks) == 0 {
                return
        }
@@ -354,10 +362,12 @@ func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.Blo
        }
 }
 
+// RemovePeer delete peer for peer set
 func (m *Manager) RemovePeer(peerID string) {
        m.peers.RemovePeer(peerID)
 }
 
+// SendStatus sent the current self status to remote peer
 func (m *Manager) SendStatus(peer peers.BasePeer) error {
        p := m.peers.GetPeer(peer.ID())
        if p == nil {
@@ -371,6 +381,7 @@ func (m *Manager) SendStatus(peer peers.BasePeer) error {
        return nil
 }
 
+// Start the network logic layer
 func (m *Manager) Start() error {
        var err error
        m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
index 457574b..aa8d657 100644 (file)
@@ -24,7 +24,7 @@ const (
 var (
        requireBlockTimeout      = 20 * time.Second
        requireHeadersTimeout    = 30 * time.Second
-       requireBlocksTimeout     = 50 * time.Second
+       requireBlocksTimeout     = 90 * time.Second
        checkSyncPeerNumInterval = 5 * time.Second
 
        errRequestBlocksTimeout = errors.New("request blocks timeout")
@@ -33,6 +33,7 @@ var (
        errSendMsg              = errors.New("send message error")
 )
 
+// MsgFetcher is the interface for msg fetch struct
 type MsgFetcher interface {
        resetParameter()
        addSyncPeer(peerID string)
@@ -51,7 +52,7 @@ type fetchBlocksResult struct {
 }
 
 type msgFetcher struct {
-       storage          Storage
+       storage          *storage
        syncPeers        *fastSyncPeers
        peers            *peers.PeerSet
        blockProcessCh   chan *blockMsg
@@ -61,7 +62,7 @@ type msgFetcher struct {
        mux              sync.RWMutex
 }
 
-func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher {
+func newMsgFetcher(storage *storage, peers *peers.PeerSet) *msgFetcher {
        return &msgFetcher{
                storage:          storage,
                syncPeers:        newFastSyncPeers(),
index ff6b758..8cc8b12 100644 (file)
@@ -15,13 +15,7 @@ var (
        errDBFindBlock      = errors.New("can't find block from DB")
 )
 
-type Storage interface {
-       resetParameter()
-       writeBlocks(peerID string, blocks []*types.Block) error
-       readBlock(height uint64) (*blockStorage, error)
-       deleteBlock(height uint64)
-}
-
+// LocalStore is the interface for persistent storage
 type LocalStore interface {
        writeBlock(block *types.Block) error
        readBlock(height uint64) (*types.Block, error)
index 382f5ad..3b3c317 100644 (file)
@@ -234,7 +234,7 @@ func (c *Chain) signBlockHeader(blockHeader *types.BlockHeader) ([]byte, error)
        node, err := c.getConsensusNode(&blockHeader.PreviousBlockHash, xpub.String())
        blockHash := blockHeader.Hash()
        if err == errNotFoundConsensusNode {
-               log.WithFields(log.Fields{"module": logModule, "blockHash": blockHash.String()}).Warn("can't find consensus node of current node")
+               log.WithFields(log.Fields{"module": logModule, "blockHash": blockHash.String()}).Debug("can't find consensus node of current node")
                return nil, nil
        } else if err != nil {
                return nil, err