defaultBanThreshold = uint32(100)
)
+var errSendStatusMsg = errors.New("send status msg fail")
+
//BasePeer is the interface for connection level peer
type BasePeer interface {
Addr() net.Addr
banScore trust.DynamicBanScore
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
filterAdds *set.Set // Set of addresses that the spv node cares about.
}
-func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
+func newPeer(basePeer BasePeer) *peer {
return &peer{
BasePeer: basePeer,
services: basePeer.ServiceFlag(),
- height: height,
- hash: hash,
knownTxs: set.New(),
knownBlocks: set.New(),
filterAdds: set.New(),
p.knownBlocks.Add(hash.String())
}
+func (p *peer) markNewStatus(height uint64) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ p.knownStatus = height
+}
+
func (p *peer) markTransaction(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
return true, nil
}
+func (p *peer) sendStatus(header *types.BlockHeader) error {
+ msg := NewStatusMessage(header)
+ if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ return errSendStatusMsg
+ }
+ p.markNewStatus(header.Height)
+ return nil
+}
+
func (p *peer) setStatus(height uint64, hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
ps.removePeer(peerID)
}
-func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+func (ps *peerSet) addPeer(peer BasePeer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if _, ok := ps.peers[peer.ID()]; !ok {
- ps.peers[peer.ID()] = newPeer(height, hash, peer)
+ ps.peers[peer.ID()] = newPeer(peer)
return
}
log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
continue
}
peer.markBlock(&hash)
+ peer.markNewStatus(block.Height)
}
return nil
}
-func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
- bestBlockHash := bestBlock.Hash()
- peers := ps.peersWithoutBlock(&bestBlockHash)
-
- genesisHash := genesisBlock.Hash()
- msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
+ msg := NewStatusMessage(&bestBlock.BlockHeader)
+ peers := ps.peersWithoutNewStatus(bestBlock.Height)
for _, peer := range peers {
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
ps.removePeer(peer.ID())
continue
}
+
+ peer.markNewStatus(bestBlock.Height)
}
return nil
}
return peers
}
+func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []*peer
+ for _, peer := range ps.peers {
+ if peer.knownStatus < height {
+ peers = append(peers, peer)
+ }
+ }
+ return peers
+}
+
func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()