errGetBlockByHash = errors.New("Get block by hash error")
errBroadcastStatus = errors.New("Broadcast new status block error")
errReqBlock = errors.New("Request block error")
+ errPeerNotRegister = errors.New("peer is not registered")
)
//TODO: add retry mechanism
orphanNum := uint64(0)
reqNum := uint64(0)
isOrphan := false
- peer := bk.peers.Peer(peerID)
+ bkPeer, ok := bk.peers.Peer(peerID)
+ if !ok {
+ log.Info("peer is not registered")
+ return errPeerNotRegister
+ }
+ swPeer := bkPeer.getPeer()
for num <= maxPeerHeight && num > 0 {
if isOrphan {
reqNum = orphanNum
block, err := bk.BlockRequest(peerID, reqNum)
if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
- if peer == nil {
+ if bkPeer == nil {
log.Info("peer is not registered")
break
}
- log.Info("Peer communication abnormality")
- if ban := peer.addBanScore(0, 50, "block request error"); ban {
- bk.sw.AddBannedPeer(peer.getPeer())
+ if ban := bkPeer.addBanScore(0, 50, "block request error"); ban {
+ bk.sw.AddBannedPeer(swPeer)
}
- bk.sw.StopPeerGracefully(peer.getPeer())
+ bk.sw.StopPeerGracefully(swPeer)
break
}
isOrphan, err = bk.chain.ProcessBlock(block)
if err != nil {
- if ban := peer.addBanScore(50, 0, "block process error"); ban {
- bk.sw.AddBannedPeer(peer.getPeer())
- bk.sw.StopPeerGracefully(peer.getPeer())
+ if bkPeer == nil {
+ log.Info("peer is deleted")
+ break
+ }
+ if ban := bkPeer.addBanScore(50, 0, "block process error"); ban {
+ bk.sw.AddBannedPeer(swPeer)
+ bk.sw.StopPeerGracefully(swPeer)
}
log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
break
return errBroadcastStatus
}
for _, peer := range peers {
+ if peer == nil {
+ return errPeerNotRegister
+ }
+ swPeer := peer.getPeer()
if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
- peer := bk.peers.Peer(peer.id).getPeer()
- bk.sw.AddBannedPeer(peer)
- bk.sw.StopPeerGracefully(peer)
+ bk.sw.AddBannedPeer(swPeer)
+ bk.sw.StopPeerGracefully(swPeer)
}
}
}
func (bk *blockKeeper) txsProcessWorker() {
for txsResponse := range bk.txsProcessCh {
tx := txsResponse.tx
- peer:=bk.peers.Peer(txsResponse.peerID)
log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
- if ban := peer.addBanScore(50, 0, "tx error"); ban {
- bk.sw.AddBannedPeer(peer.getPeer())
- bk.sw.StopPeerGracefully(peer.getPeer())
+ if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
+ swPeer := bkPeer.getPeer()
+ if ban := bkPeer.addBanScore(50, 0, "tx error"); ban {
+ bk.sw.AddBannedPeer(swPeer)
+ bk.sw.StopPeerGracefully(swPeer)
+ }
}
}
}
// Run the actual import and log any issues
if _, err := f.chain.ProcessBlock(block); err != nil {
log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
- peer := f.peers.Peer(peerID)
- if ban := peer.addBanScore(50, 0, "block process error"); ban {
- f.sw.AddBannedPeer(peer.getPeer())
- f.sw.StopPeerGracefully(peer.getPeer())
+ fPeer, ok := f.peers.Peer(peerID)
+ if !ok {
+ return
+ }
+ swPeer := fPeer.getPeer()
+ if ban := fPeer.addBanScore(50, 0, "block process error"); ban {
+ f.sw.AddBannedPeer(swPeer)
+ f.sw.StopPeerGracefully(swPeer)
}
return
}
log.Errorf("Broadcast mine block error. %v", err)
return
}
- for _, peer := range peers {
- if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
- peer := f.peers.Peer(peer.id).getPeer()
- f.sw.AddBannedPeer(peer)
- f.sw.StopPeerGracefully(peer)
+ for _, fPeer := range peers {
+ if fPeer == nil {
+ continue
+ }
+ swPeer := fPeer.getPeer()
+ if ban := fPeer.addBanScore(0, 50, "Broadcast block error"); ban {
+ f.sw.AddBannedPeer(swPeer)
+ f.sw.StopPeerGracefully(swPeer)
}
}
}
log.Errorf("Broadcast new tx error. %v", err)
return
}
- for _, peer := range peers {
- if ban := peer.addBanScore(0, 50, "Broadcast new tx error"); ban {
- peer := sm.peers.Peer(peer.id).getPeer()
- sm.sw.AddBannedPeer(peer)
- sm.sw.StopPeerGracefully(peer)
+ for _, smPeer := range peers {
+ if smPeer == nil {
+ continue
+ }
+ swPeer := smPeer.getPeer()
+ if ban := smPeer.addBanScore(0, 50, "Broadcast new tx error"); ban {
+ sm.sw.AddBannedPeer(swPeer)
+ sm.sw.StopPeerGracefully(swPeer)
}
}
case <-sm.quitSync:
log.Errorf("Broadcast mine block error. %v", err)
return
}
- for _, peer := range peers {
- if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
- peer := sm.peers.Peer(peer.id).getPeer()
- sm.sw.AddBannedPeer(peer)
- sm.sw.StopPeerGracefully(peer)
+ for _, smPeer := range peers {
+ if smPeer == nil {
+ continue
+ }
+ swPeer := smPeer.getPeer()
+ if ban := smPeer.addBanScore(0, 50, "Broadcast block error"); ban {
+ sm.sw.AddBannedPeer(swPeer)
+ sm.sw.StopPeerGracefully(swPeer)
}
}
case <-sm.quitSync:
}
hash := &tx.ID
p.knownTxs.Add(hash.String())
+ if p.swPeer == nil {
+ return errPeerDropped
+ }
p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
}
return nil
}
// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) (*peer, bool) {
ps.lock.RLock()
defer ps.lock.RUnlock()
-
- return ps.peers[id]
+ p, ok := ps.peers[id]
+ return p, ok
}
// Len returns if the current number of peers in the set.
}
func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[peerID]
+ peer, ok := ps.Peer(peerID)
if !ok {
return errors.New("Can't find peer. ")
}
}
func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[peerID]
+ peer, ok := ps.Peer(peerID)
if !ok {
return errors.New("Can't find peer. ")
}
abnormalPeers = append(abnormalPeers, peer)
continue
}
- if p, ok := ps.peers[peer.id]; ok {
+ if p, ok := ps.Peer(peer.id); ok {
p.MarkBlock(&hash)
}
}
abnormalPeers = append(abnormalPeers, peer)
continue
}
- if p, ok := ps.peers[peer.id]; ok {
+ if p, ok := ps.Peer(peer.id); ok {
p.MarkTransaction(&tx.ID)
}
}
// syncTransactions starts sending all currently pending transactions to the given peer.
func (pr *ProtocolReactor) syncTransactions(p *peer) {
+ if p == nil {
+ return
+ }
pending := pr.txPool.GetTransactions()
if len(pending) == 0 {
return
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
pr.handshakeMu.Lock()
defer pr.handshakeMu.Unlock()
-
+ if peer == nil {
+ return errPeerDropped
+ }
if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return ErrStatusRequest
}
}
pr.peers.AddPeer(peer)
pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
- pr.syncTransactions(pr.peers.Peer(peer.Key))
+ prPeer, ok := pr.peers.Peer(peer.Key)
+ if !ok {
+ return errPeerDropped
+ }
+ pr.syncTransactions(prPeer)
pr.newPeerCh <- struct{}{}
return nil
}
case <-retryTicker:
+ if peer == nil {
+ return errPeerDropped
+ }
if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
return ErrStatusRequest
}
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- pr.quitReqBlockCh <- &peer.Key
+ select {
+ case pr.quitReqBlockCh <- &peer.Key:
+ default:
+ log.Warning("quitReqBlockCh is full")
+ }
pr.peers.RemovePeer(peer.Key)
}
func (sw *Switch) AddBannedPeer(peer *Peer) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
-
+ if peer == nil {
+ return nil
+ }
key := peer.mconn.RemoteAddress.IP.String()
sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
datajson, err := json.Marshal(sw.bannedPeer)