OSDN Git Service

add filter load message process (#1255)
authormuscle_boy <shenao.78@163.com>
Thu, 16 Aug 2018 02:37:21 +0000 (10:37 +0800)
committerPaladz <yzhu101@uottawa.ca>
Thu, 16 Aug 2018 02:37:21 +0000 (10:37 +0800)
* the transaction output amout prohibit set zero

* add network access control api

* format import code style

* refactor

* code refactor

* bug fix

* the struct node_info add json field

* estimate gas support multi-sign

* add testcase of estimate gas

* add testcase

* bug fix

* add test case

* test case refactor

* list-tx,list-address,list-utxo support partition

* list-addresses list-tx list-utxo support pagging

* refactor pagging

* fix save asset

* fix save external assets

* remove blank

* remove useless context

* remove redudant web address config

* fix bug

* remove useless ctx

* add spv message struct

* remove redundant

* refactor message struct

* refactor message struct

* add filter load message handler

* add debug log

* bug fix spv

* bug fix

* bug fix

* refactor

* refactor

netsync/handle.go
netsync/peer.go

index 3f45988..fe3e753 100644 (file)
@@ -165,6 +165,18 @@ func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
        sm.blockKeeper.processBlocks(peer.ID(), blocks)
 }
 
+func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
+       peer.filterAdds.Clear()
+}
+
+func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
+       if (len(msg.Addresses) == 0) {
+               log.Info("the addresses is empty from filter load message")
+               return
+       }
+       peer.addFilterAddresses(msg.Addresses)
+}
+
 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
        var block *types.Block
        var err error
@@ -234,6 +246,8 @@ func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
        }
 }
 
+func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {}
+
 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
@@ -337,6 +351,15 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        case *BlocksMessage:
                sm.handleBlocksMsg(peer, msg)
 
+       case *FilterLoadMessage:
+               sm.handleFilterLoadMsg(peer, msg)
+
+       case *FilterClearMessage:
+               sm.handleFilterClearMsg(peer)
+               
+       case *GetMerkleBlockMessage:
+               sm.handleGetMerkleBlockMsg(peer, msg)
+
        default:
                log.Errorf("unknown message type %v", reflect.TypeOf(msg))
        }
index 72191c1..240eacc 100644 (file)
@@ -3,6 +3,7 @@ package netsync
 import (
        "net"
        "sync"
+       "encoding/hex"
 
        log "github.com/sirupsen/logrus"
        "gopkg.in/fatih/set.v0"
@@ -51,6 +52,7 @@ type peer struct {
        banScore    trust.DynamicBanScore
        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
+       filterAdds  *set.Set // Set of addresses that the spv node cares about.
 }
 
 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
@@ -61,6 +63,7 @@ func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
                hash:        hash,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
+               filterAdds:  set.New(),
        }
 }
 
@@ -84,6 +87,18 @@ func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
        return false
 }
 
+func (p *peer) addFilterAddresses(addresses [][]byte) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       
+       if (!p.filterAdds.IsEmpty()) {
+               p.filterAdds.Clear()
+       }
+       for _, address := range addresses {
+               p.filterAdds.Add(hex.EncodeToString(address))
+       }
+}
+
 func (p *peer) getBlockByHeight(height uint64) bool {
        msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
        return p.TrySend(BlockchainChannel, msg)
@@ -109,6 +124,27 @@ func (p *peer) getPeerInfo() *PeerInfo {
        }
 }
 
+func (p *peer) isRelatedTx(tx *types.Tx) bool {
+       for _, input := range(tx.Inputs) {
+               switch inp := input.TypedInput.(type) {
+               case *types.SpendInput:
+                       if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+                               return true
+                       }
+               }
+       }
+       for _, output := range(tx.Outputs) {
+               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
+                       return true
+               }
+       }
+       return false
+}
+
+func (p *peer) isSPVNode() bool {
+       return !p.services.IsEnable(consensus.SFFullNode)
+}
+
 func (p *peer) markBlock(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
@@ -172,6 +208,9 @@ func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
 
 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
        for _, tx := range txs {
+               if p.isSPVNode() && !p.isRelatedTx(tx) {
+                       continue
+               }
                msg, err := NewTransactionMessage(tx)
                if err != nil {
                        return false, errors.Wrap(err, "failed to tx msg")
@@ -291,6 +330,9 @@ func (ps *peerSet) broadcastTx(tx *types.Tx) error {
 
        peers := ps.peersWithoutTx(&tx.ID)
        for _, peer := range peers {
+               if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+                       continue
+               }
                if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
                        ps.removePeer(peer.ID())
                        continue