X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=netsync%2Fpeers%2Fpeer.go;h=ef90812f8f4e449b909b47cede1adb696d3d5ea5;hp=1f0ac2496b9c9c613a9ca8f9f2fa2cf53aa59cba;hb=669d176c004324fe81a26261a6e41ddea95b6f17;hpb=807d99726f6a0610fa9c835e2aabd983801d3510 diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go index 1f0ac249..ef90812f 100644 --- a/netsync/peers/peer.go +++ b/netsync/peers/peer.go @@ -30,12 +30,14 @@ const ( var ( errSendStatusMsg = errors.New("send status msg fail") ErrPeerMisbehave = errors.New("peer is misbehave") + ErrNoValidPeer = errors.New("Can't find valid fast sync peer") ) //BasePeer is the interface for connection level peer type BasePeer interface { Addr() net.Addr ID() string + RemoteAddrHost() string ServiceFlag() consensus.ServiceFlag TrafficStatus() (*flowrate.Status, *flowrate.Status) TrySend(byte, interface{}) bool @@ -45,7 +47,7 @@ type BasePeer interface { //BasePeerSet is the intergace for connection level peer manager type BasePeerSet interface { StopPeerGracefully(string) - IsBanned(peerID string, level byte, reason string) bool + IsBanned(ip string, level byte, reason string) bool } type BroadcastMsg interface { @@ -73,15 +75,17 @@ type PeerInfo struct { type Peer struct { BasePeer - mtx sync.RWMutex - services consensus.ServiceFlag - height uint64 - hash *bc.Hash - knownTxs *set.Set // Set of transaction hashes known to be known by this peer - knownBlocks *set.Set // Set of block hashes known to be known by this peer - knownSignatures *set.Set // Set of block signatures known to be known by this peer - knownStatus uint64 // Set of chain status known to be known by this peer - filterAdds *set.Set // Set of addresses that the spv node cares about. + mtx sync.RWMutex + services consensus.ServiceFlag + bestHeight uint64 + bestHash *bc.Hash + irreversibleHeight uint64 + irreversibleHash *bc.Hash + knownTxs *set.Set // Set of transaction hashes known to be known by this peer + knownBlocks *set.Set // Set of block hashes known to be known by this peer + knownSignatures *set.Set // Set of block signatures known to be known by this peer + knownStatus uint64 // Set of chain status known to be known by this peer + filterAdds *set.Set // Set of addresses that the spv node cares about. } func newPeer(basePeer BasePeer) *Peer { @@ -98,7 +102,15 @@ func newPeer(basePeer BasePeer) *Peer { func (p *Peer) Height() uint64 { p.mtx.RLock() defer p.mtx.RUnlock() - return p.height + + return p.bestHeight +} + +func (p *Peer) IrreversibleHeight() uint64 { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return p.irreversibleHeight } func (p *Peer) AddFilterAddress(address []byte) { @@ -140,8 +152,8 @@ func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool { return p.TrySend(msgs.BlockchainChannel, msg) } -func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool { - msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)} +func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool { + msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)} return p.TrySend(msgs.BlockchainChannel, msg) } @@ -158,7 +170,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo { return &PeerInfo{ ID: p.ID(), RemoteAddr: p.Addr().String(), - Height: p.height, + Height: p.bestHeight, Ping: ping.String(), Duration: sentStatus.Duration.String(), TotalSent: sentStatus.Bytes, @@ -360,20 +372,29 @@ func (p *Peer) SendTransactions(txs []*types.Tx) error { return nil } -func (p *Peer) SendStatus(header *types.BlockHeader) error { - msg := msgs.NewStatusMessage(header) +func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error { + msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader) if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok { return errSendStatusMsg } - p.markNewStatus(header.Height) + p.markNewStatus(bestHeader.Height) return nil } -func (p *Peer) SetStatus(height uint64, hash *bc.Hash) { +func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) { p.mtx.Lock() defer p.mtx.Unlock() - p.height = height - p.hash = hash + + p.bestHeight = bestHeight + p.bestHash = bestHash +} + +func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.irreversibleHeight = irreversibleHeight + p.irreversibleHash = irreversibleHash } type PeerSet struct { @@ -398,7 +419,8 @@ func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) { if peer == nil { return } - if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned { + + if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned { ps.RemovePeer(peerID) } return @@ -424,7 +446,23 @@ func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer { if !p.services.IsEnable(flag) { continue } - if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) { + if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) { + bestPeer = p + } + } + return bestPeer +} + +func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + var bestPeer *Peer + for _, p := range ps.peers { + if !p.services.IsEnable(flag) { + continue + } + if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) { bestPeer = p } } @@ -466,16 +504,16 @@ func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error { return nil } -func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error { - msg := msgs.NewStatusMessage(&bestBlock.BlockHeader) - peers := ps.peersWithoutNewStatus(bestBlock.Height) +func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error { + msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader) + peers := ps.peersWithoutNewStatus(bestHeader.Height) for _, peer := range peers { if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok { ps.RemovePeer(peer.ID()) continue } - peer.markNewStatus(bestBlock.Height) + peer.markNewStatus(bestHeader.Height) } return nil } @@ -521,6 +559,19 @@ func (ps *PeerSet) GetPeer(id string) *Peer { return ps.peers[id] } +func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + peers := []*Peer{} + for _, peer := range ps.peers { + if peer.Height() >= height { + peers = append(peers, peer) + } + } + return peers +} + func (ps *PeerSet) GetPeerInfos() []*PeerInfo { ps.mtx.RLock() defer ps.mtx.RUnlock() @@ -619,5 +670,21 @@ func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) { return } - peer.SetStatus(height, hash) + peer.SetBestStatus(height, hash) +} + +func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) { + peer := ps.GetPeer(peerID) + if peer == nil { + return + } + + peer.SetIrreversibleStatus(height, hash) +} + +func (ps *PeerSet) Size() int { + ps.mtx.RLock() + defer ps.mtx.RUnlock() + + return len(ps.peers) }