7 log "github.com/sirupsen/logrus"
9 cfg "github.com/vapor/config"
10 "github.com/vapor/consensus"
11 "github.com/vapor/event"
12 msgs "github.com/vapor/netsync/messages"
13 "github.com/vapor/netsync/peers"
14 "github.com/vapor/p2p"
15 "github.com/vapor/p2p/security"
16 core "github.com/vapor/protocol"
17 "github.com/vapor/protocol/bc"
18 "github.com/vapor/protocol/bc/types"
25 // Chain is the interface for Bytom core.
// It exposes the best and best-irreversible block headers, the best block
// height, block and header lookup by hash or by height, transaction-status
// lookup by hash, a main-chain membership check, and the ProcessBlock /
// ValidateTx entry points. Both ProcessBlock and ValidateTx return a bool
// alongside the error; the ValidateTx callers in this file name it isOrphan.
26 type Chain interface {
27 BestBlockHeader() *types.BlockHeader
28 BestIrreversibleHeader() *types.BlockHeader
29 BestBlockHeight() uint64
30 GetBlockByHash(*bc.Hash) (*types.Block, error)
31 GetBlockByHeight(uint64) (*types.Block, error)
32 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
33 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
34 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
35 InMainChain(bc.Hash) bool
36 ProcessBlock(*types.Block) (bool, error)
37 ValidateTx(*types.Tx) (bool, error)
// Switch is the networking dependency of the Manager: it lets the Manager
// register a reactor under a name and dial a peer by network address.
// NOTE(review): presumably satisfied by the p2p switch; lines between the
// visible methods are not shown here, so the interface may declare more.
40 type Switch interface {
41 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45 DialPeerWithAddress(addr *p2p.NetAddress) error
49 // Mempool is the interface for Bytom mempool.
// The only visible method returns the pool's transactions as core.TxDesc
// pointers.
50 type Mempool interface {
51 GetTransactions() []*core.TxDesc
54 // Manager is responsible for synchronizing business-layer information.
59 blockKeeper *blockKeeper
62 txSyncCh chan *txSyncMsg
66 eventDispatcher *event.Dispatcher
67 txMsgSub *event.Subscription
70 // NewManager creates a chain sync manager.
// It builds the block keeper from chain and peers, allocates the tx-sync and
// quit channels, stores the event dispatcher, and, unless config.VaultMode is
// set, registers a protocol reactor on the switch under the name "PROTOCOL".
71 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
76 blockKeeper: newBlockKeeper(chain, peers),
78 txSyncCh: make(chan *txSyncMsg),
79 quit: make(chan struct{}),
81 eventDispatcher: dispatcher,
// Vault mode skips reactor registration entirely.
84 if !config.VaultMode {
85 protocolReactor := NewProtocolReactor(manager)
86 manager.sw.AddReactor("PROTOCOL", protocolReactor)
// AddPeer accepts a peers.BasePeer.
// NOTE(review): the body is not visible in this excerpt; presumably it
// registers the peer with m.peers — confirm.
91 func (m *Manager) AddPeer(peer peers.BasePeer) {
95 // IsCaughtUp checks whether the local chain has finished syncing.
// It returns true when BestPeer yields no peer for consensus.SFFullNode, or
// when that peer's height does not exceed the local best block height.
96 func (m *Manager) IsCaughtUp() bool {
97 peer := m.peers.BestPeer(consensus.SFFullNode)
98 return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
// handleBlockMsg decodes the block carried by msg and hands it to the block
// keeper together with the sending peer's ID.
// NOTE(review): handling of the GetBlock error is not visible in this excerpt.
101 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
102 block, err := msg.GetBlock()
106 m.blockKeeper.processBlock(peer.ID(), block)
// handleBlocksMsg decodes the blocks carried by msg and hands them to the
// block keeper together with the sending peer's ID. A decode failure is
// logged at Debug level.
// NOTE(review): the guard around the log line is not visible; presumably the
// handler returns after logging — confirm.
109 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
110 blocks, err := msg.GetBlocks()
112 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
116 m.blockKeeper.processBlocks(peer.ID(), blocks)
// handleFilterAddMsg adds the single address carried by msg to the peer's
// filter via AddFilterAddress.
119 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
120 peer.AddFilterAddress(msg.Address)
// handleFilterClearMsg handles a filter-clear request from peer.
// NOTE(review): the body is not visible in this excerpt; presumably it resets
// the peer's address filter — confirm.
123 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
// handleFilterLoadMsg adds every address carried by msg to the peer's filter
// via AddFilterAddresses.
127 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
128 peer.AddFilterAddresses(msg.Addresses)
// handleGetBlockMsg serves a single-block request. The block is fetched from
// the chain either by msg.Height or by msg.GetHash(), and then sent to the
// peer with SendBlock. A lookup failure is logged as a Warning and a send
// failure as an Error; the peer is removed from the peer set on a send path.
// NOTE(review): the conditions that select height vs. hash lookup, and the
// ones guarding RemovePeer and the Error log, are not visible in this
// excerpt — presumably RemovePeer runs when SendBlock reports !ok.
131 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
132 var block *types.Block
135 block, err = m.chain.GetBlockByHeight(msg.Height)
137 block, err = m.chain.GetBlockByHash(msg.GetHash())
140 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
144 ok, err := peer.SendBlock(block)
146 m.peers.RemovePeer(peer.ID())
149 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
// handleGetBlocksMsg serves a multi-block request. It asks the block keeper
// to locate blocks from the message's block locator and stop hash, then
// collects blocks into sendBlocks while tracking the running size of their
// MarshalText encodings against msgs.MaxBlockchainResponseSize/2, and finally
// sends the collected blocks with SendBlocks.
// NOTE(review): the bodies of the guards (locate error / empty result,
// marshal error, size limit exceeded, send result) are not visible in this
// excerpt; presumably the size guard stops collecting and the others
// return — confirm.
153 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
154 blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
155 if err != nil || len(blocks) == 0 {
160 sendBlocks := []*types.Block{}
161 for _, block := range blocks {
162 rawData, err := block.MarshalText()
164 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
// The budget is half of the maximum blockchain response size.
168 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
171 totalSize += len(rawData)
172 sendBlocks = append(sendBlocks, block)
175 ok, err := peer.SendBlocks(sendBlocks)
177 m.peers.RemovePeer(peer.ID())
180 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
// handleGetHeadersMsg serves a headers request. It asks the block keeper to
// locate headers from the message's block locator, stop hash and skip value,
// passing maxHeadersPerMsg as the final argument, and sends the result with
// SendHeaders. A locate error or an empty result is logged at Debug level.
// NOTE(review): the statements following the Debug log and the guards around
// RemovePeer and the Error log are not visible in this excerpt. The Error
// message says "sentBlock" although this handler sends headers.
184 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
185 headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxHeadersPerMsg)
186 if err != nil || len(headers) == 0 {
187 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
191 ok, err := peer.SendHeaders(headers)
193 m.peers.RemovePeer(peer.ID())
196 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
// handleGetMerkleBlockMsg serves a merkle-block request. The block is fetched
// from the chain either by msg.Height or by msg.GetHash(); its transaction
// status is then looked up by block hash, and both are sent to the peer with
// SendMerkleBlock. Lookup failures are logged as Warnings and a send failure
// as an Error.
// NOTE(review): the conditions selecting height vs. hash lookup and the
// guards around the logs and RemovePeer are not visible in this excerpt.
200 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
202 var block *types.Block
204 block, err = m.chain.GetBlockByHeight(msg.Height)
206 block, err = m.chain.GetBlockByHash(msg.GetHash())
209 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
// The transaction status is keyed by the block's hash.
213 blockHash := block.Hash()
214 txStatus, err := m.chain.GetTransactionStatus(&blockHash)
216 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
220 ok, err := peer.SendMerkleBlock(block, txStatus)
222 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
227 m.peers.RemovePeer(peer.ID())
// handleHeadersMsg decodes the headers carried by msg and hands them to the
// block keeper together with the sending peer's ID. A decode failure is
// logged at Debug level.
// NOTE(review): the guard around the log line is not visible; presumably the
// handler returns after logging — confirm.
231 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
232 headers, err := msg.GetHeaders()
234 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
238 m.blockKeeper.processHeaders(peer.ID(), headers)
// handleStatusMsg records the status announced by a peer. If the peer set
// holds a peer with basePeer's ID, that peer's best height/hash and
// irreversible height/hash are updated from msg; otherwise nothing is
// recorded.
241 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
242 if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
243 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
244 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
// handleTransactionMsg processes a single transaction received from peer.
// A decode failure is reported to the peer set as security.LevelConnException.
// A validation failure is reported as security.LevelMsgIllegal unless the
// error is core.ErrDustTx or ValidateTx flags the transaction as an orphan.
// The transaction ID is marked against the peer via MarkTx.
// NOTE(review): the guard around the first ProcessIllegal and any returns
// after the ProcessIllegal calls are not visible in this excerpt.
248 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
249 tx, err := msg.GetTransaction()
251 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
255 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
256 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
258 m.peers.MarkTx(peer.ID(), tx.ID)
// handleTransactionsMsg processes a batch of transactions received from peer.
// A decode failure is reported to the peer set as security.LevelConnException
// and a batch larger than msgs.TxsMsgMaxTxNum as security.LevelMsgIllegal.
// Each transaction is then validated; a validation error on a non-orphan
// transaction is reported as security.LevelMsgIllegal, and the transaction ID
// is marked against the peer via MarkTx.
// NOTE(review): unlike handleTransactionMsg, the validation condition here
// does not exempt core.ErrDustTx — confirm whether that difference is
// intended. Returns after the ProcessIllegal calls are not visible in this
// excerpt.
261 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
262 txs, err := msg.GetTransactions()
264 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
268 if len(txs) > msgs.TxsMsgMaxTxNum {
269 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
273 for _, tx := range txs {
274 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
275 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
278 m.peers.MarkTx(peer.ID(), tx.ID)
// processMsg dispatches an incoming blockchain message to its handler. It
// resolves the sender in the peer set by ID, logs the message at Debug level
// (peer address, message type and string form), and switches on the concrete
// message type. Status messages are handled with the original basePeer; every
// other handler receives the resolved *peers.Peer. An Error is logged with
// the message "unhandled message type".
// NOTE(review): msgType is not referenced in the visible lines. The handling
// of a nil peer after GetPeer and the case label guarding the final Error log
// (presumably `default`) are not visible in this excerpt.
282 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
283 peer := m.peers.GetPeer(basePeer.ID())
288 log.WithFields(log.Fields{
290 "peer": basePeer.Addr(),
291 "type": reflect.TypeOf(msg),
292 "message": msg.String(),
293 }).Debug("receive message from peer")
295 switch msg := msg.(type) {
296 case *msgs.GetBlockMessage:
297 m.handleGetBlockMsg(peer, msg)
299 case *msgs.BlockMessage:
300 m.handleBlockMsg(peer, msg)
302 case *msgs.StatusMessage:
303 m.handleStatusMsg(basePeer, msg)
305 case *msgs.TransactionMessage:
306 m.handleTransactionMsg(peer, msg)
308 case *msgs.TransactionsMessage:
309 m.handleTransactionsMsg(peer, msg)
311 case *msgs.GetHeadersMessage:
312 m.handleGetHeadersMsg(peer, msg)
314 case *msgs.HeadersMessage:
315 m.handleHeadersMsg(peer, msg)
317 case *msgs.GetBlocksMessage:
318 m.handleGetBlocksMsg(peer, msg)
320 case *msgs.BlocksMessage:
321 m.handleBlocksMsg(peer, msg)
323 case *msgs.FilterLoadMessage:
324 m.handleFilterLoadMsg(peer, msg)
326 case *msgs.FilterAddMessage:
327 m.handleFilterAddMsg(peer, msg)
329 case *msgs.FilterClearMessage:
330 m.handleFilterClearMsg(peer)
332 case *msgs.GetMerkleBlockMessage:
333 m.handleGetMerkleBlockMsg(peer, msg)
336 log.WithFields(log.Fields{
338 "peer": basePeer.Addr(),
339 "message_type": reflect.TypeOf(msg),
340 }).Error("unhandled message type")
// RemovePeer removes the peer with the given ID from the manager's peer set.
344 func (m *Manager) RemovePeer(peerID string) {
345 m.peers.RemovePeer(peerID)
// SendStatus sends the local chain status (best block header and best
// irreversible header) to the given peer. The peer is resolved in the peer
// set by ID; an "invalid peer" error is returned on the failure path of that
// lookup. If sending fails, the peer is removed from the peer set.
// NOTE(review): the guard before the "invalid peer" return (presumably
// p == nil) and the function's final return values are not visible in this
// excerpt.
348 func (m *Manager) SendStatus(peer peers.BasePeer) error {
349 p := m.peers.GetPeer(peer.ID())
351 return errors.New("invalid peer")
354 if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.BestIrreversibleHeader()); err != nil {
355 m.peers.RemovePeer(p.ID())
// Start launches the sync manager. It subscribes to core.TxMsgEvent on the
// event dispatcher, starts the block keeper, and spawns the
// broadcastTxsLoop and syncMempoolLoop goroutines.
// NOTE(review): handling of the Subscribe error and the final return are not
// visible in this excerpt.
361 func (m *Manager) Start() error {
363 m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
367 m.blockKeeper.start()
368 go m.broadcastTxsLoop()
369 go m.syncMempoolLoop()
374 // Stop stops the sync manager.
375 func (m *Manager) Stop() {