9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 "github.com/vapor/p2p/trust"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
21 maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
23 defaultBanThreshold = uint32(100)
26 //BasePeer is the interface for connection level peer
27 type BasePeer interface {
30 ServiceFlag() consensus.ServiceFlag
31 TrafficStatus() (*flowrate.Status, *flowrate.Status)
32 TrySend(byte, interface{}) bool
36 //BasePeerSet is the intergace for connection level peer manager
37 type BasePeerSet interface {
38 AddBannedPeer(string) error
39 StopPeerGracefully(string)
42 // PeerInfo indicate peer status snap
43 type PeerInfo struct {
44 ID string `json:"peer_id"`
45 RemoteAddr string `json:"remote_addr"`
46 Height uint64 `json:"height"`
47 Ping string `json:"ping"`
48 Duration string `json:"duration"`
49 TotalSent int64 `json:"total_sent"`
50 TotalReceived int64 `json:"total_received"`
51 AverageSentRate int64 `json:"average_sent_rate"`
52 AverageReceivedRate int64 `json:"average_received_rate"`
53 CurrentSentRate int64 `json:"current_sent_rate"`
54 CurrentReceivedRate int64 `json:"current_received_rate"`
60 services consensus.ServiceFlag
63 banScore trust.DynamicBanScore
64 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
65 knownBlocks *set.Set // Set of block hashes known to be known by this peer
66 filterAdds *set.Set // Set of addresses that the spv node cares about.
69 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
72 services: basePeer.ServiceFlag(),
76 knownBlocks: set.New(),
77 filterAdds: set.New(),
81 func (p *peer) Height() uint64 {
87 func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
88 score := p.banScore.Increase(persistent, transient)
89 if score > defaultBanThreshold {
90 log.WithFields(log.Fields{
95 }).Errorf("banning and disconnecting")
99 warnThreshold := defaultBanThreshold >> 1
100 if score > warnThreshold {
101 log.WithFields(log.Fields{
106 }).Warning("ban score increasing")
111 func (p *peer) addFilterAddress(address []byte) {
115 if p.filterAdds.Size() >= maxFilterAddressCount {
116 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
119 if len(address) > maxFilterAddressSize {
120 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
124 p.filterAdds.Add(hex.EncodeToString(address))
127 func (p *peer) addFilterAddresses(addresses [][]byte) {
128 if !p.filterAdds.IsEmpty() {
131 for _, address := range addresses {
132 p.addFilterAddress(address)
136 func (p *peer) getBlockByHeight(height uint64) bool {
137 msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
138 return p.TrySend(BlockchainChannel, msg)
141 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
142 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
143 return p.TrySend(BlockchainChannel, msg)
146 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
147 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
148 return p.TrySend(BlockchainChannel, msg)
151 func (p *peer) getPeerInfo() *PeerInfo {
153 defer p.mtx.RUnlock()
155 sentStatus, receivedStatus := p.TrafficStatus()
156 ping := sentStatus.Idle - receivedStatus.Idle
157 if receivedStatus.Idle > sentStatus.Idle {
163 RemoteAddr: p.Addr().String(),
166 Duration: sentStatus.Duration.String(),
167 TotalSent: sentStatus.Bytes,
168 TotalReceived: receivedStatus.Bytes,
169 AverageSentRate: sentStatus.AvgRate,
170 AverageReceivedRate: receivedStatus.AvgRate,
171 CurrentSentRate: sentStatus.CurRate,
172 CurrentReceivedRate: receivedStatus.CurRate,
176 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
177 var relatedTxs []*types.Tx
178 var relatedStatuses []*bc.TxVerifyResult
179 for i, tx := range txs {
180 if p.isRelatedTx(tx) {
181 relatedTxs = append(relatedTxs, tx)
182 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
185 return relatedTxs, relatedStatuses
188 func (p *peer) isRelatedTx(tx *types.Tx) bool {
189 for _, input := range tx.Inputs {
190 switch inp := input.TypedInput.(type) {
191 case *types.SpendInput:
192 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
197 for _, output := range tx.Outputs {
198 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
205 func (p *peer) isSPVNode() bool {
206 return !p.services.IsEnable(consensus.SFFullNode)
209 func (p *peer) markBlock(hash *bc.Hash) {
213 for p.knownBlocks.Size() >= maxKnownBlocks {
216 p.knownBlocks.Add(hash.String())
219 func (p *peer) markTransaction(hash *bc.Hash) {
223 for p.knownTxs.Size() >= maxKnownTxs {
226 p.knownTxs.Add(hash.String())
229 func (p *peer) sendBlock(block *types.Block) (bool, error) {
230 msg, err := NewBlockMessage(block)
232 return false, errors.Wrap(err, "fail on NewBlockMessage")
235 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
237 blcokHash := block.Hash()
238 p.knownBlocks.Add(blcokHash.String())
243 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
244 msg, err := NewBlocksMessage(blocks)
246 return false, errors.Wrap(err, "fail on NewBlocksMessage")
249 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
253 for _, block := range blocks {
254 blcokHash := block.Hash()
255 p.knownBlocks.Add(blcokHash.String())
260 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
261 msg, err := NewHeadersMessage(headers)
263 return false, errors.New("fail on NewHeadersMessage")
266 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
270 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
271 msg := NewMerkleBlockMessage()
272 if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
276 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
278 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
279 if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
283 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
284 if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
288 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
292 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
293 for _, tx := range txs {
294 if p.isSPVNode() && !p.isRelatedTx(tx) {
297 msg, err := NewTransactionMessage(tx)
299 return false, errors.Wrap(err, "failed to tx msg")
302 if p.knownTxs.Has(tx.ID.String()) {
305 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
308 p.knownTxs.Add(tx.ID.String())
313 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
320 type peerSet struct {
323 peers map[string]*peer
326 // newPeerSet creates a new peer set to track the active participants.
327 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
329 BasePeerSet: basePeerSet,
330 peers: make(map[string]*peer),
334 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
336 peer := ps.peers[peerID]
342 if ban := peer.addBanScore(persistent, transient, reason); !ban {
345 if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
346 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
348 ps.removePeer(peerID)
351 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
353 defer ps.mtx.Unlock()
355 if _, ok := ps.peers[peer.ID()]; !ok {
356 ps.peers[peer.ID()] = newPeer(height, hash, peer)
359 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
362 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
364 defer ps.mtx.RUnlock()
367 for _, p := range ps.peers {
368 if !p.services.IsEnable(flag) {
371 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
378 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
379 msg, err := NewMinedBlockMessage(block)
381 return errors.Wrap(err, "fail on broadcast mined block")
385 peers := ps.peersWithoutBlock(&hash)
386 for _, peer := range peers {
387 if peer.isSPVNode() {
390 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
391 log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
392 ps.removePeer(peer.ID())
395 peer.markBlock(&hash)
400 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
401 bestBlockHash := bestBlock.Hash()
402 peers := ps.peersWithoutBlock(&bestBlockHash)
404 genesisHash := genesisBlock.Hash()
405 msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
406 for _, peer := range peers {
407 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
408 log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
409 ps.removePeer(peer.ID())
416 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
417 msg, err := NewTransactionMessage(tx)
419 return errors.Wrap(err, "fail on broadcast tx")
422 peers := ps.peersWithoutTx(&tx.ID)
423 for _, peer := range peers {
424 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
427 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
428 log.WithFields(log.Fields{
431 "type": reflect.TypeOf(msg),
432 "message": msg.String(),
433 }).Warning("send message to peer error")
434 ps.removePeer(peer.ID())
437 peer.markTransaction(&tx.ID)
442 func (ps *peerSet) errorHandler(peerID string, err error) {
443 if errors.Root(err) == errPeerMisbehave {
444 ps.addBanScore(peerID, 20, 0, err.Error())
446 ps.removePeer(peerID)
450 // Peer retrieves the registered peer with the given id.
451 func (ps *peerSet) getPeer(id string) *peer {
453 defer ps.mtx.RUnlock()
457 func (ps *peerSet) getPeerInfos() []*PeerInfo {
459 defer ps.mtx.RUnlock()
461 result := []*PeerInfo{}
462 for _, peer := range ps.peers {
463 result = append(result, peer.getPeerInfo())
468 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
470 defer ps.mtx.RUnlock()
473 for _, peer := range ps.peers {
474 if !peer.knownBlocks.Has(hash.String()) {
475 peers = append(peers, peer)
481 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
483 defer ps.mtx.RUnlock()
486 for _, peer := range ps.peers {
487 if !peer.knownTxs.Has(hash.String()) {
488 peers = append(peers, peer)
494 func (ps *peerSet) removePeer(peerID string) {
496 delete(ps.peers, peerID)
498 ps.StopPeerGracefully(peerID)