Peers() *p2p.PeerSet
}
-//ChainManager is responsible for the business layer information synchronization
-type ChainManager struct {
+//Manager is responsible for the business layer information synchronization
+type Manager struct {
sw Switch
chain Chain
txPool *core.TxPool
}
//NewChainManager create a chain sync manager.
-func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
- manager := &ChainManager{
+func NewManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
+ manager := &Manager{
sw: sw,
txPool: txPool,
chain: chain,
return manager, nil
}
-func (cm *ChainManager) AddPeer(peer peers.BasePeer) {
- cm.peers.AddPeer(peer)
+func (m *Manager) AddPeer(peer peers.BasePeer) {
+ m.peers.AddPeer(peer)
}
//IsCaughtUp check wheather the peer finish the sync
-func (cm *ChainManager) IsCaughtUp() bool {
- peer := cm.peers.BestPeer(consensus.SFFullNode)
- return peer == nil || peer.Height() <= cm.chain.BestBlockHeight()
+func (m *Manager) IsCaughtUp() bool {
+ peer := m.peers.BestPeer(consensus.SFFullNode)
+ return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
}
-func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
+func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
block, err := msg.GetBlock()
if err != nil {
return
}
- cm.blockKeeper.processBlock(peer.ID(), block)
+ m.blockKeeper.processBlock(peer.ID(), block)
}
-func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
+func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
blocks, err := msg.GetBlocks()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
return
}
- cm.blockKeeper.processBlocks(peer.ID(), blocks)
+ m.blockKeeper.processBlocks(peer.ID(), blocks)
}
-func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
+func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
peer.AddFilterAddress(msg.Address)
}
-func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) {
+func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
peer.FilterClear()
}
-func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
+func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
peer.AddFilterAddresses(msg.Addresses)
}
-func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
+func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
var block *types.Block
var err error
if msg.Height != 0 {
- block, err = cm.chain.GetBlockByHeight(msg.Height)
+ block, err = m.chain.GetBlockByHeight(msg.Height)
} else {
- block, err = cm.chain.GetBlockByHash(msg.GetHash())
+ block, err = m.chain.GetBlockByHash(msg.GetHash())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
ok, err := peer.SendBlock(block)
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
}
}
-func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
- blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
+ blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
if err != nil || len(blocks) == 0 {
return
}
ok, err := peer.SendBlocks(sendBlocks)
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
}
}
-func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
- headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
+ headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
if err != nil || len(headers) == 0 {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
return
ok, err := peer.SendHeaders(headers)
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
}
}
-func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
+func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
var err error
var block *types.Block
if msg.Height != 0 {
- block, err = cm.chain.GetBlockByHeight(msg.Height)
+ block, err = m.chain.GetBlockByHeight(msg.Height)
} else {
- block, err = cm.chain.GetBlockByHash(msg.GetHash())
+ block, err = m.chain.GetBlockByHash(msg.GetHash())
}
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
}
blockHash := block.Hash()
- txStatus, err := cm.chain.GetTransactionStatus(&blockHash)
+ txStatus, err := m.chain.GetTransactionStatus(&blockHash)
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
return
}
if !ok {
- cm.peers.RemovePeer(peer.ID())
+ m.peers.RemovePeer(peer.ID())
}
}
-func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
+func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
headers, err := msg.GetHeaders()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
return
}
- cm.blockKeeper.processHeaders(peer.ID(), headers)
+ m.blockKeeper.processHeaders(peer.ID(), headers)
}
-func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
- if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil {
+func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
+ if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
peer.SetStatus(msg.Height, msg.GetHash())
return
}
}
-func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
+func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
tx, err := msg.GetTransaction()
if err != nil {
- cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ m.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
return
}
- if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
- cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
+ m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
}
- cm.peers.MarkTx(peer.ID(), tx.ID)
+ m.peers.MarkTx(peer.ID(), tx.ID)
}
-func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
+func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
txs, err := msg.GetTransactions()
if err != nil {
- cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+ m.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
return
}
if len(txs) > msgs.TxsMsgMaxTxNum {
- cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+ m.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
return
}
for _, tx := range txs {
- if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan {
- cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
+ m.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
return
}
- cm.peers.MarkTx(peer.ID(), tx.ID)
+ m.peers.MarkTx(peer.ID(), tx.ID)
}
}
-func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
- peer := cm.peers.GetPeer(basePeer.ID())
+func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
+ peer := m.peers.GetPeer(basePeer.ID())
if peer == nil {
return
}
switch msg := msg.(type) {
case *msgs.GetBlockMessage:
- cm.handleGetBlockMsg(peer, msg)
+ m.handleGetBlockMsg(peer, msg)
case *msgs.BlockMessage:
- cm.handleBlockMsg(peer, msg)
+ m.handleBlockMsg(peer, msg)
case *msgs.StatusMessage:
- cm.handleStatusMsg(basePeer, msg)
+ m.handleStatusMsg(basePeer, msg)
case *msgs.TransactionMessage:
- cm.handleTransactionMsg(peer, msg)
+ m.handleTransactionMsg(peer, msg)
case *msgs.TransactionsMessage:
- cm.handleTransactionsMsg(peer, msg)
+ m.handleTransactionsMsg(peer, msg)
case *msgs.GetHeadersMessage:
- cm.handleGetHeadersMsg(peer, msg)
+ m.handleGetHeadersMsg(peer, msg)
case *msgs.HeadersMessage:
- cm.handleHeadersMsg(peer, msg)
+ m.handleHeadersMsg(peer, msg)
case *msgs.GetBlocksMessage:
- cm.handleGetBlocksMsg(peer, msg)
+ m.handleGetBlocksMsg(peer, msg)
case *msgs.BlocksMessage:
- cm.handleBlocksMsg(peer, msg)
+ m.handleBlocksMsg(peer, msg)
case *msgs.FilterLoadMessage:
- cm.handleFilterLoadMsg(peer, msg)
+ m.handleFilterLoadMsg(peer, msg)
case *msgs.FilterAddMessage:
- cm.handleFilterAddMsg(peer, msg)
+ m.handleFilterAddMsg(peer, msg)
case *msgs.FilterClearMessage:
- cm.handleFilterClearMsg(peer)
+ m.handleFilterClearMsg(peer)
case *msgs.GetMerkleBlockMessage:
- cm.handleGetMerkleBlockMsg(peer, msg)
+ m.handleGetMerkleBlockMsg(peer, msg)
default:
log.WithFields(log.Fields{
}
}
-func (cm *ChainManager) RemovePeer(peerID string) {
- cm.peers.RemovePeer(peerID)
+func (m *Manager) RemovePeer(peerID string) {
+ m.peers.RemovePeer(peerID)
}
-func (cm *ChainManager) SendStatus(peer peers.BasePeer) error {
- p := cm.peers.GetPeer(peer.ID())
+func (m *Manager) SendStatus(peer peers.BasePeer) error {
+ p := m.peers.GetPeer(peer.ID())
if p == nil {
return errors.New("invalid peer")
}
- if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil {
- cm.peers.RemovePeer(p.ID())
+ if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
+ m.peers.RemovePeer(p.ID())
return err
}
return nil
}
-func (cm *ChainManager) Start() error {
+func (m *Manager) Start() error {
var err error
- cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+ m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
if err != nil {
return err
}
// broadcast transactions
- go cm.txBroadcastLoop()
- go cm.txSyncLoop()
+ go m.txBroadcastLoop()
+ go m.txSyncLoop()
return nil
}
//Stop stop sync manager
-func (cm *ChainManager) Stop() {
- close(cm.quitSync)
+func (m *Manager) Stop() {
+ close(m.quitSync)
}