"github.com/tendermint/tmlibs/flowrate"
"gopkg.in/fatih/set.v0"
- "github.com/vapor/consensus"
- "github.com/vapor/errors"
- msgs "github.com/vapor/netsync/messages"
- "github.com/vapor/p2p/trust"
- "github.com/vapor/protocol/bc"
- "github.com/vapor/protocol/bc/types"
+ "github.com/bytom/vapor/consensus"
+ "github.com/bytom/vapor/errors"
+ msgs "github.com/bytom/vapor/netsync/messages"
+ "github.com/bytom/vapor/protocol/bc"
+ "github.com/bytom/vapor/protocol/bc/types"
)
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- defaultBanThreshold = uint32(100)
maxFilterAddressSize = 50
maxFilterAddressCount = 1000
var (
errSendStatusMsg = errors.New("send status msg fail")
ErrPeerMisbehave = errors.New("peer is misbehave")
+ ErrNoValidPeer = errors.New("Can't find valid fast sync peer")
)
//BasePeer is the interface for connection level peer
type BasePeer interface {
+ Moniker() string
Addr() net.Addr
ID() string
+ RemoteAddrHost() string
ServiceFlag() consensus.ServiceFlag
TrafficStatus() (*flowrate.Status, *flowrate.Status)
TrySend(byte, interface{}) bool
//BasePeerSet is the intergace for connection level peer manager
type BasePeerSet interface {
- AddBannedPeer(string) error
StopPeerGracefully(string)
+ IsBanned(ip string, level byte, reason string) bool
}
type BroadcastMsg interface {
// PeerInfo indicate peer status snap
type PeerInfo struct {
ID string `json:"peer_id"`
+ Moniker string `json:"moniker"`
RemoteAddr string `json:"remote_addr"`
Height uint64 `json:"height"`
Ping string `json:"ping"`
type Peer struct {
BasePeer
- mtx sync.RWMutex
- services consensus.ServiceFlag
- height uint64
- hash *bc.Hash
- banScore trust.DynamicBanScore
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
- knownSignatures *set.Set // Set of block signatures known to be known by this peer
- knownStatus uint64 // Set of chain status known to be known by this peer
- filterAdds *set.Set // Set of addresses that the spv node cares about.
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ bestHeight uint64
+ bestHash *bc.Hash
+ irreversibleHeight uint64
+ irreversibleHash *bc.Hash
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownSignatures *set.Set // Set of block signatures known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
}
func newPeer(basePeer BasePeer) *Peer {
func (p *Peer) Height() uint64 {
p.mtx.RLock()
defer p.mtx.RUnlock()
- return p.height
-}
-
-func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
- score := p.banScore.Increase(persistent, transient)
- if score > defaultBanThreshold {
- log.WithFields(log.Fields{
- "module": logModule,
- "address": p.Addr(),
- "score": score,
- "reason": reason,
- }).Errorf("banning and disconnecting")
- return true
- }
-
- warnThreshold := defaultBanThreshold >> 1
- if score > warnThreshold {
- log.WithFields(log.Fields{
- "module": logModule,
- "address": p.Addr(),
- "score": score,
- "reason": reason,
- }).Warning("ban score increasing")
- }
- return false
+
+ return p.bestHeight
+}
+
+func (p *Peer) IrreversibleHeight() uint64 {
+ p.mtx.RLock()
+ defer p.mtx.RUnlock()
+
+ return p.irreversibleHeight
}
func (p *Peer) AddFilterAddress(address []byte) {
return p.TrySend(msgs.BlockchainChannel, msg)
}
-func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
- msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
+ msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
return p.TrySend(msgs.BlockchainChannel, msg)
}
return &PeerInfo{
ID: p.ID(),
+ Moniker: p.BasePeer.Moniker(),
RemoteAddr: p.Addr().String(),
- Height: p.height,
+ Height: p.bestHeight,
Ping: ping.String(),
Duration: sentStatus.Duration.String(),
TotalSent: sentStatus.Bytes,
for p.knownSignatures.Size() >= maxKnownSignatures {
p.knownSignatures.Pop()
}
- p.knownSignatures.Add(signature)
+ p.knownSignatures.Add(hex.EncodeToString(signature))
}
func (p *Peer) markTransaction(hash *bc.Hash) {
p.knownTxs.Add(hash.String())
}
-func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- var peers []string
- for _, peer := range ps.peers {
- if !peer.knownBlocks.Has(hash.String()) {
- peers = append(peers, peer.ID())
- }
- }
- return peers
-}
-
-func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- var peers []string
- for _, peer := range ps.peers {
- if !peer.knownSignatures.Has(signature) {
- peers = append(peers, peer.ID())
- }
- }
- return peers
-}
-
func (p *Peer) SendBlock(block *types.Block) (bool, error) {
msg, err := msgs.NewBlockMessage(block)
if err != nil {
return nil
}
-func (p *Peer) SendStatus(header *types.BlockHeader) error {
- msg := msgs.NewStatusMessage(header)
+func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+ msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
return errSendStatusMsg
}
- p.markNewStatus(header.Height)
+ p.markNewStatus(bestHeader.Height)
return nil
}
-func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
- p.height = height
- p.hash = hash
+
+ p.bestHeight = bestHeight
+ p.bestHash = bestHash
+}
+
+func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ p.irreversibleHeight = irreversibleHeight
+ p.irreversibleHash = irreversibleHash
}
type PeerSet struct {
}
}
-func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
+func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
ps.mtx.Lock()
peer := ps.peers[peerID]
ps.mtx.Unlock()
if peer == nil {
return
}
- if ban := peer.addBanScore(persistent, transient, reason); !ban {
- return
- }
- if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
+
+ if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
+ ps.RemovePeer(peerID)
}
- ps.RemovePeer(peerID)
+ return
}
func (ps *PeerSet) AddPeer(peer BasePeer) {
if !p.services.IsEnable(flag) {
continue
}
- if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+ if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
+ bestPeer = p
+ }
+ }
+ return bestPeer
+}
+
+func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var bestPeer *Peer
+ for _, p := range ps.peers {
+ if !p.services.IsEnable(flag) {
+ continue
+ }
+ if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
bestPeer = p
}
}
return nil
}
-func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
- msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
- peers := ps.peersWithoutNewStatus(bestBlock.Height)
+func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+ msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
+ peers := ps.peersWithoutNewStatus(bestHeader.Height)
for _, peer := range peers {
if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
ps.RemovePeer(peer.ID())
continue
}
- peer.markNewStatus(bestBlock.Height)
+ peer.markNewStatus(bestHeader.Height)
}
return nil
}
return nil
}
-func (ps *PeerSet) ErrorHandler(peerID string, err error) {
- if errors.Root(err) == ErrPeerMisbehave {
- ps.AddBanScore(peerID, 20, 0, err.Error())
- } else {
- ps.RemovePeer(peerID)
- }
-}
-
// Peer retrieves the registered peer with the given id.
func (ps *PeerSet) GetPeer(id string) *Peer {
ps.mtx.RLock()
return ps.peers[id]
}
+func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ peers := []*Peer{}
+ for _, peer := range ps.peers {
+ if peer.Height() >= height {
+ peers = append(peers, peer)
+ }
+ }
+ return peers
+}
+
func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
peer.markTransaction(&txHash)
}
-func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
- peers := []*Peer{}
+ var peers []string
for _, peer := range ps.peers {
if !peer.knownBlocks.Has(hash.String()) {
- peers = append(peers, peer)
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
+ peers = append(peers, peer.ID())
}
}
return peers
return
}
- peer.SetStatus(height, hash)
+ peer.SetBestStatus(height, hash)
+}
+
+func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+
+ peer.SetIrreversibleStatus(height, hash)
}