9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/bytom/vapor/consensus"
14 "github.com/bytom/vapor/errors"
15 msgs "github.com/bytom/vapor/netsync/messages"
16 "github.com/bytom/vapor/protocol/bc"
17 "github.com/bytom/vapor/protocol/bc/types"
// Per-peer bookkeeping limits.
21 maxKnownTxs = 32768 // Maximum transaction hashes to keep in the known list (prevent DoS)
22 maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DoS)
23 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DoS)
24 maxFilterAddressSize = 50 // Maximum byte length of one filter address (checked in AddFilterAddress)
25 maxFilterAddressCount = 1000 // Maximum number of filter addresses per peer (checked in AddFilterAddress)
// Error values declared alongside the peer types.
31 errSendStatusMsg = errors.New("send status msg fail") // returned by Peer.SendStatus when TrySend fails
32 ErrPeerMisbehave = errors.New("peer is misbehave")
33 ErrNoValidPeer = errors.New("Can't find valid fast sync peer")
36 // BasePeer is the interface for a connection-level peer.
37 type BasePeer interface {
41 RemoteAddrHost() string
42 ServiceFlag() consensus.ServiceFlag
43 TrafficStatus() (*flowrate.Status, *flowrate.Status)
44 TrySend(byte, interface{}) bool
48 // BasePeerSet is the interface for the connection-level peer manager.
49 type BasePeerSet interface {
50 StopPeerGracefully(string)
51 IsBanned(ip string, level byte, reason string) bool
// BroadcastMsg is a message that PeerSet.BroadcastMsg can deliver: it picks
// its own target peers and is told afterwards which sends succeeded.
54 type BroadcastMsg interface {
55 FilterTargetPeers(ps *PeerSet) []string
56 MarkSendRecord(ps *PeerSet, peers []string)
62 // PeerInfo is a snapshot of a peer's status, as built by Peer.GetPeerInfo.
63 type PeerInfo struct {
64 ID string `json:"peer_id"`
65 Moniker string `json:"moniker"`
66 RemoteAddr string `json:"remote_addr"`
67 Height uint64 `json:"height"`
68 Ping string `json:"ping"`
69 Duration string `json:"duration"`
70 TotalSent int64 `json:"total_sent"`
71 TotalReceived int64 `json:"total_received"`
72 AverageSentRate int64 `json:"average_sent_rate"`
73 AverageReceivedRate int64 `json:"average_received_rate"`
74 CurrentSentRate int64 `json:"current_sent_rate"`
75 CurrentReceivedRate int64 `json:"current_received_rate"`
// Peer fields: advertised services, irreversible-block status and per-peer record sets.
81 services consensus.ServiceFlag // copied from BasePeer.ServiceFlag() in newPeer
84 irreversibleHeight uint64 // written by SetIrreversibleStatus
85 irreversibleHash *bc.Hash // written by SetIrreversibleStatus
86 knownTxs *set.Set // Set of transaction hashes (as strings) known by this peer
87 knownBlocks *set.Set // Set of block hashes (as strings) known by this peer
88 knownSignatures *set.Set // Set of hex-encoded block signatures known by this peer
89 knownStatus uint64 // Height last recorded by markNewStatus (a single value, not a set)
90 filterAdds *set.Set // Set of hex-encoded addresses that the SPV node cares about.
// newPeer builds a Peer from basePeer, copying its service flags and creating
// empty sets for known blocks, known signatures and filter addresses.
93 func newPeer(basePeer BasePeer) *Peer {
96 services: basePeer.ServiceFlag(),
98 knownBlocks: set.New(),
99 knownSignatures: set.New(),
100 filterAdds: set.New(),
// Height reports the peer's height; the read lock on p.mtx is released on
// return (presumably the value is bestHeight — TODO confirm).
104 func (p *Peer) Height() uint64 {
106 defer p.mtx.RUnlock()
// IrreversibleHeight returns the peer's irreversibleHeight; the read lock on
// p.mtx is released on return.
111 func (p *Peer) IrreversibleHeight() uint64 {
113 defer p.mtx.RUnlock()
115 return p.irreversibleHeight
// AddFilterAddress stores address, hex-encoded, in the peer's filter set.
// A warning is logged when the set already holds maxFilterAddressCount
// entries or when address is longer than maxFilterAddressSize bytes
// (presumably the address is rejected in both cases — TODO confirm).
118 func (p *Peer) AddFilterAddress(address []byte) {
122 if p.filterAdds.Size() >= maxFilterAddressCount {
123 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
126 if len(address) > maxFilterAddressSize {
127 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
131 p.filterAdds.Add(hex.EncodeToString(address))
// AddFilterAddresses passes every entry of addresses to AddFilterAddress.
// A non-empty existing filter set is handled first
// (presumably cleared — TODO confirm).
134 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
135 if !p.filterAdds.IsEmpty() {
138 for _, address := range addresses {
139 p.AddFilterAddress(address)
// FilterClear presumably empties the peer's filter address set — TODO confirm.
143 func (p *Peer) FilterClear() {
// GetBlockByHeight sends a GetBlockMessage for height on the blockchain
// channel and returns the result of TrySend.
147 func (p *Peer) GetBlockByHeight(height uint64) bool {
148 msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
149 return p.TrySend(msgs.BlockchainChannel, msg)
// GetBlocks sends a get-blocks message built from locator and stopHash on the
// blockchain channel and returns the result of TrySend.
152 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
153 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
154 return p.TrySend(msgs.BlockchainChannel, msg)
// GetHeaders sends a get-headers message built from locator, stopHash and
// skip on the blockchain channel and returns the result of TrySend.
157 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
158 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
159 return p.TrySend(msgs.BlockchainChannel, msg)
// GetPeerInfo builds a PeerInfo snapshot from the peer's traffic statistics
// and best height. The read lock on p.mtx is released on return.
162 func (p *Peer) GetPeerInfo() *PeerInfo {
164 defer p.mtx.RUnlock()
166 sentStatus, receivedStatus := p.TrafficStatus()
// ping starts as the difference between the send-side and receive-side idle times.
167 ping := sentStatus.Idle - receivedStatus.Idle
// The difference is negative in this case (presumably the sign is flipped — TODO confirm).
168 if receivedStatus.Idle > sentStatus.Idle {
174 Moniker: p.BasePeer.Moniker(),
175 RemoteAddr: p.Addr().String(),
176 Height: p.bestHeight,
178 Duration: sentStatus.Duration.String(),
179 TotalSent: sentStatus.Bytes,
180 TotalReceived: receivedStatus.Bytes,
181 AverageSentRate: sentStatus.AvgRate,
182 AverageReceivedRate: receivedStatus.AvgRate,
183 CurrentSentRate: sentStatus.CurRate,
184 CurrentReceivedRate: receivedStatus.CurRate,
// getRelatedTxAndStatus returns the transactions in txs for which isRelatedTx
// is true, together with the verify result at the same index in
// txStatuses.VerifyStatus. Both results are nil when nothing matches.
// NOTE(review): indexing VerifyStatus[i] assumes it has at least len(txs)
// entries — confirm callers guarantee this.
188 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
189 var relatedTxs []*types.Tx
190 var relatedStatuses []*bc.TxVerifyResult
191 for i, tx := range txs {
192 if p.isRelatedTx(tx) {
193 relatedTxs = append(relatedTxs, tx)
194 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
197 return relatedTxs, relatedStatuses
// isRelatedTx checks tx against the peer's filter addresses: it looks up the
// hex-encoded control program of every spend input and of every output in
// filterAdds. Inputs of other types are not matched by the visible case.
200 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
201 for _, input := range tx.Inputs {
202 switch inp := input.TypedInput.(type) {
203 case *types.SpendInput:
204 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
209 for _, output := range tx.Outputs {
210 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
// isSPVNode reports whether the peer does not have the SFFullNode service enabled.
217 func (p *Peer) isSPVNode() bool {
218 return !p.services.IsEnable(consensus.SFFullNode)
// MarkBlock records hash (as a string) in the peer's known-block set.
// The loop runs while the set holds maxKnownBlocks or more entries
// (presumably evicting one per pass, as markSign does — TODO confirm).
221 func (p *Peer) MarkBlock(hash *bc.Hash) {
225 for p.knownBlocks.Size() >= maxKnownBlocks {
228 p.knownBlocks.Add(hash.String())
// markNewStatus records height as the chain status this peer is known to have.
231 func (p *Peer) markNewStatus(height uint64) {
235 p.knownStatus = height
// markSign records the hex-encoded signature in the peer's known-signature
// set, popping entries first while the set holds maxKnownSignatures or more.
238 func (p *Peer) markSign(signature []byte) {
242 for p.knownSignatures.Size() >= maxKnownSignatures {
243 p.knownSignatures.Pop()
245 p.knownSignatures.Add(hex.EncodeToString(signature))
// markTransaction records hash (as a string) in the peer's known-transaction
// set. The loop runs while the set holds maxKnownTxs or more entries
// (presumably evicting one per pass, as markSign does — TODO confirm).
248 func (p *Peer) markTransaction(hash *bc.Hash) {
252 for p.knownTxs.Size() >= maxKnownTxs {
255 p.knownTxs.Add(hash.String())
// SendBlock wraps block in a block message, tries to send it on the
// blockchain channel and records the block hash in knownBlocks.
// A failure to build the message is returned wrapped.
// NOTE(review): the hash is added to knownBlocks directly; unlike MarkBlock,
// this path appears to have no maxKnownBlocks check — confirm whether the
// cap should apply here too.
258 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
259 msg, err := msgs.NewBlockMessage(block)
261 return false, errors.Wrap(err, "fail on NewBlockMessage")
264 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// NOTE(review): "blcokHash" is a misspelling of "blockHash".
266 blcokHash := block.Hash()
267 p.knownBlocks.Add(blcokHash.String())
// SendBlocks wraps blocks in a single blocks message, tries to send it on the
// blockchain channel and records every block hash in knownBlocks.
// A failure to build the message is returned wrapped.
// NOTE(review): hashes are added to knownBlocks directly; unlike MarkBlock,
// no maxKnownBlocks check is visible in this loop — confirm.
272 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
273 msg, err := msgs.NewBlocksMessage(blocks)
275 return false, errors.Wrap(err, "fail on NewBlocksMessage")
278 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
282 for _, block := range blocks {
// NOTE(review): "blcokHash" is a misspelling of "blockHash".
283 blcokHash := block.Hash()
284 p.knownBlocks.Add(blcokHash.String())
// SendHeaders wraps headers in a headers message and tries to send it on the
// blockchain channel.
289 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
290 msg, err := msgs.NewHeadersMessage(headers)
// NOTE(review): err is replaced by a fresh error here, losing the cause;
// SendBlock and SendBlocks use errors.Wrap(err, ...) for the same situation.
292 return false, errors.New("fail on NewHeadersMessage")
295 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendMerkleBlock builds a merkle block message for block and tries to send
// it on the blockchain channel. The message carries the raw block header, a
// transaction merkle proof for the transactions related to this peer (see
// getRelatedTxAndStatus) and the matching status merkle proof.
299 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
300 msg := msgs.NewMerkleBlockMessage()
301 if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
305 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
// txFlags from the transaction proof is reused below for the status proof.
307 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
308 if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
312 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
313 if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
317 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendTransactions sends txs to the peer in transactions messages, batching
// them in validTxs, and records each sent transaction ID in knownTxs.
// It returns an error when a batch cannot be sent.
// NOTE(review): a batch is only sent from inside the loop, after an append.
// If the last element of txs is filtered out by the first check, a partially
// filled batch appears never to be sent — confirm.
321 func (p *Peer) SendTransactions(txs []*types.Tx) error {
322 validTxs := make([]*types.Tx, 0, len(txs))
323 for i, tx := range txs {
// && binds tighter than ||: (SPV peer and tx unrelated) or tx already known.
324 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
328 validTxs = append(validTxs, tx)
// True while the batch is not full and this is not the last element of txs.
329 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
333 msg, err := msgs.NewTransactionsMessage(validTxs)
338 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
339 return errors.New("failed to send txs msg")
342 for _, validTx := range validTxs {
343 p.knownTxs.Add(validTx.ID.String())
// Start a fresh batch for the remaining transactions.
346 validTxs = make([]*types.Tx, 0, len(txs))
// SendStatus sends a status message built from bestHeader and
// irreversibleHeader on the blockchain channel. It returns errSendStatusMsg
// when TrySend fails; otherwise it records bestHeader.Height via markNewStatus.
352 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
353 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
354 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
355 return errSendStatusMsg
357 p.markNewStatus(bestHeader.Height)
// SetBestStatus stores bestHeight and bestHash as the peer's best chain status.
361 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
365 p.bestHeight = bestHeight
366 p.bestHash = bestHash
// SetIrreversibleStatus stores irreversibleHeight and irreversibleHash as the
// peer's irreversible chain status.
369 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
373 p.irreversibleHeight = irreversibleHeight
374 p.irreversibleHash = irreversibleHash
// PeerSet holds the registered peers and offers lookup, marking and
// broadcast operations over them.
377 type PeerSet struct {
380 peers map[string]*Peer // registered peers keyed by peer ID
383 // NewPeerSet creates a new peer set to track the active participants.
// It starts with an empty peer map and uses basePeerSet as its BasePeerSet.
384 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
386 BasePeerSet: basePeerSet,
387 peers: make(map[string]*Peer),
// ProcessIllegal reports misbehaviour of peer peerID to IsBanned, passing the
// peer's remote host, level and reason, and removes the peer when IsBanned
// returns true.
391 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
// Direct map lookup rather than GetPeer.
393 peer := ps.peers[peerID]
400 if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
401 ps.RemovePeer(peerID)
// AddPeer registers peer under its ID when that ID is not yet present.
// The warning covers the case of an ID that is already registered.
// The lock on ps.mtx is released on return.
406 func (ps *PeerSet) AddPeer(peer BasePeer) {
408 defer ps.mtx.Unlock()
410 if _, ok := ps.peers[peer.ID()]; !ok {
411 ps.peers[peer.ID()] = newPeer(peer)
414 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// BestPeer selects a peer by bestHeight among the peers that have flag
// enabled. The read lock on ps.mtx is released on return.
417 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
419 defer ps.mtx.RUnlock()
422 for _, p := range ps.peers {
// Peers without the requested service are not considered.
423 if !p.services.IsEnable(flag) {
// A candidate qualifies when none is selected yet, when its bestHeight is
// greater, or when the heights tie and the candidate is a LAN peer.
426 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
// BestIrreversiblePeer selects a peer by irreversibleHeight among the peers
// that have flag enabled. The read lock on ps.mtx is released on return.
433 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
435 defer ps.mtx.RUnlock()
438 for _, p := range ps.peers {
// Peers without the requested service are not considered.
439 if !p.services.IsEnable(flag) {
// A candidate qualifies when none is selected yet, when its irreversibleHeight
// is greater, or when the heights tie and the candidate is a LAN peer.
442 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
449 // SendMsg sends msg on msgChannel to the peer registered as peerID.
// The peer is removed on the RemovePeer path below
// (presumably when TrySend fails — TODO confirm).
450 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
451 peer := ps.GetPeer(peerID)
456 ok := peer.TrySend(msgChannel, msg)
458 ps.RemovePeer(peerID)
463 // BroadcastMsg sends bm to the peers it selects via FilterTargetPeers
464 // and then reports the successful sends to bm via MarkSendRecord.
465 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
466 // Filter target peers.
467 peers := bm.FilterTargetPeers(ps)
469 // Broadcast to target peers.
470 peersSuccess := make([]string, 0)
471 for _, peer := range peers {
// A failed send is logged with the peer ID, message type and message text.
472 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
473 log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
476 peersSuccess = append(peersSuccess, peer)
479 // Mark the message send record.
480 bm.MarkSendRecord(ps, peersSuccess)
// BroadcastNewStatus sends a status message built from bestHeader and
// irreversibleHeader to every peer whose known status is below
// bestHeader.Height. A peer that cannot be sent to is removed; the height is
// recorded on the peer via markNewStatus.
484 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
485 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
486 peers := ps.peersWithoutNewStatus(bestHeader.Height)
487 for _, peer := range peers {
488 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
489 ps.RemovePeer(peer.ID())
493 peer.markNewStatus(bestHeader.Height)
// BroadcastTx sends tx in a transaction message to the peers that do not yet
// know it. A failure to build the message is returned wrapped. A peer that
// cannot be sent to is logged and removed; the transaction ID is recorded on
// the peer via markTransaction.
498 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
499 msg, err := msgs.NewTransactionMessage(tx)
501 return errors.Wrap(err, "fail on broadcast tx")
504 peers := ps.peersWithoutTx(&tx.ID)
505 for _, peer := range peers {
// SPV peers are singled out when tx matches none of their filter addresses.
506 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
509 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
510 log.WithFields(log.Fields{
513 "type": reflect.TypeOf(msg),
514 "message": msg.String(),
515 }).Warning("send message to peer error")
516 ps.RemovePeer(peer.ID())
519 peer.markTransaction(&tx.ID)
524 // GetPeer retrieves the registered peer with the given id.
// The read lock on ps.mtx is released on return.
525 func (ps *PeerSet) GetPeer(id string) *Peer {
527 defer ps.mtx.RUnlock()
// GetPeersByHeight collects the peers whose Height() is at least height.
// The read lock on ps.mtx is released on return.
531 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
533 defer ps.mtx.RUnlock()
536 for _, peer := range ps.peers {
537 if peer.Height() >= height {
538 peers = append(peers, peer)
// GetPeerInfos collects a PeerInfo snapshot for every registered peer.
// The read lock on ps.mtx is released on return.
544 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
546 defer ps.mtx.RUnlock()
// Starts as an empty, non-nil slice.
548 result := []*PeerInfo{}
549 for _, peer := range ps.peers {
550 result = append(result, peer.GetPeerInfo())
// MarkBlock looks up peer peerID (presumably to record hash as a block known
// to that peer via Peer.MarkBlock — TODO confirm).
555 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
556 peer := ps.GetPeer(peerID)
// MarkBlockSignature records signature as known to peer peerID via markSign.
563 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
564 peer := ps.GetPeer(peerID)
568 peer.markSign(signature)
// MarkStatus records height as the status known to peer peerID via markNewStatus.
571 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
572 peer := ps.GetPeer(peerID)
576 peer.markNewStatus(height)
// MarkTx records txHash as known to peer peerID via markTransaction.
579 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
// Direct map lookup; the sibling Mark* methods use GetPeer instead.
581 peer := ps.peers[peerID]
587 peer.markTransaction(&txHash)
// PeersWithoutBlock collects the IDs of the peers whose known-block set does
// not contain hash. The read lock on ps.mtx is released on return.
590 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
592 defer ps.mtx.RUnlock()
595 for _, peer := range ps.peers {
596 if !peer.knownBlocks.Has(hash.String()) {
597 peers = append(peers, peer.ID())
// PeersWithoutSignature collects the IDs of the peers whose known-signature
// set does not contain the hex-encoded signature. The read lock on ps.mtx is
// released on return.
603 func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
605 defer ps.mtx.RUnlock()
608 for _, peer := range ps.peers {
609 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
610 peers = append(peers, peer.ID())
// peersWithoutNewStatus collects the peers whose knownStatus is below height.
// The read lock on ps.mtx is released on return.
616 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
618 defer ps.mtx.RUnlock()
621 for _, peer := range ps.peers {
622 if peer.knownStatus < height {
623 peers = append(peers, peer)
// peersWithoutTx collects the peers whose known-transaction set does not
// contain hash. The read lock on ps.mtx is released on return.
629 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
631 defer ps.mtx.RUnlock()
634 for _, peer := range ps.peers {
635 if !peer.knownTxs.Has(hash.String()) {
636 peers = append(peers, peer)
// RemovePeer deletes peerID from the peer map and then asks the underlying
// peer set to stop that peer gracefully.
642 func (ps *PeerSet) RemovePeer(peerID string) {
644 delete(ps.peers, peerID)
646 ps.StopPeerGracefully(peerID)
// SetStatus stores height and hash as the best status of peer peerID via
// Peer.SetBestStatus.
649 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
650 peer := ps.GetPeer(peerID)
655 peer.SetBestStatus(height, hash)
// SetIrreversibleStatus stores height and hash as the irreversible status of
// peer peerID via Peer.SetIrreversibleStatus.
658 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
659 peer := ps.GetPeer(peerID)
664 peer.SetIrreversibleStatus(height, hash)