6 log "github.com/sirupsen/logrus"
7 "gopkg.in/fatih/set.v0"
9 "github.com/bytom/errors"
10 "github.com/bytom/p2p"
11 "github.com/bytom/p2p/trust"
12 "github.com/bytom/protocol/bc"
13 "github.com/bytom/protocol/bc/types"
17 errClosed = errors.New("peer set is closed")
18 errAlreadyRegistered = errors.New("peer is already registered")
19 errNotRegistered = errors.New("peer is not registered")
24 defaultBanThreshold = uint64(100)
29 version int // Protocol version negotiated
33 banScore trust.DynamicBanScore
37 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
38 knownBlocks *set.Set // Set of block hashes known to be known by this peer
41 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
43 version: defaultVersion,
49 knownBlocks: set.New(),
53 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
56 return p.height, p.hash
59 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
67 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
68 msg := &BlockRequestMessage{RawHash: hash.Byte32()}
69 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
73 func (p *peer) requestBlockByHeight(height uint64) error {
74 msg := &BlockRequestMessage{Height: height}
75 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
79 func (p *peer) SendTransactions(txs []*types.Tx) error {
80 for _, tx := range txs {
81 msg, err := NewTransactionNotifyMessage(tx)
83 return errors.New("Failed construction tx msg")
86 p.knownTxs.Add(hash.String())
90 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
95 func (p *peer) getPeer() *p2p.Peer {
102 // MarkTransaction marks a transaction as known for the peer, ensuring that it
103 // will never be propagated to this particular peer.
104 func (p *peer) MarkTransaction(hash *bc.Hash) {
108 // If we reached the memory allowance, drop a previously known transaction hash
109 for p.knownTxs.Size() >= maxKnownTxs {
112 p.knownTxs.Add(hash.String())
115 // MarkBlock marks a block as known for the peer, ensuring that the block will
116 // never be propagated to this particular peer.
117 func (p *peer) MarkBlock(hash *bc.Hash) {
121 // If we reached the memory allowance, drop a previously known block hash
122 for p.knownBlocks.Size() >= maxKnownBlocks {
125 p.knownBlocks.Add(hash.String())
128 // addBanScore increases the persistent and decaying ban score fields by the
129 // values passed as parameters. If the resulting score exceeds half of the ban
130 // threshold, a warning is logged including the reason provided. Further, if
131 // the score is above the ban threshold, the peer will be banned and
133 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
134 warnThreshold := defaultBanThreshold >> 1
135 if transient == 0 && persistent == 0 {
136 // The score is not being increased, but a warning message is still
137 // logged if the score is above the warn threshold.
138 score := p.banScore.Int()
139 if score > warnThreshold {
140 log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
144 score := p.banScore.Increase(persistent, transient)
145 if score > warnThreshold {
146 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
147 if score > defaultBanThreshold {
148 log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
155 type peerSet struct {
156 peers map[string]*peer
161 // newPeerSet creates a new peer set to track the active participants.
162 func newPeerSet() *peerSet {
164 peers: make(map[string]*peer),
168 // Register injects a new peer into the working set, or returns an error if the
169 // peer is already known.
170 func (ps *peerSet) Register(p *peer) error {
172 defer ps.lock.Unlock()
177 if _, ok := ps.peers[p.id]; ok {
178 return errAlreadyRegistered
184 // Unregister removes a remote peer from the active set, disabling any further
185 // actions to/from that particular entity.
186 func (ps *peerSet) Unregister(id string) error {
188 defer ps.lock.Unlock()
190 if _, ok := ps.peers[id]; !ok {
191 return errNotRegistered
197 // Peer retrieves the registered peer with the given id.
198 func (ps *peerSet) Peer(id string) (*peer, bool) {
200 defer ps.lock.RUnlock()
201 p, ok := ps.peers[id]
205 // Len returns the current number of peers in the set.
206 func (ps *peerSet) Len() int {
208 defer ps.lock.RUnlock()
213 // MarkTransaction marks a transaction as known for the peer with the given ID
214 // (if it is registered), ensuring that it will never be propagated to that peer.
215 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
217 defer ps.lock.RUnlock()
219 if peer, ok := ps.peers[peerID]; ok {
220 peer.MarkTransaction(hash)
224 // MarkBlock marks a block as known for the peer with the given ID (if it is
225 // registered), ensuring that the block will never be propagated to that peer.
226 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
228 defer ps.lock.RUnlock()
230 if peer, ok := ps.peers[peerID]; ok {
235 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
236 // their set of known hashes.
237 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
239 defer ps.lock.RUnlock()
241 list := make([]*peer, 0, len(ps.peers))
242 for _, p := range ps.peers {
243 if !p.knownBlocks.Has(hash.String()) {
244 list = append(list, p)
250 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
251 // in their set of known hashes.
252 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
254 defer ps.lock.RUnlock()
256 list := make([]*peer, 0, len(ps.peers))
257 for _, p := range ps.peers {
258 if !p.knownTxs.Has(hash.String()) {
259 list = append(list, p)
265 // BestPeer retrieves the known peer with the currently greatest height, along with that height.
266 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
268 defer ps.lock.RUnlock()
270 var bestPeer *p2p.Peer
271 var bestHeight uint64
273 for _, p := range ps.peers {
274 if bestPeer == nil || p.height > bestHeight {
275 bestPeer, bestHeight = p.swPeer, p.height
279 return bestPeer, bestHeight
282 // Close disconnects all peers.
283 // No new peers can be registered after Close has returned.
284 func (ps *peerSet) Close() {
286 defer ps.lock.Unlock()
288 for _, p := range ps.peers {
294 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
296 defer ps.lock.Unlock()
298 if _, ok := ps.peers[peer.Key]; !ok {
299 keeperPeer := newPeer(0, nil, peer)
300 ps.peers[peer.Key] = keeperPeer
301 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
304 log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
307 func (ps *peerSet) RemovePeer(peerID string) {
309 defer ps.lock.Unlock()
311 delete(ps.peers, peerID)
312 log.WithField("ID", peerID).Info("Delete peer from peerset")
315 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
317 defer ps.lock.Unlock()
319 if peer, ok := ps.peers[peerID]; ok {
320 peer.SetStatus(height, hash)
324 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
325 peer, ok := ps.Peer(peerID)
327 return errors.New("Can't find peer. ")
329 return peer.requestBlockByHash(hash)
332 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
333 peer, ok := ps.Peer(peerID)
335 return errors.New("Can't find peer. ")
337 return peer.requestBlockByHeight(height)
340 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
341 msg, err := NewMinedBlockMessage(block)
343 return nil, errors.New("Failed construction block msg")
346 peers := ps.PeersWithoutBlock(&hash)
347 abnormalPeers := make([]*peer, 0)
348 for _, peer := range peers {
349 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
350 abnormalPeers = append(abnormalPeers, peer)
353 if p, ok := ps.Peer(peer.id); ok {
357 return abnormalPeers, nil
360 func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
361 return ps.BroadcastMinedBlock(block)
364 func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
365 msg, err := NewTransactionNotifyMessage(tx)
367 return nil, errors.New("Failed construction tx msg")
369 peers := ps.PeersWithoutTx(&tx.ID)
370 abnormalPeers := make([]*peer, 0)
371 for _, peer := range peers {
372 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
373 abnormalPeers = append(abnormalPeers, peer)
376 if p, ok := ps.Peer(peer.id); ok {
377 p.MarkTransaction(&tx.ID)
380 return abnormalPeers, nil