func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
return &peer{
version: defaultVersion,
+ id: Peer.Key,
height: height,
hash: hash,
- swPeer: Peer,
+ swPeer: Peer,
knownTxs: set.New(),
knownBlocks: set.New(),
}
}
hash := &tx.ID
p.knownTxs.Add(hash.String())
+ if p.swPeer == nil {
+ return errPeerDropped
+ }
p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
}
return nil
// logged if the score is above the warn threshold.
score := p.banScore.Int()
if score > warnThreshold {
- log.Info("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p, reason, score)
+ log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
}
return false
}
score := p.banScore.Increase(persistent, transient)
if score > warnThreshold {
- log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p, reason, score)
+ log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
if score > defaultBanThreshold {
- log.Errorf("Misbehaving peer %s -- banning and disconnecting", p)
+ log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
return true
}
}
}
// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) (*peer, bool) {
ps.lock.RLock()
defer ps.lock.RUnlock()
-
- return ps.peers[id]
+ p, ok := ps.peers[id]
+ return p, ok
}
// Len returns if the current number of peers in the set.
}
func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[peerID]
+ peer, ok := ps.Peer(peerID)
if !ok {
return errors.New("Can't find peer. ")
}
}
func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[peerID]
+ peer, ok := ps.Peer(peerID)
if !ok {
return errors.New("Can't find peer. ")
}
return peer.requestBlockByHeight(height)
}
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
+func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
msg, err := NewMinedBlockMessage(block)
if err != nil {
- return errors.New("Failed construction block msg")
+ return nil, errors.New("Failed construction block msg")
}
hash := block.Hash()
peers := ps.PeersWithoutBlock(&hash)
+ abnormalPeers := make([]*peer, 0)
for _, peer := range peers {
- ps.MarkBlock(peer.swPeer.Key, &hash)
- peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ abnormalPeers = append(abnormalPeers, peer)
+ continue
+ }
+ if p, ok := ps.Peer(peer.id); ok {
+ p.MarkBlock(&hash)
+ }
}
- return nil
+ return abnormalPeers, nil
}
-func (ps *peerSet) BroadcastNewStatus(block *types.Block) error {
+func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
return ps.BroadcastMinedBlock(block)
}
-func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
+func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
msg, err := NewTransactionNotifyMessage(tx)
if err != nil {
- return errors.New("Failed construction tx msg")
+ return nil, errors.New("Failed construction tx msg")
}
peers := ps.PeersWithoutTx(&tx.ID)
+ abnormalPeers := make([]*peer, 0)
for _, peer := range peers {
- ps.peers[peer.swPeer.Key].MarkTransaction(&tx.ID)
- peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ abnormalPeers = append(abnormalPeers, peer)
+ continue
+ }
+ if p, ok := ps.Peer(peer.id); ok {
+ p.MarkTransaction(&tx.ID)
+ }
}
- return nil
+ return abnormalPeers, nil
}