OSDN Git Service

Fix block keeper peer nil bug
authorYahtoo Ma <yahtoo.ma@gmail.com>
Sat, 21 Apr 2018 14:08:15 +0000 (22:08 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Sun, 22 Apr 2018 13:30:36 +0000 (21:30 +0800)
netsync/block_keeper.go
netsync/fetcher.go
netsync/handle.go
netsync/peer.go
netsync/protocol_reactor.go
p2p/switch.go

index c27becb..b851442 100644 (file)
@@ -29,6 +29,7 @@ var (
        errGetBlockByHash  = errors.New("Get block by hash error")
        errBroadcastStatus = errors.New("Broadcast new status block error")
        errReqBlock        = errors.New("Request block error")
+       errPeerNotRegister = errors.New("peer is not registered")
 )
 
 //TODO: add retry mechanism
@@ -74,7 +75,12 @@ func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) e
        orphanNum := uint64(0)
        reqNum := uint64(0)
        isOrphan := false
-       peer := bk.peers.Peer(peerID)
+       bkPeer, ok := bk.peers.Peer(peerID)
+       if !ok {
+               log.Info("peer is not registered")
+               return errPeerNotRegister
+       }
+       swPeer := bkPeer.getPeer()
        for num <= maxPeerHeight && num > 0 {
                if isOrphan {
                        reqNum = orphanNum
@@ -84,22 +90,25 @@ func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) e
                block, err := bk.BlockRequest(peerID, reqNum)
                if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
                        log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
-                       if peer == nil {
+                       if bkPeer == nil {
                                log.Info("peer is not registered")
                                break
                        }
-                       log.Info("Peer communication abnormality")
-                       if ban := peer.addBanScore(0, 50, "block request error"); ban {
-                               bk.sw.AddBannedPeer(peer.getPeer())
+                       if ban := bkPeer.addBanScore(0, 50, "block request error"); ban {
+                               bk.sw.AddBannedPeer(swPeer)
                        }
-                       bk.sw.StopPeerGracefully(peer.getPeer())
+                       bk.sw.StopPeerGracefully(swPeer)
                        break
                }
                isOrphan, err = bk.chain.ProcessBlock(block)
                if err != nil {
-                       if ban := peer.addBanScore(50, 0, "block process error"); ban {
-                               bk.sw.AddBannedPeer(peer.getPeer())
-                               bk.sw.StopPeerGracefully(peer.getPeer())
+                       if bkPeer == nil {
+                               log.Info("peer is deleted")
+                               break
+                       }
+                       if ban := bkPeer.addBanScore(50, 0, "block process error"); ban {
+                               bk.sw.AddBannedPeer(swPeer)
+                               bk.sw.StopPeerGracefully(swPeer)
                        }
                        log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
                        break
@@ -127,10 +136,13 @@ func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) e
                        return errBroadcastStatus
                }
                for _, peer := range peers {
+                       if peer == nil {
+                               return errPeerNotRegister
+                       }
+                       swPeer := peer.getPeer()
                        if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
-                               peer := bk.peers.Peer(peer.id).getPeer()
-                               bk.sw.AddBannedPeer(peer)
-                               bk.sw.StopPeerGracefully(peer)
+                               bk.sw.AddBannedPeer(swPeer)
+                               bk.sw.StopPeerGracefully(swPeer)
                        }
                }
        }
@@ -182,13 +194,15 @@ func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block,
 func (bk *blockKeeper) txsProcessWorker() {
        for txsResponse := range bk.txsProcessCh {
                tx := txsResponse.tx
-               peer:=bk.peers.Peer(txsResponse.peerID)
                log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
                bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
                if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
-                       if ban := peer.addBanScore(50, 0, "tx error"); ban {
-                               bk.sw.AddBannedPeer(peer.getPeer())
-                               bk.sw.StopPeerGracefully(peer.getPeer())
+                       if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
+                               swPeer := bkPeer.getPeer()
+                               if ban := bkPeer.addBanScore(50, 0, "tx error"); ban {
+                                       bk.sw.AddBannedPeer(swPeer)
+                                       bk.sw.StopPeerGracefully(swPeer)
+                               }
                        }
                }
        }
