OSDN Git Service

Add MOV subprotol framework support
authorYahtoo Ma <yahtoo.ma@gmail.com>
Fri, 27 Sep 2019 07:00:13 +0000 (15:00 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Fri, 27 Sep 2019 07:00:13 +0000 (15:00 +0800)
netsync/chainmgr/handle.go
node/node.go
proposal/blockproposer/blockproposer.go
proposal/blockproposer/sort.go [moved from proposal/sort.go with 92% similarity]
proposal/proposal.go
protocol/block.go
protocol/mov.go [new file with mode: 0644]
protocol/protocol.go
protocol/txpool.go

index 656a950..6eb116e 100644 (file)
@@ -50,6 +50,7 @@ type Switch interface {
 // Mempool is the interface for Bytom mempool
 type Mempool interface {
        GetTransactions() []*core.TxDesc
+       IsDust(tx *types.Tx) bool
 }
 
 //Manager is responsible for the business layer information synchronization
@@ -254,6 +255,11 @@ func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMe
                return
        }
 
+       if ok := m.mempool.IsDust(tx); ok {
+               m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust tx msg")
+               return
+       }
+
        m.peers.MarkTx(peer.ID(), tx.ID)
        if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
                m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
@@ -273,6 +279,11 @@ func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.Transactions
        }
 
        for _, tx := range txs {
+               if ok := m.mempool.IsDust(tx); ok {
+                       m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
+                       continue
+               }
+
                m.peers.MarkTx(peer.ID(), tx.ID)
                if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
                        m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
index c160f17..b305e37 100644 (file)
@@ -66,8 +66,8 @@ func NewNode(config *cfg.Config) *Node {
                cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error()))
        }
 
-       if err:=vaporLog.InitLogFile(config);err!=nil{
-               log.WithField("err",err).Fatalln("InitLogFile failed")
+       if err := vaporLog.InitLogFile(config); err != nil {
+               log.WithField("err", err).Fatalln("InitLogFile failed")
        }
 
        log.WithFields(log.Fields{
@@ -84,6 +84,7 @@ func NewNode(config *cfg.Config) *Node {
 
        initCommonConfig(config)
 
+       mov := protocol.NewMOV()
        // Get store
        if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
                cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
@@ -95,8 +96,8 @@ func NewNode(config *cfg.Config) *Node {
        accessTokens := accesstoken.NewStore(tokenDB)
 
        dispatcher := event.NewDispatcher()
-       txPool := protocol.NewTxPool(store, dispatcher)
-       chain, err := protocol.NewChain(store, txPool, dispatcher)
+       txPool := protocol.NewTxPool(store, []protocol.DustFilterer{mov}, dispatcher)
+       chain, err := protocol.NewChain(store, txPool, []protocol.Protocoler{mov}, dispatcher)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
@@ -162,7 +163,7 @@ func NewNode(config *cfg.Config) *Node {
                notificationMgr: notificationMgr,
        }
 
-       node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher)
+       node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, []blockproposer.Preprocessor{mov}, dispatcher)
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
        return node
 }
index 7f8c1da..7216e9b 100644 (file)
@@ -2,6 +2,7 @@ package blockproposer
 
 import (
        "encoding/hex"
+       "sort"
        "sync"
        "time"
 
@@ -13,18 +14,24 @@ import (
        "github.com/vapor/event"
        "github.com/vapor/proposal"
        "github.com/vapor/protocol"
+       "github.com/vapor/protocol/bc/types"
 )
 
 const (
        logModule = "blockproposer"
 )
 
+type Preprocessor interface {
+       BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error)
+}
+
 // BlockProposer propose several block in specified time range
 type BlockProposer struct {
        sync.Mutex
        chain           *protocol.Chain
        accountManager  *account.Manager
        txPool          *protocol.TxPool
+       Preprocessors   []Preprocessor
        started         bool
        quit            chan struct{}
        eventDispatcher *event.Dispatcher
@@ -74,7 +81,23 @@ func (b *BlockProposer) generateBlocks() {
                        continue
                }
 
-               block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, nextBlockTime)
+               packageTxs := []*types.Tx{}
+               txs := b.txPool.GetTransactions()
+               sort.Sort(byTime(txs))
+               for _, txDesc := range txs {
+                       packageTxs = append(packageTxs, txDesc.Tx)
+               }
+
+               for i, p := range b.Preprocessors {
+                       txs, err := p.BeforeProposalBlock(packageTxs)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
+                               continue
+                       }
+                       packageTxs = append(packageTxs, txs...)
+               }
+
+               block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, packageTxs, nextBlockTime)
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate")
                        continue
