msgs "github.com/vapor/netsync/messages"
"github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
+ "github.com/vapor/p2p/security"
core "github.com/vapor/protocol"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
type Switch interface {
AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
- AddBannedPeer(string) error
Start() (bool, error)
Stop() bool
IsListening() bool
Peers() *p2p.PeerSet
}
+// Mempool is the interface for Bytom mempool
+type Mempool interface {
+ GetTransactions() []*core.TxDesc
+}
+
//Manager is responsible for the business layer information synchronization
type Manager struct {
sw Switch
chain Chain
- txPool *core.TxPool
+ mempool Mempool
blockKeeper *blockKeeper
peers *peers.PeerSet
txSyncCh chan *txSyncMsg
- quitSync chan struct{}
+ quit chan struct{}
config *cfg.Config
eventDispatcher *event.Dispatcher
}
//NewChainManager create a chain sync manager.
-func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
manager := &Manager{
sw: sw,
- txPool: txPool,
+ mempool: mempool,
chain: chain,
blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
txSyncCh: make(chan *txSyncMsg),
- quitSync: make(chan struct{}),
+ quit: make(chan struct{}),
config: config,
eventDispatcher: dispatcher,
}
func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
tx, err := msg.GetTransaction()
if err != nil {
- m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
return
}
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
- m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
}
m.peers.MarkTx(peer.ID(), tx.ID)
}
func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
txs, err := msg.GetTransactions()
if err != nil {
- m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+ m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
return
}
if len(txs) > msgs.TxsMsgMaxTxNum {
- m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
return
}
for _, tx := range txs {
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
- m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
return
}
m.peers.MarkTx(peer.ID(), tx.ID)
return err
}
- // broadcast transactions
- go m.txBroadcastLoop()
- go m.txSyncLoop()
+ go m.broadcastTxsLoop()
+ go m.syncMempoolLoop()
return nil
}
//Stop stop sync manager
func (m *Manager) Stop() {
- close(m.quitSync)
+ close(m.quit)
}