9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 "github.com/vapor/p2p/trust"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
21 maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
23 defaultBanThreshold = uint32(100)
26 var errSendStatusMsg = errors.New("send status msg fail")
28 //BasePeer is the interface for connection level peer
29 type BasePeer interface {
32 ServiceFlag() consensus.ServiceFlag
33 TrafficStatus() (*flowrate.Status, *flowrate.Status)
34 TrySend(byte, interface{}) bool
38 //BasePeerSet is the intergace for connection level peer manager
39 type BasePeerSet interface {
40 AddBannedPeer(string) error
41 StopPeerGracefully(string)
44 // PeerInfo indicate peer status snap
45 type PeerInfo struct {
46 ID string `json:"peer_id"`
47 RemoteAddr string `json:"remote_addr"`
48 Height uint64 `json:"height"`
49 Ping string `json:"ping"`
50 Duration string `json:"duration"`
51 TotalSent int64 `json:"total_sent"`
52 TotalReceived int64 `json:"total_received"`
53 AverageSentRate int64 `json:"average_sent_rate"`
54 AverageReceivedRate int64 `json:"average_received_rate"`
55 CurrentSentRate int64 `json:"current_sent_rate"`
56 CurrentReceivedRate int64 `json:"current_received_rate"`
62 services consensus.ServiceFlag
65 banScore trust.DynamicBanScore
66 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
67 knownBlocks *set.Set // Set of block hashes known to be known by this peer
68 knownStatus uint64 // Set of chain status known to be known by this peer
69 filterAdds *set.Set // Set of addresses that the spv node cares about.
72 func newPeer(basePeer BasePeer) *peer {
75 services: basePeer.ServiceFlag(),
77 knownBlocks: set.New(),
78 filterAdds: set.New(),
82 func (p *peer) Height() uint64 {
88 func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
89 score := p.banScore.Increase(persistent, transient)
90 if score > defaultBanThreshold {
91 log.WithFields(log.Fields{
96 }).Errorf("banning and disconnecting")
100 warnThreshold := defaultBanThreshold >> 1
101 if score > warnThreshold {
102 log.WithFields(log.Fields{
107 }).Warning("ban score increasing")
112 func (p *peer) addFilterAddress(address []byte) {
116 if p.filterAdds.Size() >= maxFilterAddressCount {
117 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
120 if len(address) > maxFilterAddressSize {
121 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
125 p.filterAdds.Add(hex.EncodeToString(address))
128 func (p *peer) addFilterAddresses(addresses [][]byte) {
129 if !p.filterAdds.IsEmpty() {
132 for _, address := range addresses {
133 p.addFilterAddress(address)
137 func (p *peer) getBlockByHeight(height uint64) bool {
138 msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
139 return p.TrySend(BlockchainChannel, msg)
142 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
143 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
144 return p.TrySend(BlockchainChannel, msg)
147 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
148 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
149 return p.TrySend(BlockchainChannel, msg)
152 func (p *peer) getPeerInfo() *PeerInfo {
154 defer p.mtx.RUnlock()
156 sentStatus, receivedStatus := p.TrafficStatus()
157 ping := sentStatus.Idle - receivedStatus.Idle
158 if receivedStatus.Idle > sentStatus.Idle {
164 RemoteAddr: p.Addr().String(),
167 Duration: sentStatus.Duration.String(),
168 TotalSent: sentStatus.Bytes,
169 TotalReceived: receivedStatus.Bytes,
170 AverageSentRate: sentStatus.AvgRate,
171 AverageReceivedRate: receivedStatus.AvgRate,
172 CurrentSentRate: sentStatus.CurRate,
173 CurrentReceivedRate: receivedStatus.CurRate,
177 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
178 var relatedTxs []*types.Tx
179 var relatedStatuses []*bc.TxVerifyResult
180 for i, tx := range txs {
181 if p.isRelatedTx(tx) {
182 relatedTxs = append(relatedTxs, tx)
183 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
186 return relatedTxs, relatedStatuses
189 func (p *peer) isRelatedTx(tx *types.Tx) bool {
190 for _, input := range tx.Inputs {
191 switch inp := input.TypedInput.(type) {
192 case *types.SpendInput:
193 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
198 for _, output := range tx.Outputs {
199 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
206 func (p *peer) isSPVNode() bool {
207 return !p.services.IsEnable(consensus.SFFullNode)
210 func (p *peer) markBlock(hash *bc.Hash) {
214 for p.knownBlocks.Size() >= maxKnownBlocks {
217 p.knownBlocks.Add(hash.String())
220 func (p *peer) markNewStatus(height uint64) {
224 p.knownStatus = height
227 func (p *peer) markTransaction(hash *bc.Hash) {
231 for p.knownTxs.Size() >= maxKnownTxs {
234 p.knownTxs.Add(hash.String())
237 func (p *peer) sendBlock(block *types.Block) (bool, error) {
238 msg, err := NewBlockMessage(block)
240 return false, errors.Wrap(err, "fail on NewBlockMessage")
243 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
245 blcokHash := block.Hash()
246 p.knownBlocks.Add(blcokHash.String())
251 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
252 msg, err := NewBlocksMessage(blocks)
254 return false, errors.Wrap(err, "fail on NewBlocksMessage")
257 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
261 for _, block := range blocks {
262 blcokHash := block.Hash()
263 p.knownBlocks.Add(blcokHash.String())
268 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
269 msg, err := NewHeadersMessage(headers)
271 return false, errors.New("fail on NewHeadersMessage")
274 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
278 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
279 msg := NewMerkleBlockMessage()
280 if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
284 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
286 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
287 if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
291 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
292 if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
296 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
300 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
301 for _, tx := range txs {
302 if p.isSPVNode() && !p.isRelatedTx(tx) {
305 msg, err := NewTransactionMessage(tx)
307 return false, errors.Wrap(err, "failed to tx msg")
310 if p.knownTxs.Has(tx.ID.String()) {
313 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
316 p.knownTxs.Add(tx.ID.String())
321 func (p *peer) sendStatus(header *types.BlockHeader) error {
322 msg := NewStatusMessage(header)
323 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
324 return errSendStatusMsg
326 p.markNewStatus(header.Height)
330 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
337 type peerSet struct {
340 peers map[string]*peer
343 // newPeerSet creates a new peer set to track the active participants.
344 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
346 BasePeerSet: basePeerSet,
347 peers: make(map[string]*peer),
351 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
353 peer := ps.peers[peerID]
359 if ban := peer.addBanScore(persistent, transient, reason); !ban {
362 if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
363 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
365 ps.removePeer(peerID)
368 func (ps *peerSet) addPeer(peer BasePeer) {
370 defer ps.mtx.Unlock()
372 if _, ok := ps.peers[peer.ID()]; !ok {
373 ps.peers[peer.ID()] = newPeer(peer)
376 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
379 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
381 defer ps.mtx.RUnlock()
384 for _, p := range ps.peers {
385 if !p.services.IsEnable(flag) {
388 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
395 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
396 msg, err := NewMinedBlockMessage(block)
398 return errors.Wrap(err, "fail on broadcast mined block")
402 peers := ps.peersWithoutBlock(&hash)
403 for _, peer := range peers {
404 if peer.isSPVNode() {
407 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
408 log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
409 ps.removePeer(peer.ID())
412 peer.markBlock(&hash)
413 peer.markNewStatus(block.Height)
418 func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
419 msg := NewStatusMessage(&bestBlock.BlockHeader)
420 peers := ps.peersWithoutNewStatus(bestBlock.Height)
421 for _, peer := range peers {
422 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
423 ps.removePeer(peer.ID())
427 peer.markNewStatus(bestBlock.Height)
432 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
433 msg, err := NewTransactionMessage(tx)
435 return errors.Wrap(err, "fail on broadcast tx")
438 peers := ps.peersWithoutTx(&tx.ID)
439 for _, peer := range peers {
440 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
443 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
444 log.WithFields(log.Fields{
447 "type": reflect.TypeOf(msg),
448 "message": msg.String(),
449 }).Warning("send message to peer error")
450 ps.removePeer(peer.ID())
453 peer.markTransaction(&tx.ID)
458 func (ps *peerSet) errorHandler(peerID string, err error) {
459 if errors.Root(err) == errPeerMisbehave {
460 ps.addBanScore(peerID, 20, 0, err.Error())
462 ps.removePeer(peerID)
466 // Peer retrieves the registered peer with the given id.
467 func (ps *peerSet) getPeer(id string) *peer {
469 defer ps.mtx.RUnlock()
473 func (ps *peerSet) getPeerInfos() []*PeerInfo {
475 defer ps.mtx.RUnlock()
477 result := []*PeerInfo{}
478 for _, peer := range ps.peers {
479 result = append(result, peer.getPeerInfo())
484 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
486 defer ps.mtx.RUnlock()
489 for _, peer := range ps.peers {
490 if !peer.knownBlocks.Has(hash.String()) {
491 peers = append(peers, peer)
497 func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
499 defer ps.mtx.RUnlock()
502 for _, peer := range ps.peers {
503 if peer.knownStatus < height {
504 peers = append(peers, peer)
510 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
512 defer ps.mtx.RUnlock()
515 for _, peer := range ps.peers {
516 if !peer.knownTxs.Has(hash.String()) {
517 peers = append(peers, peer)
523 func (ps *peerSet) removePeer(peerID string) {
525 delete(ps.peers, peerID)
527 ps.StopPeerGracefully(peerID)