7 log "github.com/sirupsen/logrus"
8 "gopkg.in/fatih/set.v0"
10 "github.com/bytom/consensus"
11 "github.com/bytom/errors"
12 "github.com/bytom/p2p/trust"
13 "github.com/bytom/protocol/bc"
14 "github.com/bytom/protocol/bc/types"
18 maxKnownTxs = 32768 // Maximum number of transaction hashes to keep in the known list (prevents DoS)
19 maxKnownBlocks = 1024 // Maximum number of block hashes to keep in the known list (prevents DoS)
20 defaultBanThreshold = uint64(100) // peer.addBanScore logs "banning and disconnecting" above this score and warns above half of it
23 // BasePeer is the interface for a connection-level peer.
24 type BasePeer interface {
27 ServiceFlag() consensus.ServiceFlag // newPeer stores this value in peer.services
28 TrySend(byte, interface{}) bool // callers in this file pass (BlockchainChannel, message) and treat false as a failed send
31 // BasePeerSet is the interface for the connection-level peer manager.
32 type BasePeerSet interface {
33 AddBannedPeer(string) error // peerSet.addBanScore calls this with peer.Addr().String()
34 StopPeerGracefully(string) // peerSet.removePeer calls this with the peer ID
37 // PeerInfo is a snapshot of a peer's status; its fields carry JSON tags.
38 type PeerInfo struct {
40 RemoteAddr string `json:"remote_addr"` // filled from p.Addr().String() in getPeerInfo
41 Height uint64 `json:"height"`
42 Delay uint32 `json:"delay"`
48 services consensus.ServiceFlag // taken from BasePeer.ServiceFlag() in newPeer; bestPeer filters on it
51 banScore trust.DynamicBanScore // raised by addBanScore via Increase(persistent, transient)
52 knownTxs *set.Set // Set of transaction hashes known to this peer, stored as their String() form
53 knownBlocks *set.Set // Set of block hashes known to this peer, stored as their String() form
// newPeer creates a peer for basePeer. Its service flags come from
// basePeer.ServiceFlag() and knownBlocks starts as an empty set.
// NOTE(review): how height and hash are stored is not visible in this excerpt.
56 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
59 services: basePeer.ServiceFlag(),
63 knownBlocks: set.New(),
// Height reports the peer's height as a uint64.
// NOTE(review): presumably returns the height field that bestPeer compares — body not visible here, confirm.
67 func (p *peer) Height() uint64 {
// addBanScore raises the peer's ban score by the given persistent and
// transient amounts and logs the outcome together with reason: a score above
// defaultBanThreshold is logged as an error ("banning and disconnecting"), and
// a score above half of that threshold is logged as a warning.
// peerSet.addBanScore treats a true result as "ban this peer".
73 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
74 score := p.banScore.Increase(persistent, transient)
75 if score > defaultBanThreshold {
76 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
80 warnThreshold := defaultBanThreshold >> 1 // half of the ban threshold
81 if score > warnThreshold {
82 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
// getBlockByHeight sends a GetBlockMessage carrying the given height on
// BlockchainChannel and returns the result of TrySend.
87 func (p *peer) getBlockByHeight(height uint64) bool {
88 msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
89 return p.TrySend(BlockchainChannel, msg)
// getBlocks sends the message built by NewGetBlocksMessage(locator, stopHash)
// on BlockchainChannel and returns the result of TrySend.
92 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
93 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
94 return p.TrySend(BlockchainChannel, msg)
// getHeaders sends the message built by NewGetHeadersMessage(locator, stopHash)
// on BlockchainChannel and returns the result of TrySend.
97 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
98 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
99 return p.TrySend(BlockchainChannel, msg)
// getPeerInfo builds a PeerInfo for this peer; RemoteAddr is the string form
// of p.Addr(). The read lock on p.mtx is released when the function returns.
102 func (p *peer) getPeerInfo() *PeerInfo {
104 defer p.mtx.RUnlock()
107 RemoteAddr: p.Addr().String(),
// markBlock records hash (as a string) in the peer's knownBlocks set.
// A loop runs first while the set holds maxKnownBlocks or more entries.
// NOTE(review): the loop body is not visible here; presumably it evicts entries — confirm.
112 func (p *peer) markBlock(hash *bc.Hash) {
116 for p.knownBlocks.Size() >= maxKnownBlocks {
119 p.knownBlocks.Add(hash.String())
// markTransaction records hash (as a string) in the peer's knownTxs set.
// A loop runs first while the set holds maxKnownTxs or more entries.
// NOTE(review): the loop body is not visible here; presumably it evicts entries — confirm.
122 func (p *peer) markTransaction(hash *bc.Hash) {
126 for p.knownTxs.Size() >= maxKnownTxs {
129 p.knownTxs.Add(hash.String())
// sendBlock builds a block message for block and tries to send it on
// BlockchainChannel. If the message cannot be built it returns false and the
// error wrapped with "fail on NewBlockMessage". The block's hash is also added
// to knownBlocks.
// NOTE(review): the condition guarding the knownBlocks update is not visible
// here (presumably "send succeeded" — confirm). The hash is added directly
// rather than through markBlock, so the maxKnownBlocks loop there is not
// applied on this path. The local name "blcokHash" is a typo for "blockHash".
132 func (p *peer) sendBlock(block *types.Block) (bool, error) {
133 msg, err := NewBlockMessage(block)
135 return false, errors.Wrap(err, "fail on NewBlockMessage")
138 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
140 blcokHash := block.Hash()
141 p.knownBlocks.Add(blcokHash.String())
// sendBlocks builds a single message for all blocks and tries to send it on
// BlockchainChannel. If the message cannot be built it returns false and the
// error wrapped with "fail on NewBlocksMessage". After the send check, the
// hash of every block is added to knownBlocks.
// NOTE(review): the body of the !ok branch is not visible here (presumably an
// early return — confirm). Hashes are added directly rather than through
// markBlock, so the maxKnownBlocks loop there is not applied on this path.
// The local name "blcokHash" is a typo for "blockHash".
146 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
147 msg, err := NewBlocksMessage(blocks)
149 return false, errors.Wrap(err, "fail on NewBlocksMessage")
152 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
156 for _, block := range blocks {
157 blcokHash := block.Hash()
158 p.knownBlocks.Add(blcokHash.String())
// sendHeaders builds a headers message for headers and tries to send it on
// BlockchainChannel.
// NOTE(review): the failure return below creates a fresh error with
// errors.New and drops err, whereas sendBlock and sendBlocks use
// errors.Wrap(err, ...) and keep the cause. Consider wrapping here too.
163 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
164 msg, err := NewHeadersMessage(headers)
166 return false, errors.New("fail on NewHeadersMessage")
169 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// sendTransactions walks txs, builds a transaction message for each one and
// tries to send it on BlockchainChannel, adding the transaction ID to knownTxs
// afterwards. A message-construction failure returns false and the error
// wrapped with "failed to tx msg".
// NOTE(review): the bodies of the knownTxs.Has and !ok branches are not
// visible here (presumably "skip" and "return false" — confirm). The message
// is built before the knownTxs check, so it is constructed even for
// transactions the peer already knows.
173 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
174 for _, tx := range txs {
175 msg, err := NewTransactionMessage(tx)
177 return false, errors.Wrap(err, "failed to tx msg")
180 if p.knownTxs.Has(tx.ID.String()) {
183 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
186 p.knownTxs.Add(tx.ID.String())
// setStatus takes a height and a block hash for this peer.
// NOTE(review): presumably it stores them as the peer's current status — body not visible here, confirm.
191 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
// peerSet holds the peers currently tracked by this package.
198 type peerSet struct {
201 peers map[string]*peer // keyed by peer ID (addPeer stores under peer.ID())
204 // newPeerSet creates a new peer set to track the active participants.
// It stores basePeerSet in the BasePeerSet field and starts with an empty
// peers map.
205 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
207 BasePeerSet: basePeerSet,
208 peers: make(map[string]*peer),
// addBanScore looks up the peer registered under peerID and raises its ban
// score. When peer.addBanScore reports a ban, the peer's address is passed to
// AddBannedPeer (a failure there is only logged) and the peer is removed from
// the set.
// NOTE(review): the handling between the lookup and the score update, and the
// body of the !ban branch (presumably an early return), are not visible here.
212 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
214 peer := ps.peers[peerID]
220 if ban := peer.addBanScore(persistent, transient, reason); !ban {
223 if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
224 log.WithField("err", err).Error("fail on add ban peer")
226 ps.removePeer(peerID)
// addPeer registers peer under peer.ID() when no peer with that ID is present,
// wrapping it with newPeer(height, hash, peer). ps.mtx is unlocked when the
// function returns.
// NOTE(review): the warning below is presumably reached only for an ID that
// already exists (an early return after the insert is not visible here).
229 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
231 defer ps.mtx.Unlock()
233 if _, ok := ps.peers[peer.ID()]; !ok {
234 ps.peers[peer.ID()] = newPeer(height, hash, peer)
237 log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
// bestPeer scans the set for peers whose services enable flag and tracks the
// one with the greatest height. The read lock on ps.mtx is released when the
// function returns.
// NOTE(review): the branch bodies and the return are not visible here;
// presumably non-matching peers are skipped and nil is returned when none match.
240 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
242 defer ps.mtx.RUnlock()
245 for _, p := range ps.peers {
246 if !p.services.IsEnable(flag) {
249 if bestPeer == nil || p.height > bestPeer.height {
// broadcastMinedBlock builds a mined-block message for block and tries to send
// it on BlockchainChannel to every peer returned by peersWithoutBlock. A peer
// whose send fails is removed from the set; the block is marked as known via
// peer.markBlock. If the message cannot be built, the error is returned
// wrapped with "fail on broadcast mined block".
// NOTE(review): hash is defined on a line not visible here (presumably
// block.Hash()), and the failed-send branch presumably continues to the next
// peer before markBlock — confirm.
256 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
257 msg, err := NewMinedBlockMessage(block)
259 return errors.Wrap(err, "fail on broadcast mined block")
263 peers := ps.peersWithoutBlock(&hash)
264 for _, peer := range peers {
265 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
266 ps.removePeer(peer.ID())
269 peer.markBlock(&hash)
// broadcastTx builds a transaction message for tx and tries to send it on
// BlockchainChannel to every peer returned by peersWithoutTx for tx.ID. A peer
// whose send fails is removed from the set; the transaction is marked as known
// via peer.markTransaction. If the message cannot be built, the error is
// returned wrapped with "fail on broadcast tx".
// NOTE(review): the failed-send branch presumably continues to the next peer
// before markTransaction — that line is not visible here, confirm.
274 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
275 msg, err := NewTransactionMessage(tx)
277 return errors.Wrap(err, "fail on broadcast tx")
280 peers := ps.peersWithoutTx(&tx.ID)
281 for _, peer := range peers {
282 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
283 ps.removePeer(peer.ID())
286 peer.markTransaction(&tx.ID)
// errorHandler reacts to err attributed to the peer peerID. When the root
// cause is errPeerMisbehave it adds 20 persistent (and 0 transient) ban points
// with the error text as the reason; it also contains a removePeer call.
// NOTE(review): removePeer is presumably the else branch (any other error) —
// the branching line is not visible here, confirm.
291 func (ps *peerSet) errorHandler(peerID string, err error) {
292 if errors.Root(err) == errPeerMisbehave {
293 ps.addBanScore(peerID, 20, 0, err.Error())
295 ps.removePeer(peerID)
299 // getPeer retrieves the registered peer with the given id.
// The read lock on ps.mtx is released when the function returns.
300 func (ps *peerSet) getPeer(id string) *peer {
302 defer ps.mtx.RUnlock()
// getPeerInfos collects peer.getPeerInfo() for every peer in the set. The
// result starts as an empty, non-nil slice. The read lock on ps.mtx is
// released when the function returns.
306 func (ps *peerSet) getPeerInfos() []*PeerInfo {
308 defer ps.mtx.RUnlock()
310 result := []*PeerInfo{}
311 for _, peer := range ps.peers {
312 result = append(result, peer.getPeerInfo())
// peersWithoutBlock collects the peers whose knownBlocks set does not contain
// hash (compared by hash.String()). The read lock on ps.mtx is released when
// the function returns.
317 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
319 defer ps.mtx.RUnlock()
322 for _, peer := range ps.peers {
323 if !peer.knownBlocks.Has(hash.String()) {
324 peers = append(peers, peer)
// peersWithoutTx collects the peers whose knownTxs set does not contain hash
// (compared by hash.String()). The read lock on ps.mtx is released when the
// function returns.
330 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
332 defer ps.mtx.RUnlock()
335 for _, peer := range ps.peers {
336 if !peer.knownTxs.Has(hash.String()) {
337 peers = append(peers, peer)
// removePeer deletes peerID from the peers map and then asks the underlying
// BasePeerSet to stop that peer gracefully.
// NOTE(review): the locking around the map delete is not visible in this excerpt.
343 func (ps *peerSet) removePeer(peerID string) {
345 delete(ps.peers, peerID)
347 ps.StopPeerGracefully(peerID)