7 log "github.com/sirupsen/logrus"
9 cfg "github.com/vapor/config"
10 "github.com/vapor/consensus"
11 dbm "github.com/vapor/database/leveldb"
12 "github.com/vapor/event"
13 msgs "github.com/vapor/netsync/messages"
14 "github.com/vapor/netsync/peers"
15 "github.com/vapor/p2p"
16 "github.com/vapor/p2p/security"
17 core "github.com/vapor/protocol"
18 "github.com/vapor/protocol/bc"
19 "github.com/vapor/protocol/bc/types"
26 // Chain is the interface for Bytom core
27 type Chain interface {
28 BestBlockHeader() *types.BlockHeader
29 LastIrreversibleHeader() *types.BlockHeader
30 BestBlockHeight() uint64
31 GetBlockByHash(*bc.Hash) (*types.Block, error)
32 GetBlockByHeight(uint64) (*types.Block, error)
33 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
34 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
35 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
36 InMainChain(bc.Hash) bool
37 ProcessBlock(*types.Block) (bool, error)
38 ValidateTx(*types.Tx) (bool, error)
41 type Switch interface {
42 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
46 DialPeerWithAddress(addr *p2p.NetAddress) error
50 // Mempool is the interface for Bytom mempool
51 type Mempool interface {
52 GetTransactions() []*core.TxDesc
53 IsDust(tx *types.Tx) bool
56 //Manager is responsible for the business layer information synchronization
61 blockKeeper *blockKeeper
64 txSyncCh chan *txSyncMsg
68 eventDispatcher *event.Dispatcher
69 txMsgSub *event.Subscription
72 //NewManager create a chain sync manager.
73 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
78 blockKeeper: newBlockKeeper(chain, peers, fastSyncDB),
80 txSyncCh: make(chan *txSyncMsg),
81 quit: make(chan struct{}),
83 eventDispatcher: dispatcher,
86 if !config.VaultMode {
87 protocolReactor := NewProtocolReactor(manager)
88 manager.sw.AddReactor("PROTOCOL", protocolReactor)
93 func (m *Manager) AddPeer(peer peers.BasePeer) {
97 //IsCaughtUp check wheather the peer finish the sync
98 func (m *Manager) IsCaughtUp() bool {
99 peer := m.peers.BestPeer(consensus.SFFullNode)
100 return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
103 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
104 block, err := msg.GetBlock()
109 m.blockKeeper.processBlock(peer.ID(), block)
112 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
113 blocks, err := msg.GetBlocks()
115 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
119 m.blockKeeper.processBlocks(peer.ID(), blocks)
122 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
123 peer.AddFilterAddress(msg.Address)
126 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
130 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
131 peer.AddFilterAddresses(msg.Addresses)
134 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
135 var block *types.Block
138 block, err = m.chain.GetBlockByHeight(msg.Height)
140 block, err = m.chain.GetBlockByHash(msg.GetHash())
143 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
147 ok, err := peer.SendBlock(block)
149 m.peers.RemovePeer(peer.ID())
152 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
156 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
157 blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
158 if err != nil || len(blocks) == 0 {
163 sendBlocks := []*types.Block{}
164 for _, block := range blocks {
165 rawData, err := block.MarshalText()
167 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
171 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
174 totalSize += len(rawData)
175 sendBlocks = append(sendBlocks, block)
178 ok, err := peer.SendBlocks(sendBlocks)
180 m.peers.RemovePeer(peer.ID())
183 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
187 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
188 headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
189 if err != nil || len(headers) == 0 {
190 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
194 ok, err := peer.SendHeaders(headers)
196 m.peers.RemovePeer(peer.ID())
199 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
203 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
205 var block *types.Block
207 block, err = m.chain.GetBlockByHeight(msg.Height)
209 block, err = m.chain.GetBlockByHash(msg.GetHash())
212 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
216 blockHash := block.Hash()
217 txStatus, err := m.chain.GetTransactionStatus(&blockHash)
219 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
223 ok, err := peer.SendMerkleBlock(block, txStatus)
225 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
230 m.peers.RemovePeer(peer.ID())
234 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
235 headers, err := msg.GetHeaders()
237 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
241 m.blockKeeper.processHeaders(peer.ID(), headers)
244 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
245 if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
246 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
247 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
251 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
252 tx, err := msg.GetTransaction()
254 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
258 if m.mempool.IsDust(tx) {
259 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust tx msg")
263 m.peers.MarkTx(peer.ID(), tx.ID)
264 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
265 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
269 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
270 txs, err := msg.GetTransactions()
272 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
276 if len(txs) > msgs.TxsMsgMaxTxNum {
277 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
281 for _, tx := range txs {
282 if m.mempool.IsDust(tx) {
283 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
287 m.peers.MarkTx(peer.ID(), tx.ID)
288 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
289 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
295 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
296 peer := m.peers.GetPeer(basePeer.ID())
301 log.WithFields(log.Fields{
303 "peer": basePeer.Addr(),
304 "type": reflect.TypeOf(msg),
305 "message": msg.String(),
306 }).Debug("receive message from peer")
308 switch msg := msg.(type) {
309 case *msgs.GetBlockMessage:
310 m.handleGetBlockMsg(peer, msg)
312 case *msgs.BlockMessage:
313 m.handleBlockMsg(peer, msg)
315 case *msgs.StatusMessage:
316 m.handleStatusMsg(basePeer, msg)
318 case *msgs.TransactionMessage:
319 m.handleTransactionMsg(peer, msg)
321 case *msgs.TransactionsMessage:
322 m.handleTransactionsMsg(peer, msg)
324 case *msgs.GetHeadersMessage:
325 m.handleGetHeadersMsg(peer, msg)
327 case *msgs.HeadersMessage:
328 m.handleHeadersMsg(peer, msg)
330 case *msgs.GetBlocksMessage:
331 m.handleGetBlocksMsg(peer, msg)
333 case *msgs.BlocksMessage:
334 m.handleBlocksMsg(peer, msg)
336 case *msgs.FilterLoadMessage:
337 m.handleFilterLoadMsg(peer, msg)
339 case *msgs.FilterAddMessage:
340 m.handleFilterAddMsg(peer, msg)
342 case *msgs.FilterClearMessage:
343 m.handleFilterClearMsg(peer)
345 case *msgs.GetMerkleBlockMessage:
346 m.handleGetMerkleBlockMsg(peer, msg)
349 log.WithFields(log.Fields{
351 "peer": basePeer.Addr(),
352 "message_type": reflect.TypeOf(msg),
353 }).Error("unhandled message type")
357 func (m *Manager) RemovePeer(peerID string) {
358 m.peers.RemovePeer(peerID)
361 func (m *Manager) SendStatus(peer peers.BasePeer) error {
362 p := m.peers.GetPeer(peer.ID())
364 return errors.New("invalid peer")
367 if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
368 m.peers.RemovePeer(p.ID())
374 func (m *Manager) Start() error {
376 m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
380 m.blockKeeper.start()
381 go m.broadcastTxsLoop()
382 go m.syncMempoolLoop()
387 //Stop stop sync manager
388 func (m *Manager) Stop() {