X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;ds=sidebyside;f=netsync%2Fchainmgr%2Fhandle.go;h=81baacefa3d958b09764604aa18b84ef2ac7bd61;hb=068fc645e200e34e38a75dc283e3e4f05ab15d7f;hp=8f7d679580fc2414b8fa4d3685a15f7fcb337de9;hpb=7e01ede3ce5d3688fa29f30bc766593beb9508e4;p=bytom%2Fvapor.git diff --git a/netsync/chainmgr/handle.go b/netsync/chainmgr/handle.go index 8f7d6795..81baacef 100644 --- a/netsync/chainmgr/handle.go +++ b/netsync/chainmgr/handle.go @@ -8,10 +8,12 @@ import ( cfg "github.com/vapor/config" "github.com/vapor/consensus" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/event" msgs "github.com/vapor/netsync/messages" "github.com/vapor/netsync/peers" "github.com/vapor/p2p" + "github.com/vapor/p2p/security" core "github.com/vapor/protocol" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" @@ -24,6 +26,7 @@ const ( // Chain is the interface for Bytom core type Chain interface { BestBlockHeader() *types.BlockHeader + LastIrreversibleHeader() *types.BlockHeader BestBlockHeight() uint64 GetBlockByHash(*bc.Hash) (*types.Block, error) GetBlockByHeight(uint64) (*types.Block, error) @@ -37,7 +40,6 @@ type Chain interface { type Switch interface { AddReactor(name string, reactor p2p.Reactor) p2p.Reactor - AddBannedPeer(string) error Start() (bool, error) Stop() bool IsListening() bool @@ -45,16 +47,21 @@ type Switch interface { Peers() *p2p.PeerSet } -//ChainManager is responsible for the business layer information synchronization -type ChainManager struct { +// Mempool is the interface for Bytom mempool +type Mempool interface { + GetTransactions() []*core.TxDesc +} + +//Manager is responsible for the business layer information synchronization +type Manager struct { sw Switch chain Chain - txPool *core.TxPool + mempool Mempool blockKeeper *blockKeeper peers *peers.PeerSet txSyncCh chan *txSyncMsg - quitSync chan struct{} + quit chan struct{} config *cfg.Config eventDispatcher *event.Dispatcher @@ -62,15 +69,15 @@ type ChainManager struct { } //NewChainManager create a chain sync manager. -func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) { - manager := &ChainManager{ +func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) { + manager := &Manager{ sw: sw, - txPool: txPool, + mempool: mempool, chain: chain, - blockKeeper: newBlockKeeper(chain, peers), + blockKeeper: newBlockKeeper(chain, peers, fastSyncDB), peers: peers, txSyncCh: make(chan *txSyncMsg), - quitSync: make(chan struct{}), + quit: make(chan struct{}), config: config, eventDispatcher: dispatcher, } @@ -82,53 +89,53 @@ func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.Tx return manager, nil } -func (cm *ChainManager) AddPeer(peer peers.BasePeer) { - cm.peers.AddPeer(peer) +func (m *Manager) AddPeer(peer peers.BasePeer) { + m.peers.AddPeer(peer) } //IsCaughtUp check wheather the peer finish the sync -func (cm *ChainManager) IsCaughtUp() bool { - peer := cm.peers.BestPeer(consensus.SFFullNode) - return peer == nil || peer.Height() <= cm.chain.BestBlockHeight() +func (m *Manager) IsCaughtUp() bool { + peer := m.peers.BestPeer(consensus.SFFullNode) + return peer == nil || peer.Height() <= m.chain.BestBlockHeight() } -func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) { +func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) { block, err := msg.GetBlock() if err != nil { return } - cm.blockKeeper.processBlock(peer.ID(), block) + m.blockKeeper.processBlock(peer.ID(), block) } -func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) { +func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) { blocks, err := msg.GetBlocks() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks") return } - cm.blockKeeper.processBlocks(peer.ID(), blocks) + m.blockKeeper.processBlocks(peer.ID(), blocks) } -func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) { +func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) { peer.AddFilterAddress(msg.Address) } -func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) { +func (m *Manager) handleFilterClearMsg(peer *peers.Peer) { peer.FilterClear() } -func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) { +func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) { peer.AddFilterAddresses(msg.Addresses) } -func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) { +func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) { var block *types.Block var err error if msg.Height != 0 { - block, err = cm.chain.GetBlockByHeight(msg.Height) + block, err = m.chain.GetBlockByHeight(msg.Height) } else { - block, err = cm.chain.GetBlockByHash(msg.GetHash()) + block, err = m.chain.GetBlockByHash(msg.GetHash()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain") @@ -137,15 +144,15 @@ func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMe ok, err := peer.SendBlock(block) if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock") } } -func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) { - blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash()) +func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) { + blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash()) if err != nil || len(blocks) == 0 { return } @@ -156,7 +163,7 @@ func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocks rawData, err := block.MarshalText() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block") - continue + return } if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 { @@ -168,15 +175,15 @@ func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocks ok, err := peer.SendBlocks(sendBlocks) if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock") } } -func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) { - headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash()) +func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) { + headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg) if err != nil || len(headers) == 0 { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders") return @@ -184,20 +191,20 @@ func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeade ok, err := peer.SendHeaders(headers) if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock") } } -func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) { +func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) { var err error var block *types.Block if msg.Height != 0 { - block, err = cm.chain.GetBlockByHeight(msg.Height) + block, err = m.chain.GetBlockByHeight(msg.Height) } else { - block, err = cm.chain.GetBlockByHash(msg.GetHash()) + block, err = m.chain.GetBlockByHash(msg.GetHash()) } if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain") @@ -205,7 +212,7 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM } blockHash := block.Hash() - txStatus, err := cm.chain.GetTransactionStatus(&blockHash) + txStatus, err := m.chain.GetTransactionStatus(&blockHash) if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status") return @@ -218,63 +225,63 @@ func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetM } if !ok { - cm.peers.RemovePeer(peer.ID()) + m.peers.RemovePeer(peer.ID()) } } -func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) { +func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) { headers, err := msg.GetHeaders() if err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders") return } - cm.blockKeeper.processHeaders(peer.ID(), headers) + m.blockKeeper.processHeaders(peer.ID(), headers) } -func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) { - if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil { - peer.SetStatus(msg.Height, msg.GetHash()) - return +func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) { + if peer := m.peers.GetPeer(basePeer.ID()); peer != nil { + peer.SetBestStatus(msg.BestHeight, msg.GetBestHash()) + peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash()) } } -func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) { +func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) { tx, err := msg.GetTransaction() if err != nil { - cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message") + m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message") return } - if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan { - cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") + if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan { + m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction") } - cm.peers.MarkTx(peer.ID(), tx.ID) + m.peers.MarkTx(peer.ID(), tx.ID) } -func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) { +func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) { txs, err := msg.GetTransactions() if err != nil { - cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message") + m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message") return } if len(txs) > msgs.TxsMsgMaxTxNum { - cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit") + m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit") return } for _, tx := range txs { - if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan { - cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") + if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan { + m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction") return } - cm.peers.MarkTx(peer.ID(), tx.ID) + m.peers.MarkTx(peer.ID(), tx.ID) } } -func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) { - peer := cm.peers.GetPeer(basePeer.ID()) +func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) { + peer := m.peers.GetPeer(basePeer.ID()) if peer == nil { return } @@ -284,47 +291,47 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms "peer": basePeer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String(), - }).Info("receive message from peer") + }).Debug("receive message from peer") switch msg := msg.(type) { case *msgs.GetBlockMessage: - cm.handleGetBlockMsg(peer, msg) + m.handleGetBlockMsg(peer, msg) case *msgs.BlockMessage: - cm.handleBlockMsg(peer, msg) + m.handleBlockMsg(peer, msg) case *msgs.StatusMessage: - cm.handleStatusMsg(basePeer, msg) + m.handleStatusMsg(basePeer, msg) case *msgs.TransactionMessage: - cm.handleTransactionMsg(peer, msg) + m.handleTransactionMsg(peer, msg) case *msgs.TransactionsMessage: - cm.handleTransactionsMsg(peer, msg) + m.handleTransactionsMsg(peer, msg) case *msgs.GetHeadersMessage: - cm.handleGetHeadersMsg(peer, msg) + m.handleGetHeadersMsg(peer, msg) case *msgs.HeadersMessage: - cm.handleHeadersMsg(peer, msg) + m.handleHeadersMsg(peer, msg) case *msgs.GetBlocksMessage: - cm.handleGetBlocksMsg(peer, msg) + m.handleGetBlocksMsg(peer, msg) case *msgs.BlocksMessage: - cm.handleBlocksMsg(peer, msg) + m.handleBlocksMsg(peer, msg) case *msgs.FilterLoadMessage: - cm.handleFilterLoadMsg(peer, msg) + m.handleFilterLoadMsg(peer, msg) case *msgs.FilterAddMessage: - cm.handleFilterAddMsg(peer, msg) + m.handleFilterAddMsg(peer, msg) case *msgs.FilterClearMessage: - cm.handleFilterClearMsg(peer) + m.handleFilterClearMsg(peer) case *msgs.GetMerkleBlockMessage: - cm.handleGetMerkleBlockMsg(peer, msg) + m.handleGetMerkleBlockMsg(peer, msg) default: log.WithFields(log.Fields{ @@ -335,38 +342,38 @@ func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg ms } } -func (cm *ChainManager) RemovePeer(peerID string) { - cm.peers.RemovePeer(peerID) +func (m *Manager) RemovePeer(peerID string) { + m.peers.RemovePeer(peerID) } -func (cm *ChainManager) SendStatus(peer peers.BasePeer) error { - p := cm.peers.GetPeer(peer.ID()) +func (m *Manager) SendStatus(peer peers.BasePeer) error { + p := m.peers.GetPeer(peer.ID()) if p == nil { return errors.New("invalid peer") } - if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil { - cm.peers.RemovePeer(p.ID()) + if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil { + m.peers.RemovePeer(p.ID()) return err } return nil } -func (cm *ChainManager) Start() error { +func (m *Manager) Start() error { var err error - cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{}) + m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{}) if err != nil { return err } - - // broadcast transactions - go cm.txBroadcastLoop() - go cm.txSyncLoop() + m.blockKeeper.start() + go m.broadcastTxsLoop() + go m.syncMempoolLoop() return nil } //Stop stop sync manager -func (cm *ChainManager) Stop() { - close(cm.quitSync) +func (m *Manager) Stop() { + m.blockKeeper.stop() + close(m.quit) }