9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/bytom/bytom/consensus"
14 "github.com/bytom/bytom/errors"
15 "github.com/bytom/bytom/protocol/bc"
16 "github.com/bytom/bytom/protocol/bc/types"
// NOTE(review): the enclosing const declaration header is not visible in this excerpt.
20 maxKnownTxs = 32768 // Maximum number of transaction hashes to keep in the known list (prevents DoS)
21 maxKnownBlocks = 1024 // Maximum number of block hashes to keep in the known list (prevents DoS)
24 // BasePeer is the interface for a connection-level peer.
// NOTE(review): only part of the method set is visible in this excerpt; ID() is called on a
// BasePeer in addPeer, so it is presumably declared on one of the missing lines.
25 type BasePeer interface {
// RemoteAddrHost is the host string that ProcessIllegal passes to IsBanned.
28 RemoteAddrHost() string
// ServiceFlag is read once by newPeer and stored as the peer's services.
29 ServiceFlag() consensus.ServiceFlag
// TrafficStatus returns two flow statuses; getPeerInfo treats the first as sent and the second as received.
30 TrafficStatus() (*flowrate.Status, *flowrate.Status)
// TrySend is called with a channel ID and a message; callers treat a false result as a failed send.
31 TrySend(byte, interface{}) bool
35 // BasePeerSet is the interface for the connection-level peer manager.
36 type BasePeerSet interface {
// StopPeerGracefully is called by removePeer with the ID of the peer being removed.
37 StopPeerGracefully(string)
// IsBanned is consulted by ProcessIllegal; a true result leads to the peer being removed.
38 IsBanned(ip string, level byte, reason string) bool
41 // PeerInfo is a snapshot of a peer's status; every field carries a JSON tag.
42 type PeerInfo struct {
43 ID string `json:"peer_id"`
44 RemoteAddr string `json:"remote_addr"` // filled from p.Addr().String() in getPeerInfo
45 Height uint64 `json:"height"`
46 Ping string `json:"ping"`
47 Duration string `json:"duration"` // filled from the sent-traffic status Duration in getPeerInfo
// The traffic fields below are filled by getPeerInfo from the Bytes, AvgRate and CurRate
// fields of the sent and received flow statuses.
48 TotalSent int64 `json:"total_sent"`
49 TotalReceived int64 `json:"total_received"`
50 AverageSentRate int64 `json:"average_sent_rate"`
51 AverageReceivedRate int64 `json:"average_received_rate"`
52 CurrentSentRate int64 `json:"current_sent_rate"`
53 CurrentReceivedRate int64 `json:"current_received_rate"`
// NOTE(review): the struct header and the remaining fields of peer are not visible in this excerpt.
59 services consensus.ServiceFlag // service flags captured from the base peer in newPeer
62 knownTxs *set.Set // Set of transaction hashes (as strings) known to this peer
63 knownBlocks *set.Set // Set of block hashes (as strings) known to this peer
64 filterAdds *set.Set // Set of hex-encoded addresses that the SPV node cares about.
// newPeer builds a peer around basePeer. The visible initializers capture the base peer's
// service flags and start the known-block and filter-address sets empty.
// NOTE(review): the other field initializers (presumably height, hash, knownTxs and the
// embedded base peer) are on lines not visible in this excerpt.
67 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
70 services: basePeer.ServiceFlag(),
74 knownBlocks: set.New(),
75 filterAdds: set.New(),
// Height returns a uint64 height for the peer.
// NOTE(review): the body is not visible in this excerpt; presumably it returns p.height — confirm.
79 func (p *peer) Height() uint64 {
// addFilterAddress records one address in the peer's filter set, stored as a hex string.
// Two limits are checked first, each logging a warning when exceeded.
// NOTE(review): the statements following each warning (presumably early returns) and any
// locking are on lines not visible in this excerpt.
85 func (p *peer) addFilterAddress(address []byte) {
// Limit on how many addresses the filter set may hold.
89 if p.filterAdds.Size() >= maxFilterAddressCount {
90 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
// Limit on the byte length of a single address.
93 if len(address) > maxFilterAddressSize {
94 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
// Hex encoding matches the lookups performed in isRelatedTx.
98 p.filterAdds.Add(hex.EncodeToString(address))
// addFilterAddresses adds every address in addresses through addFilterAddress.
// NOTE(review): something is done first when the filter set is not empty; that statement is
// on a line not visible in this excerpt (presumably the set is cleared — confirm).
101 func (p *peer) addFilterAddresses(addresses [][]byte) {
102 if !p.filterAdds.IsEmpty() {
105 for _, address := range addresses {
106 p.addFilterAddress(address)
// getBlockByHeight tries to send a GetBlockMessage for the given height on BlockchainChannel
// and returns the result of TrySend.
110 func (p *peer) getBlockByHeight(height uint64) bool {
// The message is wrapped in an anonymous struct embedding BlockchainMessage, as in every send in this file.
111 msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
112 return p.TrySend(BlockchainChannel, msg)
// getBlocks tries to send a get-blocks message built from locator and stopHash on
// BlockchainChannel and returns the result of TrySend.
115 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
116 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
117 return p.TrySend(BlockchainChannel, msg)
// getHeaders tries to send a get-headers message built from locator and stopHash on
// BlockchainChannel and returns the result of TrySend.
120 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
121 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
122 return p.TrySend(BlockchainChannel, msg)
// getPeerInfo builds a PeerInfo snapshot from the peer's address and traffic statistics.
// NOTE(review): the matching lock acquisition, the ID/Height/Ping initializers and the
// return are on lines not visible in this excerpt.
125 func (p *peer) getPeerInfo() *PeerInfo {
127 defer p.mtx.RUnlock()
129 sentStatus, receivedStatus := p.TrafficStatus()
// ping is derived from the difference between the two Idle durations.
130 ping := sentStatus.Idle - receivedStatus.Idle
// NOTE(review): the body of this branch is not visible; presumably it keeps ping non-negative — confirm.
131 if receivedStatus.Idle > sentStatus.Idle {
137 RemoteAddr: p.Addr().String(),
140 Duration: sentStatus.Duration.String(),
141 TotalSent: sentStatus.Bytes,
142 TotalReceived: receivedStatus.Bytes,
143 AverageSentRate: sentStatus.AvgRate,
144 AverageReceivedRate: receivedStatus.AvgRate,
145 CurrentSentRate: sentStatus.CurRate,
146 CurrentReceivedRate: receivedStatus.CurRate,
// getRelatedTxAndStatus selects the transactions for which isRelatedTx is true and returns
// them together with the verify status at the same index, in the original order.
// Both results are nil when nothing matches.
150 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
151 var relatedTxs []*types.Tx
152 var relatedStatuses []*bc.TxVerifyResult
153 for i, tx := range txs {
154 if p.isRelatedTx(tx) {
155 relatedTxs = append(relatedTxs, tx)
// Indexed by the transaction's position, so VerifyStatus must have an entry for every tx.
156 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
159 return relatedTxs, relatedStatuses
// isRelatedTx checks the transaction against the peer's filter set: the control program of
// each spend input and of each output is hex-encoded and looked up in filterAdds.
// NOTE(review): the return statements are on lines not visible in this excerpt; presumably a
// hit returns true and the function falls through to false.
162 func (p *peer) isRelatedTx(tx *types.Tx) bool {
163 for _, input := range tx.Inputs {
// Only *types.SpendInput is handled by the visible cases.
164 switch inp := input.TypedInput.(type) {
165 case *types.SpendInput:
166 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
171 for _, output := range tx.Outputs {
172 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
// isSPVNode reports whether the peer's service flags do not enable consensus.SFFullNode.
179 func (p *peer) isSPVNode() bool {
180 return !p.services.IsEnable(consensus.SFFullNode)
// markBlock records the block hash (as a string) in the peer's known-block set.
// NOTE(review): the loop body that runs while the set holds maxKnownBlocks or more entries is
// not visible in this excerpt (presumably it evicts entries); any locking is likewise not visible.
183 func (p *peer) markBlock(hash *bc.Hash) {
187 for p.knownBlocks.Size() >= maxKnownBlocks {
190 p.knownBlocks.Add(hash.String())
// markTransaction records the transaction hash (as a string) in the peer's known-transaction set.
// NOTE(review): the loop body that runs while the set holds maxKnownTxs or more entries is
// not visible in this excerpt (presumably it evicts entries); any locking is likewise not visible.
193 func (p *peer) markTransaction(hash *bc.Hash) {
197 for p.knownTxs.Size() >= maxKnownTxs {
200 p.knownTxs.Add(hash.String())
// sendBlock builds a block message and tries to send it on BlockchainChannel, recording the
// block hash in knownBlocks. A message construction error is returned wrapped.
// NOTE(review): the error guard, the condition around the knownBlocks update (presumably
// "if ok") and the final return are on lines not visible in this excerpt.
203 func (p *peer) sendBlock(block *types.Block) (bool, error) {
204 msg, err := NewBlockMessage(block)
206 return false, errors.Wrap(err, "fail on NewBlockMessage")
209 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// The hash is added directly to the set here rather than through markBlock.
211 blcokHash := block.Hash()
212 p.knownBlocks.Add(blcokHash.String())
// sendBlocks builds a single message from blocks and tries to send it on BlockchainChannel,
// then records every block hash in knownBlocks. A message construction error is returned wrapped.
// NOTE(review): the error guard, the body of the failed-send branch and the final return are
// on lines not visible in this excerpt.
217 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
218 msg, err := NewBlocksMessage(blocks)
220 return false, errors.Wrap(err, "fail on NewBlocksMessage")
223 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
// Hashes are added directly to the set here rather than through markBlock.
227 for _, block := range blocks {
228 blcokHash := block.Hash()
229 p.knownBlocks.Add(blcokHash.String())
// sendHeaders builds a headers message and tries to send it on BlockchainChannel.
// A message construction failure is returned with the underlying error wrapped, in the same
// way sendBlock and sendBlocks do, so the cause is not lost.
// NOTE(review): the error guard and the final return are on lines not visible in this excerpt.
234 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
235 msg, err := NewHeadersMessage(headers)
237 return false, errors.Wrap(err, "fail on NewHeadersMessage")
240 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// sendMerkleBlock assembles a merkle block message for this peer and tries to send it on
// BlockchainChannel. The message is filled in three steps: the raw block header, the
// transaction merkle proof with the related transactions, and the status merkle proof with
// the related statuses.
// NOTE(review): the error-return bodies of the three setter checks and the final return are
// on lines not visible in this excerpt.
244 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
245 msg := NewMerkleBlockMessage()
246 if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
// Only transactions matching the peer's filter addresses are included.
250 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
252 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
253 if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
// The status proof reuses the flags produced for the transaction proof.
257 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
258 if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
262 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// sendTransactions walks txs and tries to send each one on BlockchainChannel, recording the
// transaction ID in knownTxs. A message construction error is returned wrapped.
// NOTE(review): the bodies of the three conditions (presumably "continue" for the first and
// third, and an early return on failed send) and the final return are on lines not visible
// in this excerpt.
266 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
267 for _, tx := range txs {
// Condition: the peer is an SPV node and the transaction does not match its filter.
268 if p.isSPVNode() && !p.isRelatedTx(tx) {
// The message is built before the known-transaction check below.
271 msg, err := NewTransactionMessage(tx)
273 return false, errors.Wrap(err, "failed to tx msg")
// Condition: the peer already knows this transaction.
276 if p.knownTxs.Has(tx.ID.String()) {
279 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
// The ID is added directly to the set here rather than through markTransaction.
282 p.knownTxs.Add(tx.ID.String())
// setStatus takes a height and a block hash for the peer.
// NOTE(review): the body is not visible in this excerpt; presumably it stores both on the peer — confirm.
287 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
// peerSet tracks peers by ID.
// NOTE(review): other fields are not visible in this excerpt; newPeerSet sets a BasePeerSet
// field and the methods use ps.mtx, so both are presumably declared on the missing lines.
294 type peerSet struct {
297 peers map[string]*peer // registered peers keyed by peer ID
300 // newPeerSet creates a new peer set to track the active participants.
// The set wraps basePeerSet and starts with an empty peer map.
301 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
303 BasePeerSet: basePeerSet,
304 peers: make(map[string]*peer),
// ProcessIllegal looks up the peer by ID and asks IsBanned about the peer's remote host,
// passing level and reason through; when IsBanned returns true the peer is removed from the set.
// NOTE(review): locking around the map lookup and any handling of an unknown peerID are on
// lines not visible in this excerpt.
308 func (ps *peerSet) ProcessIllegal(peerID string, level byte, reason string) {
310 peer := ps.peers[peerID]
316 if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
317 ps.removePeer(peerID)
// addPeer registers a new peer, built with newPeer from the given height and hash, under the
// base peer's ID when that ID is not yet in the set.
// NOTE(review): the lock acquisition and the end of the if-branch are on lines not visible
// in this excerpt; presumably the branch returns so the warning is logged only for an ID
// that is already registered.
322 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
324 defer ps.mtx.Unlock()
326 if _, ok := ps.peers[peer.ID()]; !ok {
327 ps.peers[peer.ID()] = newPeer(height, hash, peer)
330 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// bestPeer scans the registered peers for the best candidate among those whose services
// enable flag.
// NOTE(review): the lock acquisition, the bestPeer declaration, the branch bodies and the
// return are on lines not visible in this excerpt.
333 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
335 defer ps.mtx.RUnlock()
338 for _, p := range ps.peers {
// Condition: the peer does not offer the requested service.
339 if !p.services.IsEnable(flag) {
// Condition: nothing chosen yet, or p is higher, or p ties on height and is a LAN peer.
342 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
// broadcastMinedBlock builds a mined-block message and tries to send it to the peers returned
// by peersWithoutBlock. A failed send is logged and the peer is removed from the set;
// markBlock records the block hash on the peer.
// NOTE(review): the error guard, the computation of hash, the branch bodies (presumably
// "continue" for SPV nodes and after a failed send) and the return are on lines not visible
// in this excerpt.
349 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
350 msg, err := NewMinedBlockMessage(block)
352 return errors.Wrap(err, "fail on broadcast mined block")
356 peers := ps.peersWithoutBlock(&hash)
357 for _, peer := range peers {
358 if peer.isSPVNode() {
361 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
362 log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
363 ps.removePeer(peer.ID())
366 peer.markBlock(&hash)
// broadcastNewStatus builds a status response message from the best block's header and the
// genesis block hash and tries to send it to the peers returned by peersWithoutBlock for the
// best block hash. A failed send is logged and the peer is removed from the set.
// NOTE(review): the end of the loop and the return are on lines not visible in this excerpt.
371 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
372 bestBlockHash := bestBlock.Hash()
373 peers := ps.peersWithoutBlock(&bestBlockHash)
375 genesisHash := genesisBlock.Hash()
376 msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
377 for _, peer := range peers {
378 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
379 log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
380 ps.removePeer(peer.ID())
// broadcastTx builds a transaction message and tries to send it to the peers returned by
// peersWithoutTx. A failed send is logged and the peer is removed from the set;
// markTransaction records the transaction ID on the peer.
// NOTE(review): the error guard, the branch bodies (presumably "continue" for unrelated SPV
// peers and after a failed send), two log fields and the return are on lines not visible in
// this excerpt.
387 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
388 msg, err := NewTransactionMessage(tx)
390 return errors.Wrap(err, "fail on broadcast tx")
393 peers := ps.peersWithoutTx(&tx.ID)
394 for _, peer := range peers {
// Condition: the peer is an SPV node and the transaction does not match its filter.
395 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
398 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
399 log.WithFields(log.Fields{
402 "type": reflect.TypeOf(msg),
403 "message": msg.String(),
404 }).Warning("send message to peer error")
405 ps.removePeer(peer.ID())
408 peer.markTransaction(&tx.ID)
413 // getPeer retrieves the registered peer with the given id.
// NOTE(review): the lock acquisition and the map lookup are on lines not visible in this excerpt.
414 func (ps *peerSet) getPeer(id string) *peer {
416 defer ps.mtx.RUnlock()
// getPeerInfos collects the getPeerInfo snapshot of every registered peer.
// NOTE(review): the lock acquisition and the return are on lines not visible in this excerpt.
420 func (ps *peerSet) getPeerInfos() []*PeerInfo {
422 defer ps.mtx.RUnlock()
// Starts as an empty, non-nil slice.
424 result := []*PeerInfo{}
425 for _, peer := range ps.peers {
426 result = append(result, peer.getPeerInfo())
// peersWithoutBlock collects the registered peers whose known-block set does not contain hash.
// NOTE(review): the lock acquisition, the declaration of peers and the return are on lines
// not visible in this excerpt.
431 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
433 defer ps.mtx.RUnlock()
436 for _, peer := range ps.peers {
// Known hashes are stored as strings, so the lookup uses hash.String().
437 if !peer.knownBlocks.Has(hash.String()) {
438 peers = append(peers, peer)
// peersWithoutTx collects the registered peers whose known-transaction set does not contain hash.
// NOTE(review): the lock acquisition, the declaration of peers and the return are on lines
// not visible in this excerpt.
444 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
446 defer ps.mtx.RUnlock()
449 for _, peer := range ps.peers {
// Known hashes are stored as strings, so the lookup uses hash.String().
450 if !peer.knownTxs.Has(hash.String()) {
451 peers = append(peers, peer)
// removePeer deletes the peer from the set's map and then asks the underlying peer manager to
// stop it gracefully.
// NOTE(review): any locking around the delete is on lines not visible in this excerpt.
457 func (ps *peerSet) removePeer(peerID string) {
459 delete(ps.peers, peerID)
461 ps.StopPeerGracefully(peerID)