7 log "github.com/sirupsen/logrus"
9 cfg "github.com/vapor/config"
10 "github.com/vapor/consensus"
11 "github.com/vapor/event"
12 msgs "github.com/vapor/netsync/messages"
13 "github.com/vapor/netsync/peers"
14 "github.com/vapor/p2p"
15 "github.com/vapor/p2p/security"
16 core "github.com/vapor/protocol"
17 "github.com/vapor/protocol/bc"
18 "github.com/vapor/protocol/bc/types"
25 // Chain is the interface for Bytom core
26 type Chain interface {
27 BestBlockHeader() *types.BlockHeader
28 BestBlockHeight() uint64
29 GetBlockByHash(*bc.Hash) (*types.Block, error)
30 GetBlockByHeight(uint64) (*types.Block, error)
31 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
32 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
33 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
34 InMainChain(bc.Hash) bool
35 ProcessBlock(*types.Block) (bool, error)
36 ValidateTx(*types.Tx) (bool, error)
39 type Switch interface {
40 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
44 DialPeerWithAddress(addr *p2p.NetAddress) error
48 // Mempool is the interface for Bytom mempool
49 type Mempool interface {
50 GetTransactions() []*core.TxDesc
53 // Manager is responsible for synchronizing business-layer information.
58 blockKeeper *blockKeeper
61 txSyncCh chan *txSyncMsg
65 eventDispatcher *event.Dispatcher
66 txMsgSub *event.Subscription
69 // NewManager creates a chain sync manager.
70 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
75 blockKeeper: newBlockKeeper(chain, peers),
77 txSyncCh: make(chan *txSyncMsg),
78 quit: make(chan struct{}),
80 eventDispatcher: dispatcher,
83 if !config.VaultMode {
84 protocolReactor := NewProtocolReactor(manager)
85 manager.sw.AddReactor("PROTOCOL", protocolReactor)
90 func (m *Manager) AddPeer(peer peers.BasePeer) {
94 // IsCaughtUp checks whether the local chain has caught up with the best full-node peer.
95 func (m *Manager) IsCaughtUp() bool {
96 peer := m.peers.BestPeer(consensus.SFFullNode)
97 return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
100 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
101 block, err := msg.GetBlock()
105 m.blockKeeper.processBlock(peer.ID(), block)
108 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
109 blocks, err := msg.GetBlocks()
111 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
115 m.blockKeeper.processBlocks(peer.ID(), blocks)
118 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
119 peer.AddFilterAddress(msg.Address)
122 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
126 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
127 peer.AddFilterAddresses(msg.Addresses)
130 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
131 var block *types.Block
134 block, err = m.chain.GetBlockByHeight(msg.Height)
136 block, err = m.chain.GetBlockByHash(msg.GetHash())
139 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
143 ok, err := peer.SendBlock(block)
145 m.peers.RemovePeer(peer.ID())
148 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
152 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
153 blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
154 if err != nil || len(blocks) == 0 {
159 sendBlocks := []*types.Block{}
160 for _, block := range blocks {
161 rawData, err := block.MarshalText()
163 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
167 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
170 totalSize += len(rawData)
171 sendBlocks = append(sendBlocks, block)
174 ok, err := peer.SendBlocks(sendBlocks)
176 m.peers.RemovePeer(peer.ID())
179 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
183 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
184 headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
185 if err != nil || len(headers) == 0 {
186 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
190 ok, err := peer.SendHeaders(headers)
192 m.peers.RemovePeer(peer.ID())
195 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
199 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
201 var block *types.Block
203 block, err = m.chain.GetBlockByHeight(msg.Height)
205 block, err = m.chain.GetBlockByHash(msg.GetHash())
208 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
212 blockHash := block.Hash()
213 txStatus, err := m.chain.GetTransactionStatus(&blockHash)
215 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
219 ok, err := peer.SendMerkleBlock(block, txStatus)
221 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
226 m.peers.RemovePeer(peer.ID())
230 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
231 headers, err := msg.GetHeaders()
233 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
237 m.blockKeeper.processHeaders(peer.ID(), headers)
240 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
241 if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
242 peer.SetStatus(msg.Height, msg.GetHash())
247 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
248 tx, err := msg.GetTransaction()
250 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
254 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
255 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
257 m.peers.MarkTx(peer.ID(), tx.ID)
260 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
261 txs, err := msg.GetTransactions()
263 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
267 if len(txs) > msgs.TxsMsgMaxTxNum {
268 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
272 for _, tx := range txs {
273 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
274 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
277 m.peers.MarkTx(peer.ID(), tx.ID)
281 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
282 peer := m.peers.GetPeer(basePeer.ID())
287 log.WithFields(log.Fields{
289 "peer": basePeer.Addr(),
290 "type": reflect.TypeOf(msg),
291 "message": msg.String(),
292 }).Debug("receive message from peer")
294 switch msg := msg.(type) {
295 case *msgs.GetBlockMessage:
296 m.handleGetBlockMsg(peer, msg)
298 case *msgs.BlockMessage:
299 m.handleBlockMsg(peer, msg)
301 case *msgs.StatusMessage:
302 m.handleStatusMsg(basePeer, msg)
304 case *msgs.TransactionMessage:
305 m.handleTransactionMsg(peer, msg)
307 case *msgs.TransactionsMessage:
308 m.handleTransactionsMsg(peer, msg)
310 case *msgs.GetHeadersMessage:
311 m.handleGetHeadersMsg(peer, msg)
313 case *msgs.HeadersMessage:
314 m.handleHeadersMsg(peer, msg)
316 case *msgs.GetBlocksMessage:
317 m.handleGetBlocksMsg(peer, msg)
319 case *msgs.BlocksMessage:
320 m.handleBlocksMsg(peer, msg)
322 case *msgs.FilterLoadMessage:
323 m.handleFilterLoadMsg(peer, msg)
325 case *msgs.FilterAddMessage:
326 m.handleFilterAddMsg(peer, msg)
328 case *msgs.FilterClearMessage:
329 m.handleFilterClearMsg(peer)
331 case *msgs.GetMerkleBlockMessage:
332 m.handleGetMerkleBlockMsg(peer, msg)
335 log.WithFields(log.Fields{
337 "peer": basePeer.Addr(),
338 "message_type": reflect.TypeOf(msg),
339 }).Error("unhandled message type")
343 func (m *Manager) RemovePeer(peerID string) {
344 m.peers.RemovePeer(peerID)
347 func (m *Manager) SendStatus(peer peers.BasePeer) error {
348 p := m.peers.GetPeer(peer.ID())
350 return errors.New("invalid peer")
353 if err := p.SendStatus(m.chain.BestBlockHeader()); err != nil {
354 m.peers.RemovePeer(p.ID())
360 func (m *Manager) Start() error {
362 m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
367 go m.broadcastTxsLoop()
368 go m.syncMempoolLoop()
373 // Stop stops the sync manager.
374 func (m *Manager) Stop() {