OSDN Git Service

Merge branch 'dev' into docker
[bytom/bytom.git] / netsync / peer.go
index 84391e2..c48e5ee 100644 (file)
@@ -41,9 +41,10 @@ type peer struct {
 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
        return &peer{
                version:     defaultVersion,
+               id:          Peer.Key,
                height:      height,
                hash:        hash,
-               swPeer:        Peer,
+               swPeer:      Peer,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
        }
@@ -83,6 +84,9 @@ func (p *peer) SendTransactions(txs []*types.Tx) error {
                }
                hash := &tx.ID
                p.knownTxs.Add(hash.String())
+               if p.swPeer == nil {
+                       return errPeerDropped
+               }
                p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
        }
        return nil
@@ -133,15 +137,15 @@ func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
                // logged if the score is above the warn threshold.
                score := p.banScore.Int()
                if score > warnThreshold {
-                       log.Info("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p, reason, score)
+                       log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
                }
                return false
        }
        score := p.banScore.Increase(persistent, transient)
        if score > warnThreshold {
-               log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p, reason, score)
+               log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
                if score > defaultBanThreshold {
-                       log.Errorf("Misbehaving peer %s -- banning and disconnecting", p)
+                       log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
                        return true
                }
        }
@@ -191,11 +195,11 @@ func (ps *peerSet) Unregister(id string) error {
 }
 
 // Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) (*peer, bool) {
        ps.lock.RLock()
        defer ps.lock.RUnlock()
-
-       return ps.peers[id]
+       p, ok := ps.peers[id]
+       return p, ok
 }
 
 // Len returns if the current number of peers in the set.
@@ -318,10 +322,7 @@ func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
 }
 
 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[peerID]
+       peer, ok := ps.Peer(peerID)
        if !ok {
                return errors.New("Can't find peer. ")
        }
@@ -329,43 +330,52 @@ func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
 }
 
 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[peerID]
+       peer, ok := ps.Peer(peerID)
        if !ok {
                return errors.New("Can't find peer. ")
        }
        return peer.requestBlockByHeight(height)
 }
 
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
+func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
        msg, err := NewMinedBlockMessage(block)
        if err != nil {
-               return errors.New("Failed construction block msg")
+               return nil, errors.New("Failed construction block msg")
        }
        hash := block.Hash()
        peers := ps.PeersWithoutBlock(&hash)
+       abnormalPeers := make([]*peer, 0)
        for _, peer := range peers {
-               ps.MarkBlock(peer.swPeer.Key, &hash)
-               peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+               if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       abnormalPeers = append(abnormalPeers, peer)
+                       continue
+               }
+               if p, ok := ps.Peer(peer.id); ok {
+                       p.MarkBlock(&hash)
+               }
        }
-       return nil
+       return abnormalPeers, nil
 }
 
-func (ps *peerSet) BroadcastNewStatus(block *types.Block) error {
+func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
        return ps.BroadcastMinedBlock(block)
 }
 
-func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
+func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
        msg, err := NewTransactionNotifyMessage(tx)
        if err != nil {
-               return errors.New("Failed construction tx msg")
+               return nil, errors.New("Failed construction tx msg")
        }
        peers := ps.PeersWithoutTx(&tx.ID)
+       abnormalPeers := make([]*peer, 0)
        for _, peer := range peers {
-               ps.peers[peer.swPeer.Key].MarkTransaction(&tx.ID)
-               peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+               if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       abnormalPeers = append(abnormalPeers, peer)
+                       continue
+               }
+               if p, ok := ps.Peer(peer.id); ok {
+                       p.MarkTransaction(&tx.ID)
+               }
        }
-       return nil
+       return abnormalPeers, nil
 }