sm.blockKeeper.processBlocks(peer.ID(), blocks)
}
+func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
+ peer.filterAdds.Clear()
+}
+
+func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
+ if (len(msg.Addresses) == 0) {
+ log.Info("the addresses is empty from filter load message")
+ return
+ }
+ peer.addFilterAddresses(msg.Addresses)
+}
+
func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
var block *types.Block
var err error
}
}
+func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {}
+
func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
headers, err := msg.GetHeaders()
if err != nil {
case *BlocksMessage:
sm.handleBlocksMsg(peer, msg)
+ case *FilterLoadMessage:
+ sm.handleFilterLoadMsg(peer, msg)
+
+ case *FilterClearMessage:
+ sm.handleFilterClearMsg(peer)
+
+ case *GetMerkleBlockMessage:
+ sm.handleGetMerkleBlockMsg(peer, msg)
+
default:
log.Errorf("unknown message type %v", reflect.TypeOf(msg))
}
import (
"net"
"sync"
+ "encoding/hex"
log "github.com/sirupsen/logrus"
"gopkg.in/fatih/set.v0"
banScore trust.DynamicBanScore
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
}
func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
hash: hash,
knownTxs: set.New(),
knownBlocks: set.New(),
+ filterAdds: set.New(),
}
}
return false
}
+func (p *peer) addFilterAddresses(addresses [][]byte) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ if (!p.filterAdds.IsEmpty()) {
+ p.filterAdds.Clear()
+ }
+ for _, address := range addresses {
+ p.filterAdds.Add(hex.EncodeToString(address))
+ }
+}
+
func (p *peer) getBlockByHeight(height uint64) bool {
msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
return p.TrySend(BlockchainChannel, msg)
}
}
+func (p *peer) isRelatedTx(tx *types.Tx) bool {
+ for _, input := range(tx.Inputs) {
+ switch inp := input.TypedInput.(type) {
+ case *types.SpendInput:
+ if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+ return true
+ }
+ }
+ }
+ for _, output := range(tx.Outputs) {
+ if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
+ return true
+ }
+ }
+ return false
+}
+
+func (p *peer) isSPVNode() bool {
+ return !p.services.IsEnable(consensus.SFFullNode)
+}
+
func (p *peer) markBlock(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
for _, tx := range txs {
+ if p.isSPVNode() && !p.isRelatedTx(tx) {
+ continue
+ }
msg, err := NewTransactionMessage(tx)
if err != nil {
return false, errors.Wrap(err, "failed to tx msg")
peers := ps.peersWithoutTx(&tx.ID)
for _, peer := range peers {
+ if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+ continue
+ }
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
ps.removePeer(peer.ID())
continue