7 log "github.com/sirupsen/logrus"
9 cfg "github.com/vapor/config"
10 "github.com/vapor/consensus"
11 "github.com/vapor/event"
12 msgs "github.com/vapor/netsync/messages"
13 "github.com/vapor/netsync/peers"
14 "github.com/vapor/p2p"
15 core "github.com/vapor/protocol"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
25 errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
28 // Chain is the interface for Bytom core
29 type Chain interface {
30 BestBlockHeader() *types.BlockHeader
31 BestBlockHeight() uint64
32 GetBlockByHash(*bc.Hash) (*types.Block, error)
33 GetBlockByHeight(uint64) (*types.Block, error)
34 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
35 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
36 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
37 InMainChain(bc.Hash) bool
38 ProcessBlock(*types.Block) (bool, error)
39 ValidateTx(*types.Tx) (bool, error)
42 type Switch interface {
43 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
44 AddBannedPeer(string) error
48 DialPeerWithAddress(addr *p2p.NetAddress) error
52 //SyncManager Sync Manager is responsible for the business layer information synchronization
53 type ChainManager struct {
57 blockKeeper *blockKeeper
60 txSyncCh chan *txSyncMsg
61 quitSync chan struct{}
64 eventDispatcher *event.Dispatcher
65 txMsgSub *event.Subscription
68 //NewSyncManager create a sync manager
69 func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
70 manager := &ChainManager{
74 blockKeeper: newBlockKeeper(chain, peers),
76 txSyncCh: make(chan *txSyncMsg),
77 quitSync: make(chan struct{}),
79 eventDispatcher: dispatcher,
82 if !config.VaultMode {
83 protocolReactor := NewProtocolReactor(manager)
84 manager.sw.AddReactor("PROTOCOL", protocolReactor)
89 func (cm *ChainManager) AddPeer(peer peers.BasePeer) {
90 cm.peers.AddPeer(peer)
93 //IsCaughtUp check wheather the peer finish the sync
94 func (cm *ChainManager) IsCaughtUp() bool {
95 peer := cm.peers.BestPeer(consensus.SFFullNode)
96 return peer == nil || peer.Height() <= cm.chain.BestBlockHeight()
99 func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
100 block, err := msg.GetBlock()
104 cm.blockKeeper.processBlock(peer.ID(), block)
107 func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
108 blocks, err := msg.GetBlocks()
110 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
114 cm.blockKeeper.processBlocks(peer.ID(), blocks)
117 func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
118 peer.AddFilterAddress(msg.Address)
121 func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) {
125 func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
126 peer.AddFilterAddresses(msg.Addresses)
129 func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
130 var block *types.Block
133 block, err = cm.chain.GetBlockByHeight(msg.Height)
135 block, err = cm.chain.GetBlockByHash(msg.GetHash())
138 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
142 ok, err := peer.SendBlock(block)
144 cm.peers.RemovePeer(peer.ID())
147 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
151 func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
152 blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
153 if err != nil || len(blocks) == 0 {
158 sendBlocks := []*types.Block{}
159 for _, block := range blocks {
160 rawData, err := block.MarshalText()
162 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
166 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
169 totalSize += len(rawData)
170 sendBlocks = append(sendBlocks, block)
173 ok, err := peer.SendBlocks(sendBlocks)
175 cm.peers.RemovePeer(peer.ID())
178 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
182 func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
183 headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
184 if err != nil || len(headers) == 0 {
185 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
189 ok, err := peer.SendHeaders(headers)
191 cm.peers.RemovePeer(peer.ID())
194 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
198 func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
200 var block *types.Block
202 block, err = cm.chain.GetBlockByHeight(msg.Height)
204 block, err = cm.chain.GetBlockByHash(msg.GetHash())
207 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
211 blockHash := block.Hash()
212 txStatus, err := cm.chain.GetTransactionStatus(&blockHash)
214 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
218 ok, err := peer.SendMerkleBlock(block, txStatus)
220 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
225 cm.peers.RemovePeer(peer.ID())
229 func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
230 headers, err := msg.GetHeaders()
232 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
236 cm.blockKeeper.processHeaders(peer.ID(), headers)
239 func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
240 if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil {
241 peer.SetStatus(msg.Height, msg.GetHash())
246 func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
247 tx, err := msg.GetTransaction()
249 cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
253 if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
254 cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
256 cm.peers.MarkTx(peer.ID(), tx.ID)
259 func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
260 txs, err := msg.GetTransactions()
262 cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
266 if len(txs) > msgs.TxsMsgMaxTxNum {
267 cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
271 for _, tx := range txs {
272 if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan {
273 cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
276 cm.peers.MarkTx(peer.ID(), tx.ID)
280 func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
281 peer := cm.peers.GetPeer(basePeer.ID())
286 log.WithFields(log.Fields{
288 "peer": basePeer.Addr(),
289 "type": reflect.TypeOf(msg),
290 "message": msg.String(),
291 }).Info("receive message from peer")
293 switch msg := msg.(type) {
294 case *msgs.GetBlockMessage:
295 cm.handleGetBlockMsg(peer, msg)
297 case *msgs.BlockMessage:
298 cm.handleBlockMsg(peer, msg)
300 case *msgs.StatusMessage:
301 cm.handleStatusMsg(basePeer, msg)
303 case *msgs.TransactionMessage:
304 cm.handleTransactionMsg(peer, msg)
306 case *msgs.TransactionsMessage:
307 cm.handleTransactionsMsg(peer, msg)
309 case *msgs.GetHeadersMessage:
310 cm.handleGetHeadersMsg(peer, msg)
312 case *msgs.HeadersMessage:
313 cm.handleHeadersMsg(peer, msg)
315 case *msgs.GetBlocksMessage:
316 cm.handleGetBlocksMsg(peer, msg)
318 case *msgs.BlocksMessage:
319 cm.handleBlocksMsg(peer, msg)
321 case *msgs.FilterLoadMessage:
322 cm.handleFilterLoadMsg(peer, msg)
324 case *msgs.FilterAddMessage:
325 cm.handleFilterAddMsg(peer, msg)
327 case *msgs.FilterClearMessage:
328 cm.handleFilterClearMsg(peer)
330 case *msgs.GetMerkleBlockMessage:
331 cm.handleGetMerkleBlockMsg(peer, msg)
334 log.WithFields(log.Fields{
336 "peer": basePeer.Addr(),
337 "message_type": reflect.TypeOf(msg),
338 }).Error("unhandled message type")
342 func (cm *ChainManager) RemovePeer(peerID string) {
343 cm.peers.RemovePeer(peerID)
346 func (cm *ChainManager) SendStatus(peer peers.BasePeer) error {
347 p := cm.peers.GetPeer(peer.ID())
349 return errors.New("invalid peer")
352 if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil {
353 cm.peers.RemovePeer(p.ID())
359 func (cm *ChainManager) Start() error {
361 cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{})
366 // broadcast transactions
367 go cm.txBroadcastLoop()
373 //Stop stop sync manager
374 func (cm *ChainManager) Stop() {