X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=netsync%2Fpeers%2Fpeer.go;h=ef90812f8f4e449b909b47cede1adb696d3d5ea5;hp=0f62149e5dea56826f556e427a4710bd9fbe208f;hb=669d176c004324fe81a26261a6e41ddea95b6f17;hpb=4ec54a2924dcfde48f4b6e5904d017f2ef4bd920 diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go index 0f62149e..ef90812f 100644 --- a/netsync/peers/peer.go +++ b/netsync/peers/peer.go @@ -13,7 +13,6 @@ import ( "github.com/vapor/consensus" "github.com/vapor/errors" msgs "github.com/vapor/netsync/messages" - "github.com/vapor/p2p/trust" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" ) @@ -22,7 +21,6 @@ const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - defaultBanThreshold = uint32(100) maxFilterAddressSize = 50 maxFilterAddressCount = 1000 @@ -32,12 +30,14 @@ const ( var ( errSendStatusMsg = errors.New("send status msg fail") ErrPeerMisbehave = errors.New("peer is misbehave") + ErrNoValidPeer = errors.New("Can't find valid fast sync peer") ) //BasePeer is the interface for connection level peer type BasePeer interface { Addr() net.Addr ID() string + RemoteAddrHost() string ServiceFlag() consensus.ServiceFlag TrafficStatus() (*flowrate.Status, *flowrate.Status) TrySend(byte, interface{}) bool @@ -46,8 +46,8 @@ type BasePeer interface { //BasePeerSet is the intergace for connection level peer manager type BasePeerSet interface { - AddBannedPeer(string) error StopPeerGracefully(string) + IsBanned(ip string, level byte, reason string) bool } type BroadcastMsg interface { @@ -75,16 +75,17 @@ type PeerInfo struct { type Peer struct { BasePeer - mtx sync.RWMutex - services consensus.ServiceFlag - height uint64 - hash *bc.Hash - banScore trust.DynamicBanScore - knownTxs *set.Set // Set of transaction hashes known to be known by this peer - knownBlocks *set.Set // Set of block hashes known to be known by this peer - knownSignatures *set.Set // Set of block signatures known to be known by this peer - knownStatus uint64 // Set of chain status known to be known by this peer - filterAdds *set.Set // Set of addresses that the spv node cares about. + mtx sync.RWMutex + services consensus.ServiceFlag + bestHeight uint64 + bestHash *bc.Hash + irreversibleHeight uint64 + irreversibleHash *bc.Hash + knownTxs *set.Set // Set of transaction hashes known to be known by this peer + knownBlocks *set.Set // Set of block hashes known to be known by this peer + knownSignatures *set.Set // Set of block signatures known to be known by this peer + knownStatus uint64 // Set of chain status known to be known by this peer + filterAdds *set.Set // Set of addresses that the spv node cares about. } func newPeer(basePeer BasePeer) *Peer { @@ -101,31 +102,15 @@ func newPeer(basePeer BasePeer) *Peer { func (p *Peer) Height() uint64 { p.mtx.RLock() defer p.mtx.RUnlock() - return p.height -} - -func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool { - score := p.banScore.Increase(persistent, transient) - if score > defaultBanThreshold { - log.WithFields(log.Fields{ - "module": logModule, - "address": p.Addr(), - "score": score, - "reason": reason, - }).Errorf("banning and disconnecting") - return true - } - - warnThreshold := defaultBanThreshold >> 1 - if score > warnThreshold { - log.WithFields(log.Fields{ - "module": logModule, - "address": p.Addr(), - "score": score, - "reason": reason, - }).Warning("ban score increasing") - } - return false + + return p.bestHeight +} + +func (p *Peer) IrreversibleHeight() uint64 { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return p.irreversibleHeight } func (p *Peer) AddFilterAddress(address []byte) { @@ -167,8 +152,8 @@ func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool { return p.TrySend(msgs.BlockchainChannel, msg) } -func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool { - msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)} +func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool { + msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)} return p.TrySend(msgs.BlockchainChannel, msg) } @@ -185,7 +170,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo { return &PeerInfo{ ID: p.ID(), RemoteAddr: p.Addr().String(), - Height: p.height, + Height: p.bestHeight, Ping: ping.String(), Duration: sentStatus.Duration.String(), TotalSent: sentStatus.Bytes, @@ -387,20 +372,29 @@ func (p *Peer) SendTransactions(txs []*types.Tx) error { return nil } -func (p *Peer) SendStatus(header *types.BlockHeader) error { - msg := msgs.NewStatusMessage(header) +func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error { + msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader) if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok { return errSendStatusMsg } - p.markNewStatus(header.Height) + p.markNewStatus(bestHeader.Height) return nil } -func (p *Peer) SetStatus(height uint64, hash *bc.Hash) { +func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) { p.mtx.Lock() defer p.mtx.Unlock() - p.height = height - p.hash = hash + + p.bestHeight = bestHeight + p.bestHash = bestHash +} + +func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.irreversibleHeight = irreversibleHeight + p.irreversibleHash = irreversibleHash } type PeerSet struct { @@ -417,7 +411,7 @@ func NewPeerSet(basePeerSet BasePeerSet) *PeerSet { } } -func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) { +func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) { ps.mtx.Lock() peer := ps.peers[peerID] ps.mtx.Unlock() @@ -425,13 +419,11 @@ func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reas if peer == nil { return } - if ban := peer.addBanScore(persistent, transient, reason); !ban { - return - } - if err := ps.AddBannedPeer(peer.Addr().String()); err != nil { - log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer") + + if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned { + ps.RemovePeer(peerID) } - ps.RemovePeer(peerID) + return } func (ps *PeerSet) AddPeer(peer BasePeer) { @@ -454,7 +446,23 @@ func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer { if !p.services.IsEnable(flag) { continue } - if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) { + if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) { + bestPeer = p + } + } + return bestPeer +} + +func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + var bestPeer *Peer + for _, p := range ps.peers { + if !p.services.IsEnable(flag) { + continue + } + if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) { bestPeer = p } } @@ -496,16 +504,16 @@ func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error { return nil } -func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error { - msg := msgs.NewStatusMessage(&bestBlock.BlockHeader) - peers := ps.peersWithoutNewStatus(bestBlock.Height) +func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error { + msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader) + peers := ps.peersWithoutNewStatus(bestHeader.Height) for _, peer := range peers { if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok { ps.RemovePeer(peer.ID()) continue } - peer.markNewStatus(bestBlock.Height) + peer.markNewStatus(bestHeader.Height) } return nil } @@ -536,9 +544,9 @@ func (ps *PeerSet) BroadcastTx(tx *types.Tx) error { return nil } -func (ps *PeerSet) ErrorHandler(peerID string, err error) { +func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) { if errors.Root(err) == ErrPeerMisbehave { - ps.AddBanScore(peerID, 20, 0, err.Error()) + ps.ProcessIllegal(peerID, level, err.Error()) } else { ps.RemovePeer(peerID) } @@ -551,6 +559,19 @@ func (ps *PeerSet) GetPeer(id string) *Peer { return ps.peers[id] } +func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + peers := []*Peer{} + for _, peer := range ps.peers { + if peer.Height() >= height { + peers = append(peers, peer) + } + } + return peers +} + func (ps *PeerSet) GetPeerInfos() []*PeerInfo { ps.mtx.RLock() defer ps.mtx.RUnlock() @@ -649,5 +670,21 @@ func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) { return } - peer.SetStatus(height, hash) + peer.SetBestStatus(height, hash) +} + +func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) { + peer := ps.GetPeer(peerID) + if peer == nil { + return + } + + peer.SetIrreversibleStatus(height, hash) +} + +func (ps *PeerSet) Size() int { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + return len(ps.peers) }