@@ -147,11 +170,12 @@ func (b *BlockProposer) IsProposing() bool {
 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
 // Use Start to begin the proposal process.  See the documentation for BlockProposer
 // type for more details.
-func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
+func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, preprocessors []Preprocessor, dispatcher *event.Dispatcher) *BlockProposer {
        return &BlockProposer{
                chain:           c,
                accountManager:  accountManager,
                txPool:          txPool,
+               Preprocessors:   preprocessors,
                eventDispatcher: dispatcher,
        }
 }
similarity index 92%
rename from proposal/sort.go
rename to proposal/blockproposer/sort.go
index 45e80e2..02a3a5e 100644 (file)
@@ -1,4 +1,4 @@
-package proposal
+package blockproposer
 
 import "github.com/vapor/protocol"
 
index 50d85d1..319d019 100644 (file)
@@ -1,7 +1,6 @@
 package proposal
 
 import (
-       "sort"
        "strconv"
        "time"
 
@@ -74,7 +73,7 @@ func createCoinbaseTx(accountManager *account.Manager, blockHeight uint64, rewar
 }
 
 // NewBlockTemplate returns a new block template that is ready to be solved
-func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager *account.Manager, timestamp uint64) (b *types.Block, err error) {
+func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager *account.Manager, txs []*types.Tx, timestamp uint64) (b *types.Block, err error) {
        view := state.NewUtxoViewpoint()
        txStatus := bc.NewTransactionStatus()
        if err := txStatus.SetStatus(0, false); err != nil {
@@ -101,18 +100,14 @@ func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager
        bcBlock := &bc.Block{BlockHeader: &bc.BlockHeader{Height: nextBlockHeight}}
        b.Transactions = []*types.Tx{nil}
 
-       txs := txPool.GetTransactions()
-       sort.Sort(byTime(txs))
-
        entriesTxs := []*bc.Tx{}
-       for _, txDesc := range txs {
-               entriesTxs = append(entriesTxs, txDesc.Tx.Tx)
+       for _, tx := range txs {
+               entriesTxs = append(entriesTxs, tx.Tx)
        }
 
        validateResults := validation.ValidateTxs(entriesTxs, bcBlock)
        for i, validateResult := range validateResults {
-               txDesc := txs[i]
-               tx := txDesc.Tx.Tx
+               tx := txs[i].Tx
                gasOnlyTx := false
 
                gasStatus := validateResult.GetGasState()
@@ -142,7 +137,7 @@ func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager
                        return nil, err
                }
 
-               b.Transactions = append(b.Transactions, txDesc.Tx)
+               b.Transactions = append(b.Transactions, txs[i])
                txEntries = append(txEntries, tx)
                gasUsed += uint64(gasStatus.GasUsed)
                if gasUsed == consensus.ActiveNetParams.MaxBlockGas {
index 86c9650..b7e6240 100644 (file)
@@ -110,6 +110,12 @@ func (c *Chain) connectBlock(block *types.Block) (err error) {
                return err
        }
 
+       for _, p := range c.subProtocols {
+               if err := p.ApplyBlock(block); err != nil {
+                       return errors.Wrap(err, "sub protocol connect block")
+               }
+       }
+
        irrBlockHeader := c.lastIrrBlockHeader
        if c.isIrreversible(&block.BlockHeader) && block.Height > irrBlockHeader.Height {
                irrBlockHeader = &block.BlockHeader
@@ -164,6 +170,12 @@ func (c *Chain) reorganizeChain(blockHeader *types.BlockHeader) error {
                        return err
                }
 
+               for _, p := range c.subProtocols {
+                       if err := p.DetachBlock(b); err != nil {
+                               return errors.Wrap(err, "sub protocol detach block")
+                       }
+               }
+
                for _, tx := range b.Transactions {
                        txsToRestore[tx.ID] = tx
                }
@@ -199,6 +211,12 @@ func (c *Chain) reorganizeChain(blockHeader *types.BlockHeader) error {
                        return err
                }
 
+               for _, p := range c.subProtocols {
+                       if err := p.ApplyBlock(b); err != nil {
+                               return errors.Wrap(err, "sub protocol attach block")
+                       }
+               }
+
                if consensusResult.IsFinalize() {
                        consensusResults = append(consensusResults, consensusResult.Fork())
                }
@@ -280,6 +298,12 @@ func (c *Chain) saveBlock(block *types.Block) error {
                return errors.Sub(ErrBadBlock, err)
        }
 
+       for _, p := range c.subProtocols {
+               if err := p.ValidateBlock(bcBlock); err != nil {
+                       return errors.Wrap(err, "sub protocol save block")
+               }
+       }
+
        if err := c.store.SaveBlock(block, bcBlock.TransactionStatus); err != nil {
                return err
        }
diff --git a/protocol/mov.go b/protocol/mov.go
new file mode 100644 (file)
index 0000000..b9bec61
--- /dev/null
@@ -0,0 +1,62 @@
+package protocol
+
+import (
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+// startHeight mov protocol startup height.
+const startHeight = 0
+
+type combination interface {
+       ApplyBlock(block *types.Block) error
+       BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error)
+}
+
+type MOV struct {
+       combination combination
+}
+
+func NewMOV() *MOV {
+       return &MOV{}
+}
+
+func (m MOV) ApplyBlock(block *types.Block) error {
+       return m.combination.ApplyBlock(block)
+}
+
+func (m MOV) BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error) {
+       return m.combination.BeforeProposalBlock(txs)
+}
+
+func (m MOV) ChainStatus() (uint64, *bc.Hash) {
+       return 0, nil
+}
+
+func (m MOV) DetachBlock(block *types.Block) error {
+       return nil
+}
+
+func (m MOV) IsDust(tx *types.Tx) bool {
+       return false
+}
+
+func (m MOV) Name() string {
+       return "MOV"
+}
+
+func (m MOV) ValidateBlock(block *bc.Block) error {
+       return nil
+}
+
+func (m MOV) ValidateTxs(txs []*bc.Tx) error {
+       return nil
+}
+
+func (m MOV) Status() (uint64, *bc.Hash){
+       return 0,nil
+}
+
+func (m MOV) SyncStatus() error {
+       return nil
+}
index d7ea944..a91b9db 100644 (file)
@@ -7,6 +7,7 @@ import (
 
        "github.com/vapor/common"
        "github.com/vapor/config"
+       "github.com/vapor/errors"
        "github.com/vapor/event"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
@@ -18,12 +19,23 @@ const (
        maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
 )
 
+type Protocoler interface {
+       Name() string
+       SyncStatus() error
+       Status() (uint64, *bc.Hash)
+       ValidateBlock(block *bc.Block) error
+       ValidateTxs(txs []*bc.Tx) error
+       ApplyBlock(block *types.Block) error
+       DetachBlock(block *types.Block) error
+}
+
 // Chain provides functions for working with the Bytom block chain.
 type Chain struct {
        orphanManage   *OrphanManage
        txPool         *TxPool
        store          Store
        processBlockCh chan *processBlockMsg
+       subProtocols   []Protocoler
 
        signatureCache  *common.Cache
        eventDispatcher *event.Dispatcher
@@ -36,12 +48,13 @@ type Chain struct {
 }
 
 // NewChain returns a new Chain using store as the underlying storage.
-func NewChain(store Store, txPool *TxPool, eventDispatcher *event.Dispatcher) (*Chain, error) {
+func NewChain(store Store, txPool *TxPool, subProtocols []Protocoler, eventDispatcher *event.Dispatcher) (*Chain, error) {
        knownTxs, _ := common.NewOrderedSet(maxKnownTxs)
        c := &Chain{
                orphanManage:    NewOrphanManage(),
                txPool:          txPool,
                store:           store,
+               subProtocols:    subProtocols,
                signatureCache:  common.NewCache(maxSignatureCacheSize),
                eventDispatcher: eventDispatcher,
                processBlockCh:  make(chan *processBlockMsg, maxProcessBlockChSize),
@@ -67,6 +80,13 @@ func NewChain(store Store, txPool *TxPool, eventDispatcher *event.Dispatcher) (*
        if err != nil {
                return nil, err
        }
+
+       for _, p := range c.subProtocols {
+               if err := c.syncProtocolStatus(p); err != nil {
+                       return nil, errors.Wrap(err, p.Name(), "sync sub protocol status")
+               }
+       }
+
        go c.blockProcesser()
        return c, nil
 }
@@ -182,6 +202,44 @@ func (c *Chain) markTransactions(txs ...*types.Tx) {
        }
 }
 
+func (c *Chain) syncProtocolStatus(subProtocol Protocoler) error {
+       protocolHeight, protocolHash := subProtocol.Status()
+       if protocolHeight == c.BestBlockHeight() && protocolHash == c.BestBlockHash() {
+               return nil
+       }
+
+       for !c.InMainChain(*protocolHash) {
+               block, err := c.GetBlockByHash(protocolHash)
+               if err != nil {
+                       return errors.Wrap(err, subProtocol.Name(), "can't get block by hash in chain")
+               }
+
+               if err := subProtocol.DetachBlock(block); err != nil {
+                       return errors.Wrap(err, subProtocol.Name(), "sub protocol detach block err")
+               }
+
+               protocolHeight, protocolHash = subProtocol.Status()
+       }
+
+       for height := protocolHeight + 1; height <= c.BestBlockHeight(); height++ {
+               block, err := c.GetBlockByHeight(height)
+               if err != nil {
+                       return errors.Wrap(err, subProtocol.Name(), "can't get block by height in chain")
+               }
+
+               if err := subProtocol.ApplyBlock(block); err != nil {
+                       return errors.Wrap(err, subProtocol.Name(), "sub protocol apply block err")
+               }
+
+               protocolHeight, protocolHash = subProtocol.Status()
+               if *protocolHash != block.Hash() {
+                       return errors.Wrap(errors.New("sub protocol status sync err"), subProtocol.Name())
+               }
+       }
+
+       return nil
+}
+
 // This function must be called with mu lock in above level
 func (c *Chain) setState(blockHeader, irrBlockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, consensusResults []*state.ConsensusResult) error {
        if err := c.store.SaveChainStatus(blockHeader, irrBlockHeader, mainBlockHeaders, view, consensusResults); err != nil {
index 09e32b1..ae237bd 100644 (file)
@@ -43,6 +43,10 @@ var (
        ErrDustTx = errors.New("transaction is dust tx")
 )
 
+type DustFilterer interface {
+       IsDust(tx *types.Tx) bool
+}
+
 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
 
 // TxDesc store tx and related info for mining strategy
@@ -76,11 +80,12 @@ type TxPool struct {
        orphans         map[bc.Hash]*orphanTx
        orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
        errCache        *lru.Cache
+       filters         []DustFilterer
        eventDispatcher *event.Dispatcher
 }
 
 // NewTxPool init a new TxPool
-func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
+func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
        tp := &TxPool{
                lastUpdated:     time.Now().Unix(),
                store:           store,
@@ -89,6 +94,7 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
                orphans:         make(map[bc.Hash]*orphanTx),
                orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
                errCache:        lru.New(maxCachedErrTxs),
+               filters:         filters,
                eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
@@ -212,7 +218,16 @@ func isTransactionZeroOutput(tx *types.Tx) bool {
 
 //IsDust checks if a tx has zero output
 func (tp *TxPool) IsDust(tx *types.Tx) bool {
-       return isTransactionZeroOutput(tx)
+       if ok := isTransactionZeroOutput(tx); ok {
+               return ok
+       }
+
+       for _, filter := range tp.filters {
+               if ok := filter.IsDust(tx); ok {
+                       return ok
+               }
+       }
+       return false
 }
 
 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {