8 log "github.com/sirupsen/logrus"
10 cfg "github.com/bytom/vapor/config"
11 "github.com/bytom/vapor/consensus"
12 dbm "github.com/bytom/vapor/database/leveldb"
13 "github.com/bytom/vapor/event"
14 msgs "github.com/bytom/vapor/netsync/messages"
15 "github.com/bytom/vapor/netsync/peers"
16 "github.com/bytom/vapor/p2p"
17 "github.com/bytom/vapor/p2p/security"
18 core "github.com/bytom/vapor/protocol"
19 "github.com/bytom/vapor/protocol/bc"
20 "github.com/bytom/vapor/protocol/bc/types"
27 // Chain is the interface for Bytom core
28 type Chain interface {
29 BestBlockHeader() *types.BlockHeader
30 LastIrreversibleHeader() *types.BlockHeader
31 BestBlockHeight() uint64
32 GetBlockByHash(*bc.Hash) (*types.Block, error)
33 GetBlockByHeight(uint64) (*types.Block, error)
34 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
35 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
36 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
37 InMainChain(bc.Hash) bool
38 ProcessBlock(*types.Block) (bool, error)
39 ValidateTx(*types.Tx) (bool, error)
42 // Switch is the interface for network layer
43 type Switch interface {
44 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
48 DialPeerWithAddress(addr *p2p.NetAddress) error
52 // Mempool is the interface for Bytom mempool
53 type Mempool interface {
54 GetTransactions() []*core.TxDesc
55 IsDust(tx *types.Tx) bool
58 //Manager is responsible for the business layer information synchronization
63 blockKeeper *blockKeeper
66 txSyncCh chan *txSyncMsg
70 eventDispatcher *event.Dispatcher
71 txMsgSub *event.Subscription
74 //NewManager create a chain sync manager.
75 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
80 blockKeeper: newBlockKeeper(chain, peers, fastSyncDB),
82 txSyncCh: make(chan *txSyncMsg),
83 quit: make(chan struct{}),
85 eventDispatcher: dispatcher,
88 if !config.VaultMode {
89 protocolReactor := NewProtocolReactor(manager)
90 manager.sw.AddReactor("PROTOCOL", protocolReactor)
95 // AddPeer add the network layer peer to logic layer
96 func (m *Manager) AddPeer(peer peers.BasePeer) {
100 //IsCaughtUp check wheather the peer finish the sync
101 func (m *Manager) IsCaughtUp() bool {
102 peer := m.peers.BestPeer(consensus.SFFullNode)
103 return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
106 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
107 block, err := msg.GetBlock()
112 m.blockKeeper.processBlock(peer.ID(), block)
115 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
116 blocks, err := msg.GetBlocks()
118 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
122 m.blockKeeper.processBlocks(peer.ID(), blocks)
125 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
126 peer.AddFilterAddress(msg.Address)
129 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
133 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
134 peer.AddFilterAddresses(msg.Addresses)
137 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
138 var block *types.Block
141 block, err = m.chain.GetBlockByHeight(msg.Height)
143 block, err = m.chain.GetBlockByHash(msg.GetHash())
146 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
150 ok, err := peer.SendBlock(block)
152 m.peers.RemovePeer(peer.ID())
155 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
159 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
160 endTime := time.Now().Add(requireBlocksTimeout / 10)
161 isTimeout := func() bool {
162 return time.Now().After(endTime)
165 blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout)
166 if err != nil || len(blocks) == 0 {
167 log.WithFields(log.Fields{
171 }).Error("fail on handleGetBlocksMsg locateBlocks")
176 sendBlocks := []*types.Block{}
177 for _, block := range blocks {
178 rawData, err := block.MarshalText()
180 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
184 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
187 totalSize += len(rawData)
188 sendBlocks = append(sendBlocks, block)
191 ok, err := peer.SendBlocks(sendBlocks)
193 m.peers.RemovePeer(peer.ID())
196 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
200 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
201 headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
202 if err != nil || len(headers) == 0 {
203 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
207 ok, err := peer.SendHeaders(headers)
209 m.peers.RemovePeer(peer.ID())
212 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
216 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
218 var block *types.Block
220 block, err = m.chain.GetBlockByHeight(msg.Height)
222 block, err = m.chain.GetBlockByHash(msg.GetHash())
225 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
229 blockHash := block.Hash()
230 txStatus, err := m.chain.GetTransactionStatus(&blockHash)
232 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
236 ok, err := peer.SendMerkleBlock(block, txStatus)
238 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
243 m.peers.RemovePeer(peer.ID())
247 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
248 headers, err := msg.GetHeaders()
250 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
254 m.blockKeeper.processHeaders(peer.ID(), headers)
257 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
258 if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
259 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
260 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
264 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
265 tx, err := msg.GetTransaction()
267 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
271 if m.mempool.IsDust(tx) {
272 log.WithFields(log.Fields{"tx_hash": tx.ID.String(), "peer": peer.Addr()}).Warn("receive dust tx msg")
276 m.peers.MarkTx(peer.ID(), tx.ID)
277 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
278 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
282 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
283 txs, err := msg.GetTransactions()
285 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
289 if len(txs) > msgs.TxsMsgMaxTxNum {
290 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
294 for _, tx := range txs {
295 if m.mempool.IsDust(tx) {
296 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
300 m.peers.MarkTx(peer.ID(), tx.ID)
301 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
302 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
308 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
309 peer := m.peers.GetPeer(basePeer.ID())
314 log.WithFields(log.Fields{
316 "peer": basePeer.Addr(),
317 "type": reflect.TypeOf(msg),
318 "message": msg.String(),
319 }).Debug("receive message from peer")
321 switch msg := msg.(type) {
322 case *msgs.GetBlockMessage:
323 m.handleGetBlockMsg(peer, msg)
325 case *msgs.BlockMessage:
326 m.handleBlockMsg(peer, msg)
328 case *msgs.StatusMessage:
329 m.handleStatusMsg(basePeer, msg)
331 case *msgs.TransactionMessage:
332 m.handleTransactionMsg(peer, msg)
334 case *msgs.TransactionsMessage:
335 m.handleTransactionsMsg(peer, msg)
337 case *msgs.GetHeadersMessage:
338 m.handleGetHeadersMsg(peer, msg)
340 case *msgs.HeadersMessage:
341 m.handleHeadersMsg(peer, msg)
343 case *msgs.GetBlocksMessage:
344 m.handleGetBlocksMsg(peer, msg)
346 case *msgs.BlocksMessage:
347 m.handleBlocksMsg(peer, msg)
349 case *msgs.FilterLoadMessage:
350 m.handleFilterLoadMsg(peer, msg)
352 case *msgs.FilterAddMessage:
353 m.handleFilterAddMsg(peer, msg)
355 case *msgs.FilterClearMessage:
356 m.handleFilterClearMsg(peer)
358 case *msgs.GetMerkleBlockMessage:
359 m.handleGetMerkleBlockMsg(peer, msg)
362 log.WithFields(log.Fields{
364 "peer": basePeer.Addr(),
365 "message_type": reflect.TypeOf(msg),
366 }).Error("unhandled message type")
370 // RemovePeer delete peer for peer set
371 func (m *Manager) RemovePeer(peerID string) {
372 m.peers.RemovePeer(peerID)
375 // SendStatus sent the current self status to remote peer
376 func (m *Manager) SendStatus(peer peers.BasePeer) error {
377 p := m.peers.GetPeer(peer.ID())
379 return errors.New("invalid peer")
382 if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
383 m.peers.RemovePeer(p.ID())
389 // Start the network logic layer
390 func (m *Manager) Start() error {
392 m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
396 m.blockKeeper.start()
397 go m.broadcastTxsLoop()
398 go m.syncMempoolLoop()
403 //Stop stop sync manager
404 func (m *Manager) Stop() {