OSDN Git Service

Optimize p2p listener code
[bytom/bytom.git] / netsync / fetcher.go
index dd0b17f..34aa1ef 100644 (file)
@@ -6,8 +6,6 @@ import (
        log "github.com/sirupsen/logrus"
        "gopkg.in/karalabe/cookiejar.v2/collections/prque"
 
-       "strings"
-
        "github.com/bytom/p2p"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
@@ -100,7 +98,7 @@ func (f *Fetcher) loop() {
                                f.forgetBlock(hash)
                                continue
                        }
-                       if strings.Compare(op.block.PreviousBlockHash.String(), f.chain.BestBlockHash().String()) != 0 {
+                       if op.block.PreviousBlockHash.String() != f.chain.BestBlockHash().String() {
                                f.forgetBlock(hash)
                                continue
                        }
@@ -145,17 +143,38 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
 // insert spawns a new goroutine to run a block insertion into the chain. If the
 // block's number is at the same height as the current import phase, it updates
 // the phase states accordingly.
-func (f *Fetcher) insert(peer string, block *types.Block) {
+func (f *Fetcher) insert(peerID string, block *types.Block) {
        // Run the import on a new thread
-       log.Info("Importing propagated block", " from peer: ", peer, " height: ", block.Height)
+       log.Info("Importing propagated block", " from peer: ", peerID, " height: ", block.Height)
        // Run the actual import and log any issues
        if _, err := f.chain.ProcessBlock(block); err != nil {
-               log.Info("Propagated block import failed", " from peer: ", peer, " height: ", block.Height, "err: ", err)
+               log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
+               fPeer, ok := f.peers.Peer(peerID)
+               if !ok {
+                       return
+               }
+               swPeer := fPeer.getPeer()
+               if ban := fPeer.addBanScore(20, 0, "block process error"); ban {
+                       f.sw.AddBannedPeer(swPeer)
+                       f.sw.StopPeerGracefully(swPeer)
+               }
                return
        }
        // If import succeeded, broadcast the block
        log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
-       go f.peers.BroadcastMinedBlock(block)
+       peers, err := f.peers.BroadcastMinedBlock(block)
+       if err != nil {
+               log.Errorf("Broadcast mine block error. %v", err)
+               return
+       }
+       for _, fPeer := range peers {
+               if fPeer == nil {
+                       continue
+               }
+               swPeer := fPeer.getPeer()
+               log.Info("Fetcher broadcast block error. Stop peer.")
+               f.sw.StopPeerGracefully(swPeer)
+       }
 }
 
 // forgetBlock removes all traces of a queued block from the fetcher's internal