index 3375cd1..9b444b0 100644 (file)
@@ -149,10 +149,14 @@ func (f *Fetcher) insert(peerID string, block *types.Block) {
        // Run the actual import and log any issues
        if _, err := f.chain.ProcessBlock(block); err != nil {
                log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
-               peer := f.peers.Peer(peerID)
-               if ban := peer.addBanScore(50, 0, "block process error"); ban {
-                       f.sw.AddBannedPeer(peer.getPeer())
-                       f.sw.StopPeerGracefully(peer.getPeer())
+               fPeer, ok := f.peers.Peer(peerID)
+               if !ok {
+                       return
+               }
+               swPeer := fPeer.getPeer()
+               if ban := fPeer.addBanScore(50, 0, "block process error"); ban {
+                       f.sw.AddBannedPeer(swPeer)
+                       f.sw.StopPeerGracefully(swPeer)
                }
                return
        }
@@ -163,11 +167,14 @@ func (f *Fetcher) insert(peerID string, block *types.Block) {
                log.Errorf("Broadcast mine block error. %v", err)
                return
        }
-       for _, peer := range peers {
-               if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
-                       peer := f.peers.Peer(peer.id).getPeer()
-                       f.sw.AddBannedPeer(peer)
-                       f.sw.StopPeerGracefully(peer)
+       for _, fPeer := range peers {
+               if fPeer == nil {
+                       continue
+               }
+               swPeer := fPeer.getPeer()
+               if ban := fPeer.addBanScore(0, 50, "Broadcast block error"); ban {
+                       f.sw.AddBannedPeer(swPeer)
+                       f.sw.StopPeerGracefully(swPeer)
                }
        }
 }
index 177fe8b..dcb64d8 100644 (file)
@@ -168,11 +168,14 @@ func (sm *SyncManager) txBroadcastLoop() {
                                log.Errorf("Broadcast new tx error. %v", err)
                                return
                        }
-                       for _, peer := range peers {
-                               if ban := peer.addBanScore(0, 50, "Broadcast new tx error"); ban {
-                                       peer := sm.peers.Peer(peer.id).getPeer()
-                                       sm.sw.AddBannedPeer(peer)
-                                       sm.sw.StopPeerGracefully(peer)
+                       for _, smPeer := range peers {
+                               if smPeer == nil {
+                                       continue
+                               }
+                               swPeer := smPeer.getPeer()
+                               if ban := smPeer.addBanScore(0, 50, "Broadcast new tx error"); ban {
+                                       sm.sw.AddBannedPeer(swPeer)
+                                       sm.sw.StopPeerGracefully(swPeer)
                                }
                        }
                case <-sm.quitSync:
@@ -195,11 +198,14 @@ func (sm *SyncManager) minedBroadcastLoop() {
                                log.Errorf("Broadcast mine block error. %v", err)
                                return
                        }
-                       for _, peer := range peers {
-                               if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
-                                       peer := sm.peers.Peer(peer.id).getPeer()
-                                       sm.sw.AddBannedPeer(peer)
-                                       sm.sw.StopPeerGracefully(peer)
+                       for _, smPeer := range peers {
+                               if smPeer == nil {
+                                       continue
+                               }
+                               swPeer := smPeer.getPeer()
+                               if ban := smPeer.addBanScore(0, 50, "Broadcast block error"); ban {
+                                       sm.sw.AddBannedPeer(swPeer)
+                                       sm.sw.StopPeerGracefully(swPeer)
                                }
                        }
                case <-sm.quitSync:
index f638250..ef64ef8 100644 (file)
@@ -84,6 +84,9 @@ func (p *peer) SendTransactions(txs []*types.Tx) error {
                }
                hash := &tx.ID
                p.knownTxs.Add(hash.String())
+               if p.swPeer == nil {
+                       return errPeerDropped
+               }
                p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
        }
        return nil
@@ -192,11 +195,11 @@ func (ps *peerSet) Unregister(id string) error {
 }
 
 // Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) (*peer, bool) {
        ps.lock.RLock()
        defer ps.lock.RUnlock()
-
-       return ps.peers[id]
+       p, ok := ps.peers[id]
+       return p, ok
 }
 
 // Len returns if the current number of peers in the set.
@@ -319,10 +322,7 @@ func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
 }
 
 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[peerID]
+       peer, ok := ps.Peer(peerID)
        if !ok {
                return errors.New("Can't find peer. ")
        }
@@ -330,10 +330,7 @@ func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
 }
 
 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[peerID]
+       peer, ok := ps.Peer(peerID)
        if !ok {
                return errors.New("Can't find peer. ")
        }
@@ -353,7 +350,7 @@ func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
                        abnormalPeers = append(abnormalPeers, peer)
                        continue
                }
-               if p, ok := ps.peers[peer.id]; ok {
+               if p, ok := ps.Peer(peer.id); ok {
                        p.MarkBlock(&hash)
                }
        }
@@ -376,7 +373,7 @@ func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
                        abnormalPeers = append(abnormalPeers, peer)
                        continue
                }
-               if p, ok := ps.peers[peer.id]; ok {
+               if p, ok := ps.Peer(peer.id); ok {
                        p.MarkTransaction(&tx.ID)
                }
        }
index a07802c..c31d8c4 100644 (file)
@@ -108,6 +108,9 @@ func (pr *ProtocolReactor) OnStop() {
 
 // syncTransactions starts sending all currently pending transactions to the given peer.
 func (pr *ProtocolReactor) syncTransactions(p *peer) {
+       if p == nil {
+               return
+       }
        pending := pr.txPool.GetTransactions()
        if len(pending) == 0 {
                return
@@ -123,7 +126,9 @@ func (pr *ProtocolReactor) syncTransactions(p *peer) {
 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
        pr.handshakeMu.Lock()
        defer pr.handshakeMu.Unlock()
-
+       if peer == nil {
+               return errPeerDropped
+       }
        if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
                return ErrStatusRequest
        }
@@ -139,11 +144,18 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
                                }
                                pr.peers.AddPeer(peer)
                                pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
-                               pr.syncTransactions(pr.peers.Peer(peer.Key))
+                               prPeer, ok := pr.peers.Peer(peer.Key)
+                               if !ok {
+                                       return errPeerDropped
+                               }
+                               pr.syncTransactions(prPeer)
                                pr.newPeerCh <- struct{}{}
                                return nil
                        }
                case <-retryTicker:
+                       if peer == nil {
+                               return errPeerDropped
+                       }
                        if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
                                return ErrStatusRequest
                        }
@@ -155,7 +167,11 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
 
 // RemovePeer implements Reactor by removing peer from the pool.
 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       pr.quitReqBlockCh <- &peer.Key
+       select {
+       case pr.quitReqBlockCh <- &peer.Key:
+       default:
+               log.Warning("quitReqBlockCh is full")
+       }
        pr.peers.RemovePeer(peer.Key)
 }
 
index f009929..02e970f 100644 (file)
@@ -660,7 +660,9 @@ func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConf
 func (sw *Switch) AddBannedPeer(peer *Peer) error {
        sw.mtx.Lock()
        defer sw.mtx.Unlock()
-
+       if peer == nil {
+               return nil
+       }
        key := peer.mconn.RemoteAddress.IP.String()
        sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
        datajson, err := json.Marshal(sw.bannedPeer)