9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 msgs "github.com/vapor/netsync/messages"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
21 maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22 maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
23 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
24 maxFilterAddressSize = 50
25 maxFilterAddressCount = 1000
31 errSendStatusMsg = errors.New("send status msg fail")
32 ErrPeerMisbehave = errors.New("peer is misbehave")
35 //BasePeer is the interface for connection level peer
36 type BasePeer interface {
39 ServiceFlag() consensus.ServiceFlag
40 TrafficStatus() (*flowrate.Status, *flowrate.Status)
41 TrySend(byte, interface{}) bool
45 //BasePeerSet is the intergace for connection level peer manager
46 type BasePeerSet interface {
47 StopPeerGracefully(string)
48 IsBanned(peerID string, level byte, reason string) bool
51 type BroadcastMsg interface {
52 FilterTargetPeers(ps *PeerSet) []string
53 MarkSendRecord(ps *PeerSet, peers []string)
59 // PeerInfo indicate peer status snap
60 type PeerInfo struct {
61 ID string `json:"peer_id"`
62 RemoteAddr string `json:"remote_addr"`
63 Height uint64 `json:"height"`
64 Ping string `json:"ping"`
65 Duration string `json:"duration"`
66 TotalSent int64 `json:"total_sent"`
67 TotalReceived int64 `json:"total_received"`
68 AverageSentRate int64 `json:"average_sent_rate"`
69 AverageReceivedRate int64 `json:"average_received_rate"`
70 CurrentSentRate int64 `json:"current_sent_rate"`
71 CurrentReceivedRate int64 `json:"current_received_rate"`
77 services consensus.ServiceFlag
80 irreversibleHeight uint64
81 irreversibleHash *bc.Hash
82 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
83 knownBlocks *set.Set // Set of block hashes known to be known by this peer
84 knownSignatures *set.Set // Set of block signatures known to be known by this peer
85 knownStatus uint64 // Set of chain status known to be known by this peer
86 filterAdds *set.Set // Set of addresses that the spv node cares about.
89 func newPeer(basePeer BasePeer) *Peer {
92 services: basePeer.ServiceFlag(),
94 knownBlocks: set.New(),
95 knownSignatures: set.New(),
96 filterAdds: set.New(),
100 func (p *Peer) Height() uint64 {
102 defer p.mtx.RUnlock()
107 func (p *Peer) IrreversibleHeight() uint64 {
109 defer p.mtx.RUnlock()
111 return p.irreversibleHeight
114 func (p *Peer) AddFilterAddress(address []byte) {
118 if p.filterAdds.Size() >= maxFilterAddressCount {
119 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
122 if len(address) > maxFilterAddressSize {
123 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
127 p.filterAdds.Add(hex.EncodeToString(address))
130 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
131 if !p.filterAdds.IsEmpty() {
134 for _, address := range addresses {
135 p.AddFilterAddress(address)
139 func (p *Peer) FilterClear() {
143 func (p *Peer) GetBlockByHeight(height uint64) bool {
144 msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
145 return p.TrySend(msgs.BlockchainChannel, msg)
148 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
149 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
150 return p.TrySend(msgs.BlockchainChannel, msg)
153 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
154 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
155 return p.TrySend(msgs.BlockchainChannel, msg)
158 func (p *Peer) GetPeerInfo() *PeerInfo {
160 defer p.mtx.RUnlock()
162 sentStatus, receivedStatus := p.TrafficStatus()
163 ping := sentStatus.Idle - receivedStatus.Idle
164 if receivedStatus.Idle > sentStatus.Idle {
170 RemoteAddr: p.Addr().String(),
171 Height: p.bestHeight,
173 Duration: sentStatus.Duration.String(),
174 TotalSent: sentStatus.Bytes,
175 TotalReceived: receivedStatus.Bytes,
176 AverageSentRate: sentStatus.AvgRate,
177 AverageReceivedRate: receivedStatus.AvgRate,
178 CurrentSentRate: sentStatus.CurRate,
179 CurrentReceivedRate: receivedStatus.CurRate,
183 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
184 var relatedTxs []*types.Tx
185 var relatedStatuses []*bc.TxVerifyResult
186 for i, tx := range txs {
187 if p.isRelatedTx(tx) {
188 relatedTxs = append(relatedTxs, tx)
189 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
192 return relatedTxs, relatedStatuses
195 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
196 for _, input := range tx.Inputs {
197 switch inp := input.TypedInput.(type) {
198 case *types.SpendInput:
199 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
204 for _, output := range tx.Outputs {
205 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
212 func (p *Peer) isSPVNode() bool {
213 return !p.services.IsEnable(consensus.SFFullNode)
216 func (p *Peer) MarkBlock(hash *bc.Hash) {
220 for p.knownBlocks.Size() >= maxKnownBlocks {
223 p.knownBlocks.Add(hash.String())
226 func (p *Peer) markNewStatus(height uint64) {
230 p.knownStatus = height
233 func (p *Peer) markSign(signature []byte) {
237 for p.knownSignatures.Size() >= maxKnownSignatures {
238 p.knownSignatures.Pop()
240 p.knownSignatures.Add(hex.EncodeToString(signature))
243 func (p *Peer) markTransaction(hash *bc.Hash) {
247 for p.knownTxs.Size() >= maxKnownTxs {
250 p.knownTxs.Add(hash.String())
253 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
255 defer ps.mtx.RUnlock()
258 for _, peer := range ps.peers {
259 if !peer.knownBlocks.Has(hash.String()) {
260 peers = append(peers, peer.ID())
266 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
268 defer ps.mtx.RUnlock()
271 for _, peer := range ps.peers {
272 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
273 peers = append(peers, peer.ID())
279 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
280 msg, err := msgs.NewBlockMessage(block)
282 return false, errors.Wrap(err, "fail on NewBlockMessage")
285 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
287 blcokHash := block.Hash()
288 p.knownBlocks.Add(blcokHash.String())
293 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
294 msg, err := msgs.NewBlocksMessage(blocks)
296 return false, errors.Wrap(err, "fail on NewBlocksMessage")
299 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
303 for _, block := range blocks {
304 blcokHash := block.Hash()
305 p.knownBlocks.Add(blcokHash.String())
310 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
311 msg, err := msgs.NewHeadersMessage(headers)
313 return false, errors.New("fail on NewHeadersMessage")
316 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
320 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
321 msg := msgs.NewMerkleBlockMessage()
322 if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
326 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
328 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
329 if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
333 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
334 if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
338 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
342 func (p *Peer) SendTransactions(txs []*types.Tx) error {
343 validTxs := make([]*types.Tx, 0, len(txs))
344 for i, tx := range txs {
345 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
349 validTxs = append(validTxs, tx)
350 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
354 msg, err := msgs.NewTransactionsMessage(validTxs)
359 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
360 return errors.New("failed to send txs msg")
363 for _, validTx := range validTxs {
364 p.knownTxs.Add(validTx.ID.String())
367 validTxs = make([]*types.Tx, 0, len(txs))
373 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
374 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
375 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
376 return errSendStatusMsg
378 p.markNewStatus(bestHeader.Height)
382 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
386 p.bestHeight = bestHeight
387 p.bestHash = bestHash
390 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
394 p.irreversibleHeight = irreversibleHeight
395 p.irreversibleHash = irreversibleHash
398 type PeerSet struct {
401 peers map[string]*Peer
404 // newPeerSet creates a new peer set to track the active participants.
405 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
407 BasePeerSet: basePeerSet,
408 peers: make(map[string]*Peer),
412 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
414 peer := ps.peers[peerID]
420 if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned {
421 ps.RemovePeer(peerID)
426 func (ps *PeerSet) AddPeer(peer BasePeer) {
428 defer ps.mtx.Unlock()
430 if _, ok := ps.peers[peer.ID()]; !ok {
431 ps.peers[peer.ID()] = newPeer(peer)
434 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
437 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
439 defer ps.mtx.RUnlock()
442 for _, p := range ps.peers {
443 if !p.services.IsEnable(flag) {
446 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
453 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
455 defer ps.mtx.RUnlock()
458 for _, p := range ps.peers {
459 if !p.services.IsEnable(flag) {
462 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
469 //SendMsg send message to the target peer.
470 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
471 peer := ps.GetPeer(peerID)
476 ok := peer.TrySend(msgChannel, msg)
478 ps.RemovePeer(peerID)
483 //BroadcastMsg Broadcast message to the target peers
484 // and mark the message send record
485 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
486 //filter target peers
487 peers := bm.FilterTargetPeers(ps)
489 //broadcast to target peers
490 peersSuccess := make([]string, 0)
491 for _, peer := range peers {
492 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
493 log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
496 peersSuccess = append(peersSuccess, peer)
499 //mark the message send record
500 bm.MarkSendRecord(ps, peersSuccess)
504 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
505 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
506 peers := ps.peersWithoutNewStatus(bestHeader.Height)
507 for _, peer := range peers {
508 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
509 ps.RemovePeer(peer.ID())
513 peer.markNewStatus(bestHeader.Height)
518 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
519 msg, err := msgs.NewTransactionMessage(tx)
521 return errors.Wrap(err, "fail on broadcast tx")
524 peers := ps.peersWithoutTx(&tx.ID)
525 for _, peer := range peers {
526 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
529 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
530 log.WithFields(log.Fields{
533 "type": reflect.TypeOf(msg),
534 "message": msg.String(),
535 }).Warning("send message to peer error")
536 ps.RemovePeer(peer.ID())
539 peer.markTransaction(&tx.ID)
544 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
545 if errors.Root(err) == ErrPeerMisbehave {
546 ps.ProcessIllegal(peerID, level, err.Error())
548 ps.RemovePeer(peerID)
552 // Peer retrieves the registered peer with the given id.
553 func (ps *PeerSet) GetPeer(id string) *Peer {
555 defer ps.mtx.RUnlock()
559 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
561 defer ps.mtx.RUnlock()
563 result := []*PeerInfo{}
564 for _, peer := range ps.peers {
565 result = append(result, peer.GetPeerInfo())
570 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
571 peer := ps.GetPeer(peerID)
578 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
579 peer := ps.GetPeer(peerID)
583 peer.markSign(signature)
586 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
587 peer := ps.GetPeer(peerID)
591 peer.markNewStatus(height)
594 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
596 peer := ps.peers[peerID]
602 peer.markTransaction(&txHash)
605 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
607 defer ps.mtx.RUnlock()
610 for _, peer := range ps.peers {
611 if !peer.knownBlocks.Has(hash.String()) {
612 peers = append(peers, peer)
618 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
620 defer ps.mtx.RUnlock()
623 for _, peer := range ps.peers {
624 if peer.knownStatus < height {
625 peers = append(peers, peer)
631 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
633 defer ps.mtx.RUnlock()
636 for _, peer := range ps.peers {
637 if !peer.knownTxs.Has(hash.String()) {
638 peers = append(peers, peer)
644 func (ps *PeerSet) RemovePeer(peerID string) {
646 delete(ps.peers, peerID)
648 ps.StopPeerGracefully(peerID)
651 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
652 peer := ps.GetPeer(peerID)
657 peer.SetBestStatus(height, hash)