8 log "github.com/sirupsen/logrus"
9 "github.com/tendermint/tmlibs/flowrate"
10 "gopkg.in/fatih/set.v0"
12 "github.com/vapor/consensus"
13 "github.com/vapor/errors"
14 "github.com/vapor/p2p/trust"
15 "github.com/vapor/protocol/bc"
16 "github.com/vapor/protocol/bc/types"
// Limits used by the per-peer bookkeeping below.
20 maxKnownTxs = 32768 // Maximum number of transaction hashes to keep in the known list (prevents DoS)
21 maxKnownBlocks = 1024 // Maximum number of block hashes to keep in the known list (prevents DoS)
22 defaultBanThreshold = uint64(100) // Ban score above which peer.addBanScore logs "banning and disconnecting"; half of it is the warning level
25 // BasePeer is the interface for a connection-level peer.
26 type BasePeer interface {
// ServiceFlag returns the peer's consensus.ServiceFlag; newPeer copies it into peer.services.
29 ServiceFlag() consensus.ServiceFlag
// TrafficStatus returns two flowrate statuses; getPeerInfo treats the first as
// the sent side and the second as the received side.
30 TrafficStatus() (*flowrate.Status, *flowrate.Status)
// TrySend takes a channel byte and a message value and reports a bool. Callers
// in this file pass BlockchainChannel and treat false as a failed send.
31 TrySend(byte, interface{}) bool
34 // BasePeerSet is the interface for the connection-level peer manager.
35 type BasePeerSet interface {
// AddBannedPeer takes a string and may return an error; peerSet.addBanScore
// passes the peer's address string.
36 AddBannedPeer(string) error
// StopPeerGracefully takes a string; peerSet.removePeer passes the peer ID.
37 StopPeerGracefully(string)
40 // PeerInfo is a snapshot of a peer's status. Every field carries a JSON tag.
// getPeerInfo fills the address, duration, byte totals and rates from the
// peer's address and its sent/received flowrate statuses.
41 type PeerInfo struct {
42 ID string `json:"peer_id"`
43 RemoteAddr string `json:"remote_addr"` // p.Addr().String()
44 Height uint64 `json:"height"`
45 Ping string `json:"ping"`
46 Duration string `json:"duration"` // String form of the sent status Duration
47 TotalSent int64 `json:"total_sent"` // Bytes of the sent status
48 TotalReceived int64 `json:"total_received"` // Bytes of the received status
49 AverageSentRate int64 `json:"average_sent_rate"` // AvgRate of the sent status
50 AverageReceivedRate int64 `json:"average_received_rate"` // AvgRate of the received status
51 CurrentSentRate int64 `json:"current_sent_rate"` // CurRate of the sent status
52 CurrentReceivedRate int64 `json:"current_received_rate"` // CurRate of the received status
58 services consensus.ServiceFlag // Service flags; newPeer copies them from basePeer.ServiceFlag()
61 banScore trust.DynamicBanScore // Score raised by addBanScore through Increase
62 knownTxs *set.Set // Set of transaction hash strings this peer is known to have
63 knownBlocks *set.Set // Set of block hash strings this peer is known to have
64 filterAdds *set.Set // Set of hex-encoded addresses that the SPV node cares about
// newPeer builds a peer around basePeer. It copies the service flags from
// basePeer.ServiceFlag() and starts with empty knownBlocks and filterAdds sets.
// NOTE(review): height and hash are presumably stored as the peer's initial
// status — confirm.
67 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
70 services: basePeer.ServiceFlag(),
74 knownBlocks: set.New(),
75 filterAdds: set.New(),
// Height returns a uint64 height for the peer.
// NOTE(review): presumably the height last recorded through setStatus, read
// under p.mtx — confirm.
79 func (p *peer) Height() uint64 {
// addBanScore raises the peer's ban score by the given persistent and transient
// amounts through banScore.Increase and compares the resulting score with
// defaultBanThreshold. A score above the threshold is logged at error level
// ("banning and disconnecting"); a score above half the threshold is logged as
// a warning ("ban score increasing").
// NOTE(review): peerSet.addBanScore reads the bool result as "ban this peer";
// reason is presumably one of the log fields — confirm.
85 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
86 score := p.banScore.Increase(persistent, transient)
87 if score > defaultBanThreshold {
88 log.WithFields(log.Fields{
93 }).Errorf("banning and disconnecting")
// The warning level is half of the ban threshold.
97 warnThreshold := defaultBanThreshold >> 1
98 if score > warnThreshold {
99 log.WithFields(log.Fields{
104 }).Warning("ban score increasing")
// addFilterAddress records one address in the peer's filter set as a hex
// string. It logs a warning when the set already holds maxFilterAddressCount
// entries, and another when the address is longer than maxFilterAddressSize
// bytes.
// NOTE(review): presumably both warning paths return without adding — confirm.
109 func (p *peer) addFilterAddress(address []byte) {
113 if p.filterAdds.Size() >= maxFilterAddressCount {
114 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
117 if len(address) > maxFilterAddressSize {
118 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
// Addresses are stored hex-encoded; isRelatedTx looks them up in the same form.
121 p.filterAdds.Add(hex.EncodeToString(address))
// addFilterAddresses passes every entry of addresses to addFilterAddress.
// NOTE(review): the non-empty check on filterAdds presumably clears the
// existing filter before the new addresses are added — confirm.
124 func (p *peer) addFilterAddresses(addresses [][]byte) {
125 if !p.filterAdds.IsEmpty() {
128 for _, address := range addresses {
129 p.addFilterAddress(address)
// getBlockByHeight sends a GetBlockMessage for the given height on
// BlockchainChannel and returns the result of TrySend.
133 func (p *peer) getBlockByHeight(height uint64) bool {
134 msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
135 return p.TrySend(BlockchainChannel, msg)
// getBlocks sends the message built by NewGetBlocksMessage(locator, stopHash)
// on BlockchainChannel and returns the result of TrySend.
138 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
139 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
140 return p.TrySend(BlockchainChannel, msg)
// getHeaders sends the message built by NewGetHeadersMessage(locator, stopHash)
// on BlockchainChannel and returns the result of TrySend.
143 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
144 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
145 return p.TrySend(BlockchainChannel, msg)
// getPeerInfo builds a PeerInfo from the peer's address and traffic statistics.
// Duration, byte totals and average/current rates come from the sent and
// received flowrate statuses returned by TrafficStatus.
148 func (p *peer) getPeerInfo() *PeerInfo {
// The read lock on p.mtx is released when the function returns.
150 defer p.mtx.RUnlock()
152 sentStatus, receivedStatus := p.TrafficStatus()
// ping starts as sent idle time minus received idle time. The branch below
// covers received idle exceeding sent idle.
// NOTE(review): presumably it flips the sign to keep ping non-negative — confirm.
153 ping := sentStatus.Idle - receivedStatus.Idle
154 if receivedStatus.Idle > sentStatus.Idle {
160 RemoteAddr: p.Addr().String(),
163 Duration: sentStatus.Duration.String(),
164 TotalSent: sentStatus.Bytes,
165 TotalReceived: receivedStatus.Bytes,
166 AverageSentRate: sentStatus.AvgRate,
167 AverageReceivedRate: receivedStatus.AvgRate,
168 CurrentSentRate: sentStatus.CurRate,
169 CurrentReceivedRate: receivedStatus.CurRate,
// getRelatedTxAndStatus keeps the transactions in txs for which isRelatedTx is
// true and returns them together with the entries of txStatuses.VerifyStatus
// at the same indexes. Both results stay nil when no transaction matches.
173 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
174 var relatedTxs []*types.Tx
175 var relatedStatuses []*bc.TxVerifyResult
176 for i, tx := range txs {
177 if p.isRelatedTx(tx) {
178 relatedTxs = append(relatedTxs, tx)
// VerifyStatus is indexed by the transaction's position in txs.
179 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
182 return relatedTxs, relatedStatuses
// isRelatedTx checks tx against the peer's filter set: the control program of
// every spend input and of every output is hex-encoded and looked up in
// filterAdds. Inputs of other types have no case in the type switch.
// NOTE(review): presumably returns true on the first hit and false when
// nothing matches — confirm.
185 func (p *peer) isRelatedTx(tx *types.Tx) bool {
186 for _, input := range tx.Inputs {
187 switch inp := input.TypedInput.(type) {
188 case *types.SpendInput:
189 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
194 for _, output := range tx.Outputs {
195 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
// isSPVNode reports whether the peer's service flags do not have
// consensus.SFFullNode enabled.
202 func (p *peer) isSPVNode() bool {
203 return !p.services.IsEnable(consensus.SFFullNode)
// markBlock adds hash.String() to the peer's knownBlocks set.
// NOTE(review): the loop runs while the set holds maxKnownBlocks or more
// entries, presumably evicting entries to make room — confirm.
206 func (p *peer) markBlock(hash *bc.Hash) {
210 for p.knownBlocks.Size() >= maxKnownBlocks {
213 p.knownBlocks.Add(hash.String())
// markTransaction adds hash.String() to the peer's knownTxs set.
// NOTE(review): the loop runs while the set holds maxKnownTxs or more entries,
// presumably evicting entries to make room — confirm.
216 func (p *peer) markTransaction(hash *bc.Hash) {
220 for p.knownTxs.Size() >= maxKnownTxs {
223 p.knownTxs.Add(hash.String())
// sendBlock wraps block in a message from NewBlockMessage and tries to send it
// on BlockchainChannel. An error from NewBlockMessage is returned wrapped with
// "fail on NewBlockMessage" together with false. The block's hash string is
// added to knownBlocks.
// NOTE(review): presumably the hash is recorded only when TrySend succeeded,
// and the bool result is TrySend's — confirm.
226 func (p *peer) sendBlock(block *types.Block) (bool, error) {
227 msg, err := NewBlockMessage(block)
229 return false, errors.Wrap(err, "fail on NewBlockMessage")
232 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
234 blcokHash := block.Hash()
235 p.knownBlocks.Add(blcokHash.String())
// sendBlocks wraps blocks in a single message from NewBlocksMessage and tries
// to send it on BlockchainChannel. An error from NewBlocksMessage is returned
// wrapped with "fail on NewBlocksMessage" together with false. After the send
// check, the hash string of every block is added to knownBlocks.
// NOTE(review): presumably a failed TrySend returns early without recording
// the hashes — confirm.
240 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
241 msg, err := NewBlocksMessage(blocks)
243 return false, errors.Wrap(err, "fail on NewBlocksMessage")
246 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
250 for _, block := range blocks {
251 blcokHash := block.Hash()
252 p.knownBlocks.Add(blcokHash.String())
// sendHeaders wraps headers in a message from NewHeadersMessage and tries to
// send it on BlockchainChannel.
// NOTE(review): unlike sendBlock and sendBlocks, the error path builds a fresh
// error with errors.New and so drops the underlying err;
// errors.Wrap(err, "fail on NewHeadersMessage") would keep the cause.
// NOTE(review): the bool result is presumably TrySend's — confirm.
257 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
258 msg, err := NewHeadersMessage(headers)
260 return false, errors.New("fail on NewHeadersMessage")
263 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// sendMerkleBlock builds a merkle block message for block and tries to send it
// on BlockchainChannel. The message carries the raw block header, the tx
// merkle proof (hashes and flags) with the related transactions, and the
// status merkle proof with the related statuses.
// NOTE(review): errors from the three setters are presumably returned with
// false, and the bool result is presumably TrySend's — confirm.
267 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
268 msg := NewMerkleBlockMessage()
269 if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
// Only transactions matching this peer's filter are included.
273 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
// The proof is computed over all block transactions for the related subset.
275 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
276 if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
// The status proof reuses the flags produced for the transaction proof.
280 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
281 if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
285 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// sendTransactions walks txs and tries to send each one as a transaction
// message on BlockchainChannel. Per transaction it checks the SPV/related
// condition, builds the message (an error is returned wrapped with
// "failed to tx msg" together with false), checks knownTxs for the tx ID,
// calls TrySend, and adds the tx ID string to knownTxs.
// NOTE(review): presumably transactions unrelated to an SPV peer and
// already-known transactions are skipped, and a failed TrySend stops the
// loop — confirm.
289 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
290 for _, tx := range txs {
291 if p.isSPVNode() && !p.isRelatedTx(tx) {
294 msg, err := NewTransactionMessage(tx)
296 return false, errors.Wrap(err, "failed to tx msg")
299 if p.knownTxs.Has(tx.ID.String()) {
302 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
305 p.knownTxs.Add(tx.ID.String())
// setStatus takes a height and a block hash for the peer.
// NOTE(review): presumably stores them as the peer's current status under
// p.mtx — confirm.
310 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
// peerSet tracks peers in a map keyed by peer ID; addPeer uses peer.ID() as
// the key.
317 type peerSet struct {
320 peers map[string]*peer // Registered peers by ID
323 // newPeerSet creates a new peer set to track the active participants.
// It stores basePeerSet and starts with an empty peers map.
324 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
326 BasePeerSet: basePeerSet,
327 peers: make(map[string]*peer),
// addBanScore looks up peerID in ps.peers and forwards the score increase to
// peer.addBanScore. The ban handling after the !ban check passes the peer's
// address string to AddBannedPeer (a failure is logged, not returned) and then
// removes the peer through removePeer.
// NOTE(review): presumably the !ban branch returns early, and an unknown
// peerID is handled before the score is applied — confirm.
331 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
333 peer := ps.peers[peerID]
339 if ban := peer.addBanScore(persistent, transient, reason); !ban {
342 if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
343 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
345 ps.removePeer(peerID)
// addPeer registers peer under peer.ID() with a new entry built by
// newPeer(height, hash, peer) when that ID is not in the map yet.
// NOTE(review): the warning "add existing peer to blockKeeper" is presumably
// logged only for an ID that is already registered — confirm.
348 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
// The write lock on ps.mtx is released when the function returns.
350 defer ps.mtx.Unlock()
352 if _, ok := ps.peers[peer.ID()]; !ok {
353 ps.peers[peer.ID()] = newPeer(height, hash, peer)
356 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// bestPeer scans ps.peers, testing each peer's services against flag and
// comparing heights against the current best candidate.
// NOTE(review): presumably peers without flag enabled are skipped, the peer
// with the greatest height is returned, and the result is nil when no peer
// qualifies — confirm.
359 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
// The read lock on ps.mtx is released when the function returns.
361 defer ps.mtx.RUnlock()
364 for _, p := range ps.peers {
365 if !p.services.IsEnable(flag) {
368 if bestPeer == nil || p.height > bestPeer.height {
// broadcastMinedBlock builds a mined-block message for block and offers it to
// the peers returned by peersWithoutBlock(&hash). A peer whose TrySend fails
// is removed through removePeer; peer.markBlock records the hash. An error
// from NewMinedBlockMessage is returned wrapped with
// "fail on broadcast mined block".
// NOTE(review): presumably hash is the block's hash, SPV peers are skipped,
// and markBlock runs only after a successful send — confirm.
375 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
376 msg, err := NewMinedBlockMessage(block)
378 return errors.Wrap(err, "fail on broadcast mined block")
382 peers := ps.peersWithoutBlock(&hash)
383 for _, peer := range peers {
384 if peer.isSPVNode() {
387 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
388 ps.removePeer(peer.ID())
391 peer.markBlock(&hash)
// broadcastNewStatus sends a status response message, built from bestBlock's
// header and genesisBlock's hash, to the peers returned by peersWithoutBlock
// for bestBlock's hash. A peer whose TrySend fails is removed through
// removePeer.
396 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
397 bestBlockHash := bestBlock.Hash()
// Only peers not yet known to have the best block are notified.
398 peers := ps.peersWithoutBlock(&bestBlockHash)
400 genesisHash := genesisBlock.Hash()
401 msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
402 for _, peer := range peers {
403 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
404 ps.removePeer(peer.ID())
// broadcastTx builds a transaction message for tx and offers it to the peers
// returned by peersWithoutTx(&tx.ID). A peer whose TrySend fails is removed
// through removePeer; peer.markTransaction records the tx ID. An error from
// NewTransactionMessage is returned wrapped with "fail on broadcast tx".
// NOTE(review): presumably SPV peers unrelated to tx are skipped, and
// markTransaction runs only after a successful send — confirm.
411 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
412 msg, err := NewTransactionMessage(tx)
414 return errors.Wrap(err, "fail on broadcast tx")
417 peers := ps.peersWithoutTx(&tx.ID)
418 for _, peer := range peers {
419 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
422 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
423 ps.removePeer(peer.ID())
426 peer.markTransaction(&tx.ID)
// errorHandler reacts to err reported for peerID. When the root cause of err
// is errPeerMisbehave, the peer receives 20 persistent ban points with
// err.Error() as the reason. The function also contains a removePeer call.
// NOTE(review): removePeer is presumably the branch for every other error —
// confirm.
431 func (ps *peerSet) errorHandler(peerID string, err error) {
432 if errors.Root(err) == errPeerMisbehave {
433 ps.addBanScore(peerID, 20, 0, err.Error())
435 ps.removePeer(peerID)
439 // getPeer retrieves the registered peer with the given id.
440 func (ps *peerSet) getPeer(id string) *peer {
// The read lock on ps.mtx is released when the function returns.
442 defer ps.mtx.RUnlock()
// getPeerInfos collects the getPeerInfo() snapshot of every peer in ps.peers.
// The slice starts out non-nil and empty.
446 func (ps *peerSet) getPeerInfos() []*PeerInfo {
// The read lock on ps.mtx is released when the function returns.
448 defer ps.mtx.RUnlock()
450 result := []*PeerInfo{}
451 for _, peer := range ps.peers {
452 result = append(result, peer.getPeerInfo())
// peersWithoutBlock collects the peers whose knownBlocks set does not contain
// hash.String().
457 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
// The read lock on ps.mtx is released when the function returns.
459 defer ps.mtx.RUnlock()
462 for _, peer := range ps.peers {
463 if !peer.knownBlocks.Has(hash.String()) {
464 peers = append(peers, peer)
// peersWithoutTx collects the peers whose knownTxs set does not contain
// hash.String().
470 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
// The read lock on ps.mtx is released when the function returns.
472 defer ps.mtx.RUnlock()
475 for _, peer := range ps.peers {
476 if !peer.knownTxs.Has(hash.String()) {
477 peers = append(peers, peer)
// removePeer deletes peerID from ps.peers and then calls
// StopPeerGracefully(peerID).
// NOTE(review): the map delete is presumably guarded by ps.mtx — confirm.
483 func (ps *peerSet) removePeer(peerID string) {
485 delete(ps.peers, peerID)
487 ps.StopPeerGracefully(peerID)