// Mempool is the interface for Bytom mempool
type Mempool interface {
GetTransactions() []*core.TxDesc
+ IsDust(tx *types.Tx) bool
}
//Manager is responsible for the business layer information synchronization
return
}
+ if ok := m.mempool.IsDust(tx); ok {
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust tx msg")
+ return
+ }
+
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
}
for _, tx := range txs {
+ if ok := m.mempool.IsDust(tx); ok {
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
+ continue
+ }
+
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error()))
}
- if err:=vaporLog.InitLogFile(config);err!=nil{
- log.WithField("err",err).Fatalln("InitLogFile failed")
+ if err := vaporLog.InitLogFile(config); err != nil {
+ log.WithField("err", err).Fatalln("InitLogFile failed")
}
log.WithFields(log.Fields{
initCommonConfig(config)
+ mov := protocol.NewMOV()
// Get store
if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
accessTokens := accesstoken.NewStore(tokenDB)
dispatcher := event.NewDispatcher()
- txPool := protocol.NewTxPool(store, dispatcher)
- chain, err := protocol.NewChain(store, txPool, dispatcher)
+ txPool := protocol.NewTxPool(store, []protocol.DustFilterer{mov}, dispatcher)
+ chain, err := protocol.NewChain(store, txPool, []protocol.Protocoler{mov}, dispatcher)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
}
notificationMgr: notificationMgr,
}
- node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher)
+ node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, []blockproposer.Preprocessor{mov}, dispatcher)
node.BaseService = *cmn.NewBaseService(nil, "Node", node)
return node
}
import (
"encoding/hex"
+ "sort"
"sync"
"time"
"github.com/vapor/event"
"github.com/vapor/proposal"
"github.com/vapor/protocol"
+ "github.com/vapor/protocol/bc/types"
)
const (
logModule = "blockproposer"
)
+type Preprocessor interface {
+ BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error)
+}
+
// BlockProposer propose several block in specified time range
type BlockProposer struct {
sync.Mutex
chain *protocol.Chain
accountManager *account.Manager
txPool *protocol.TxPool
+ Preprocessors []Preprocessor
started bool
quit chan struct{}
eventDispatcher *event.Dispatcher
continue
}
- block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, nextBlockTime)
+ packageTxs := []*types.Tx{}
+ txs := b.txPool.GetTransactions()
+ sort.Sort(byTime(txs))
+ for _, txDesc := range txs {
+ packageTxs = append(packageTxs, txDesc.Tx)
+ }
+
+ for i, p := range b.Preprocessors {
+ txs, err := p.BeforeProposalBlock(packageTxs)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
+ continue
+ }
+ packageTxs = append(packageTxs, txs...)
+ }
+
+ block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, packageTxs, nextBlockTime)
if err != nil {
log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate")
continue
// NewBlockProposer returns a new instance of a block proposer for the provided configuration.
// Use Start to begin the proposal process. See the documentation for BlockProposer
// type for more details.
-func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
+func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, preprocessors []Preprocessor, dispatcher *event.Dispatcher) *BlockProposer {
return &BlockProposer{
chain: c,
accountManager: accountManager,
txPool: txPool,
+ Preprocessors: preprocessors,
eventDispatcher: dispatcher,
}
}
-package proposal
+package blockproposer
import "github.com/vapor/protocol"
package proposal
import (
- "sort"
"strconv"
"time"
}
// NewBlockTemplate returns a new block template that is ready to be solved
-func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager *account.Manager, timestamp uint64) (b *types.Block, err error) {
+func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager *account.Manager, txs []*types.Tx, timestamp uint64) (b *types.Block, err error) {
view := state.NewUtxoViewpoint()
txStatus := bc.NewTransactionStatus()
if err := txStatus.SetStatus(0, false); err != nil {
bcBlock := &bc.Block{BlockHeader: &bc.BlockHeader{Height: nextBlockHeight}}
b.Transactions = []*types.Tx{nil}
- txs := txPool.GetTransactions()
- sort.Sort(byTime(txs))
-
entriesTxs := []*bc.Tx{}
- for _, txDesc := range txs {
- entriesTxs = append(entriesTxs, txDesc.Tx.Tx)
+ for _, tx := range txs {
+ entriesTxs = append(entriesTxs, tx.Tx)
}
validateResults := validation.ValidateTxs(entriesTxs, bcBlock)
for i, validateResult := range validateResults {
- txDesc := txs[i]
- tx := txDesc.Tx.Tx
+ tx := txs[i].Tx
gasOnlyTx := false
gasStatus := validateResult.GetGasState()
return nil, err
}
- b.Transactions = append(b.Transactions, txDesc.Tx)
+ b.Transactions = append(b.Transactions, txs[i])
txEntries = append(txEntries, tx)
gasUsed += uint64(gasStatus.GasUsed)
if gasUsed == consensus.ActiveNetParams.MaxBlockGas {
return err
}
+ for _, p := range c.subProtocols {
+ if err := p.ApplyBlock(block); err != nil {
+ return errors.Wrap(err, "sub protocol connect block")
+ }
+ }
+
irrBlockHeader := c.lastIrrBlockHeader
if c.isIrreversible(&block.BlockHeader) && block.Height > irrBlockHeader.Height {
irrBlockHeader = &block.BlockHeader
return err
}
+ for _, p := range c.subProtocols {
+ if err := p.DetachBlock(b); err != nil {
+ return errors.Wrap(err, "sub protocol detach block")
+ }
+ }
+
for _, tx := range b.Transactions {
txsToRestore[tx.ID] = tx
}
return err
}
+ for _, p := range c.subProtocols {
+ if err := p.ApplyBlock(b); err != nil {
+ return errors.Wrap(err, "sub protocol attach block")
+ }
+ }
+
if consensusResult.IsFinalize() {
consensusResults = append(consensusResults, consensusResult.Fork())
}
return errors.Sub(ErrBadBlock, err)
}
+ for _, p := range c.subProtocols {
+ if err := p.ValidateBlock(bcBlock); err != nil {
+ return errors.Wrap(err, "sub protocol save block")
+ }
+ }
+
if err := c.store.SaveBlock(block, bcBlock.TransactionStatus); err != nil {
return err
}
--- /dev/null
+package protocol
+
+import (
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+// startHeight mov protocol startup height.
+const startHeight = 0
+
+type combination interface {
+ ApplyBlock(block *types.Block) error
+ BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error)
+}
+
+type MOV struct {
+ combination combination
+}
+
+func NewMOV() *MOV {
+ return &MOV{}
+}
+
+func (m MOV) ApplyBlock(block *types.Block) error {
+ return m.combination.ApplyBlock(block)
+}
+
+func (m MOV) BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error) {
+ return m.combination.BeforeProposalBlock(txs)
+}
+
+func (m MOV) ChainStatus() (uint64, *bc.Hash) {
+ return 0, nil
+}
+
+func (m MOV) DetachBlock(block *types.Block) error {
+ return nil
+}
+
+func (m MOV) IsDust(tx *types.Tx) bool {
+ return false
+}
+
+func (m MOV) Name() string {
+ return "MOV"
+}
+
+func (m MOV) ValidateBlock(block *bc.Block) error {
+ return nil
+}
+
+func (m MOV) ValidateTxs(txs []*bc.Tx) error {
+ return nil
+}
+
+func (m MOV) Status() (uint64, *bc.Hash){
+ return 0,nil
+}
+
+func (m MOV) SyncStatus() error {
+ return nil
+}
"github.com/vapor/common"
"github.com/vapor/config"
+ "github.com/vapor/errors"
"github.com/vapor/event"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
)
+type Protocoler interface {
+ Name() string
+ SyncStatus() error
+ Status() (uint64, *bc.Hash)
+ ValidateBlock(block *bc.Block) error
+ ValidateTxs(txs []*bc.Tx) error
+ ApplyBlock(block *types.Block) error
+ DetachBlock(block *types.Block) error
+}
+
// Chain provides functions for working with the Bytom block chain.
type Chain struct {
orphanManage *OrphanManage
txPool *TxPool
store Store
processBlockCh chan *processBlockMsg
+ subProtocols []Protocoler
signatureCache *common.Cache
eventDispatcher *event.Dispatcher
}
// NewChain returns a new Chain using store as the underlying storage.
-func NewChain(store Store, txPool *TxPool, eventDispatcher *event.Dispatcher) (*Chain, error) {
+func NewChain(store Store, txPool *TxPool, subProtocols []Protocoler, eventDispatcher *event.Dispatcher) (*Chain, error) {
knownTxs, _ := common.NewOrderedSet(maxKnownTxs)
c := &Chain{
orphanManage: NewOrphanManage(),
txPool: txPool,
store: store,
+ subProtocols: subProtocols,
signatureCache: common.NewCache(maxSignatureCacheSize),
eventDispatcher: eventDispatcher,
processBlockCh: make(chan *processBlockMsg, maxProcessBlockChSize),
if err != nil {
return nil, err
}
+
+ for _, p := range c.subProtocols {
+ if err := c.syncProtocolStatus(p); err != nil {
+ return nil, errors.Wrap(err, p.Name(), "sync sub protocol status")
+ }
+ }
+
go c.blockProcesser()
return c, nil
}
}
}
+func (c *Chain) syncProtocolStatus(subProtocol Protocoler) error {
+ protocolHeight, protocolHash := subProtocol.Status()
+ if protocolHeight == c.BestBlockHeight() && protocolHash == c.BestBlockHash() {
+ return nil
+ }
+
+ for !c.InMainChain(*protocolHash) {
+ block, err := c.GetBlockByHash(protocolHash)
+ if err != nil {
+ return errors.Wrap(err, subProtocol.Name(), "can't get block by hash in chain")
+ }
+
+ if err := subProtocol.DetachBlock(block); err != nil {
+ return errors.Wrap(err, subProtocol.Name(), "sub protocol detach block err")
+ }
+
+ protocolHeight, protocolHash = subProtocol.Status()
+ }
+
+ for height := protocolHeight + 1; height <= c.BestBlockHeight(); height++ {
+ block, err := c.GetBlockByHeight(height)
+ if err != nil {
+ return errors.Wrap(err, subProtocol.Name(), "can't get block by height in chain")
+ }
+
+ if err := subProtocol.ApplyBlock(block); err != nil {
+ return errors.Wrap(err, subProtocol.Name(), "sub protocol apply block err")
+ }
+
+ protocolHeight, protocolHash = subProtocol.Status()
+ if *protocolHash != block.Hash() {
+ return errors.Wrap(errors.New("sub protocol status sync err"), subProtocol.Name())
+ }
+ }
+
+ return nil
+}
+
// This function must be called with mu lock in above level
func (c *Chain) setState(blockHeader, irrBlockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, consensusResults []*state.ConsensusResult) error {
if err := c.store.SaveChainStatus(blockHeader, irrBlockHeader, mainBlockHeaders, view, consensusResults); err != nil {
ErrDustTx = errors.New("transaction is dust tx")
)
+type DustFilterer interface {
+ IsDust(tx *types.Tx) bool
+}
+
type TxMsgEvent struct{ TxMsg *TxPoolMsg }
// TxDesc store tx and related info for mining strategy
orphans map[bc.Hash]*orphanTx
orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
errCache *lru.Cache
+ filters []DustFilterer
eventDispatcher *event.Dispatcher
}
// NewTxPool init a new TxPool
-func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
+func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
tp := &TxPool{
lastUpdated: time.Now().Unix(),
store: store,
orphans: make(map[bc.Hash]*orphanTx),
orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
errCache: lru.New(maxCachedErrTxs),
+ filters: filters,
eventDispatcher: dispatcher,
}
go tp.orphanExpireWorker()
//IsDust checks if a tx has zero output
func (tp *TxPool) IsDust(tx *types.Tx) bool {
- return isTransactionZeroOutput(tx)
+ if ok := isTransactionZeroOutput(tx); ok {
+ return ok
+ }
+
+ for _, filter := range tp.filters {
+ if ok := filter.IsDust(tx); ok {
+ return ok
+ }
+ }
+ return false
}
func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {