defaultBanThreshold = uint32(100)
)
+var errSendStatusMsg = errors.New("send status msg fail")
+
//BasePeer is the interface for connection level peer
type BasePeer interface {
Addr() net.Addr
banScore trust.DynamicBanScore
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
filterAdds *set.Set // Set of addresses that the spv node cares about.
}
-func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
+func newPeer(basePeer BasePeer) *peer {
return &peer{
BasePeer: basePeer,
services: basePeer.ServiceFlag(),
- height: height,
- hash: hash,
knownTxs: set.New(),
knownBlocks: set.New(),
filterAdds: set.New(),
}
}
for _, output := range tx.Outputs {
- if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
+ if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
return true
}
}
p.knownBlocks.Add(hash.String())
}
+func (p *peer) markNewStatus(height uint64) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ p.knownStatus = height
+}
+
func (p *peer) markTransaction(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
return ok, nil
}
-func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
- for _, tx := range txs {
- if p.isSPVNode() && !p.isRelatedTx(tx) {
+func (p *peer) sendTransactions(txs []*types.Tx) error {
+ validTxs := make([]*types.Tx, 0, len(txs))
+ for i, tx := range txs {
+ if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
continue
}
- msg, err := NewTransactionMessage(tx)
- if err != nil {
- return false, errors.Wrap(err, "failed to tx msg")
- }
- if p.knownTxs.Has(tx.ID.String()) {
+ validTxs = append(validTxs, tx)
+ if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
continue
}
+
+ msg, err := NewTransactionsMessage(validTxs)
+ if err != nil {
+ return err
+ }
+
if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- return ok, nil
+ return errors.New("failed to send txs msg")
+ }
+
+ for _, validTx := range validTxs {
+ p.knownTxs.Add(validTx.ID.String())
}
- p.knownTxs.Add(tx.ID.String())
+
+ validTxs = make([]*types.Tx, 0, len(txs))
}
- return true, nil
+
+ return nil
+}
+
+func (p *peer) sendStatus(header *types.BlockHeader) error {
+ msg := NewStatusMessage(header)
+ if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ return errSendStatusMsg
+ }
+ p.markNewStatus(header.Height)
+ return nil
}
func (p *peer) setStatus(height uint64, hash *bc.Hash) {
ps.removePeer(peerID)
}
-func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+func (ps *peerSet) addPeer(peer BasePeer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if _, ok := ps.peers[peer.ID()]; !ok {
- ps.peers[peer.ID()] = newPeer(height, hash, peer)
+ ps.peers[peer.ID()] = newPeer(peer)
return
}
log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
continue
}
peer.markBlock(&hash)
+ peer.markNewStatus(block.Height)
}
return nil
}
-func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
- bestBlockHash := bestBlock.Hash()
- peers := ps.peersWithoutBlock(&bestBlockHash)
-
- genesisHash := genesisBlock.Hash()
- msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
+ msg := NewStatusMessage(&bestBlock.BlockHeader)
+ peers := ps.peersWithoutNewStatus(bestBlock.Height)
for _, peer := range peers {
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
ps.removePeer(peer.ID())
continue
}
+
+ peer.markNewStatus(bestBlock.Height)
}
return nil
}
return result
}
+func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+ ps.mtx.Lock()
+ peer := ps.peers[peerID]
+ ps.mtx.Unlock()
+
+ if peer == nil {
+ return
+ }
+ peer.markTransaction(&txHash)
+}
+
func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
return peers
}
+func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []*peer
+ for _, peer := range ps.peers {
+ if peer.knownStatus < height {
+ peers = append(peers, peer)
+ }
+ }
+ return peers
+}
+
func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()