OSDN Git Service

prepare 1.1.6
[bytom/vapor.git] / netsync / chainmgr / handle.go
1 package chainmgr
2
3 import (
4         "errors"
5         "reflect"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9
10         cfg "github.com/bytom/vapor/config"
11         "github.com/bytom/vapor/consensus"
12         dbm "github.com/bytom/vapor/database/leveldb"
13         "github.com/bytom/vapor/event"
14         msgs "github.com/bytom/vapor/netsync/messages"
15         "github.com/bytom/vapor/netsync/peers"
16         "github.com/bytom/vapor/p2p"
17         "github.com/bytom/vapor/p2p/security"
18         core "github.com/bytom/vapor/protocol"
19         "github.com/bytom/vapor/protocol/bc"
20         "github.com/bytom/vapor/protocol/bc/types"
21 )
22
23 const (
24         logModule = "netsync"
25 )
26
27 // Chain is the interface for Bytom core
28 type Chain interface {
29         BestBlockHeader() *types.BlockHeader
30         LastIrreversibleHeader() *types.BlockHeader
31         BestBlockHeight() uint64
32         GetBlockByHash(*bc.Hash) (*types.Block, error)
33         GetBlockByHeight(uint64) (*types.Block, error)
34         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
35         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
36         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
37         InMainChain(bc.Hash) bool
38         ProcessBlock(*types.Block) (bool, error)
39         ValidateTx(*types.Tx) (bool, error)
40 }
41
42 // Switch is the interface for network layer
43 type Switch interface {
44         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45         Start() (bool, error)
46         Stop() bool
47         IsListening() bool
48         DialPeerWithAddress(addr *p2p.NetAddress) error
49         Peers() *p2p.PeerSet
50 }
51
52 // Mempool is the interface for Bytom mempool
53 type Mempool interface {
54         GetTransactions() []*core.TxDesc
55         IsDust(tx *types.Tx) bool
56 }
57
58 //Manager is responsible for the business layer information synchronization
59 type Manager struct {
60         sw          Switch
61         chain       Chain
62         mempool     Mempool
63         blockKeeper *blockKeeper
64         peers       *peers.PeerSet
65
66         txSyncCh chan *txSyncMsg
67         quit     chan struct{}
68         config   *cfg.Config
69
70         eventDispatcher *event.Dispatcher
71         txMsgSub        *event.Subscription
72 }
73
74 //NewManager create a chain sync manager.
75 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
76         manager := &Manager{
77                 sw:              sw,
78                 mempool:         mempool,
79                 chain:           chain,
80                 blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
81                 peers:           peers,
82                 txSyncCh:        make(chan *txSyncMsg),
83                 quit:            make(chan struct{}),
84                 config:          config,
85                 eventDispatcher: dispatcher,
86         }
87
88         if !config.VaultMode {
89                 protocolReactor := NewProtocolReactor(manager)
90                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
91         }
92         return manager, nil
93 }
94
95 // AddPeer add the network layer peer to logic layer
96 func (m *Manager) AddPeer(peer peers.BasePeer) {
97         m.peers.AddPeer(peer)
98 }
99
100 //IsCaughtUp check wheather the peer finish the sync
101 func (m *Manager) IsCaughtUp() bool {
102         peer := m.peers.BestPeer(consensus.SFFullNode)
103         return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
104 }
105
106 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
107         block, err := msg.GetBlock()
108         if err != nil {
109                 return
110         }
111
112         m.blockKeeper.processBlock(peer.ID(), block)
113 }
114
115 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
116         blocks, err := msg.GetBlocks()
117         if err != nil {
118                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
119                 return
120         }
121
122         m.blockKeeper.processBlocks(peer.ID(), blocks)
123 }
124
125 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
126         peer.AddFilterAddress(msg.Address)
127 }
128
129 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
130         peer.FilterClear()
131 }
132
133 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
134         peer.AddFilterAddresses(msg.Addresses)
135 }
136
137 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
138         var block *types.Block
139         var err error
140         if msg.Height != 0 {
141                 block, err = m.chain.GetBlockByHeight(msg.Height)
142         } else {
143                 block, err = m.chain.GetBlockByHash(msg.GetHash())
144         }
145         if err != nil {
146                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
147                 return
148         }
149
150         ok, err := peer.SendBlock(block)
151         if !ok {
152                 m.peers.RemovePeer(peer.ID())
153         }
154         if err != nil {
155                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
156         }
157 }
158
159 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
160         endTime := time.Now().Add(requireBlocksTimeout / 2)
161         isTimeout := func() bool {
162                 return time.Now().After(endTime)
163         }
164
165         blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout)
166         if err != nil || len(blocks) == 0 {
167                 return
168         }
169
170         totalSize := 0
171         sendBlocks := []*types.Block{}
172         for _, block := range blocks {
173                 rawData, err := block.MarshalText()
174                 if err != nil {
175                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
176                         return
177                 }
178
179                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
180                         break
181                 }
182                 totalSize += len(rawData)
183                 sendBlocks = append(sendBlocks, block)
184         }
185
186         ok, err := peer.SendBlocks(sendBlocks)
187         if !ok {
188                 m.peers.RemovePeer(peer.ID())
189         }
190         if err != nil {
191                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
192         }
193 }
194
195 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
196         headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
197         if err != nil || len(headers) == 0 {
198                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
199                 return
200         }
201
202         ok, err := peer.SendHeaders(headers)
203         if !ok {
204                 m.peers.RemovePeer(peer.ID())
205         }
206         if err != nil {
207                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
208         }
209 }
210
211 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
212         var err error
213         var block *types.Block
214         if msg.Height != 0 {
215                 block, err = m.chain.GetBlockByHeight(msg.Height)
216         } else {
217                 block, err = m.chain.GetBlockByHash(msg.GetHash())
218         }
219         if err != nil {
220                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
221                 return
222         }
223
224         blockHash := block.Hash()
225         txStatus, err := m.chain.GetTransactionStatus(&blockHash)
226         if err != nil {
227                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
228                 return
229         }
230
231         ok, err := peer.SendMerkleBlock(block, txStatus)
232         if err != nil {
233                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
234                 return
235         }
236
237         if !ok {
238                 m.peers.RemovePeer(peer.ID())
239         }
240 }
241
242 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
243         headers, err := msg.GetHeaders()
244         if err != nil {
245                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
246                 return
247         }
248
249         m.blockKeeper.processHeaders(peer.ID(), headers)
250 }
251
252 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
253         if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
254                 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
255                 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
256         }
257 }
258
259 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
260         tx, err := msg.GetTransaction()
261         if err != nil {
262                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
263                 return
264         }
265
266         if m.mempool.IsDust(tx) {
267                 log.WithFields(log.Fields{"tx_hash": tx.ID.String(), "peer": peer.Addr()}).Warn("receive dust tx msg")
268                 return
269         }
270
271         m.peers.MarkTx(peer.ID(), tx.ID)
272         if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
273                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
274         }
275 }
276
277 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
278         txs, err := msg.GetTransactions()
279         if err != nil {
280                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
281                 return
282         }
283
284         if len(txs) > msgs.TxsMsgMaxTxNum {
285                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
286                 return
287         }
288
289         for _, tx := range txs {
290                 if m.mempool.IsDust(tx) {
291                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
292                         continue
293                 }
294
295                 m.peers.MarkTx(peer.ID(), tx.ID)
296                 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
297                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
298                         return
299                 }
300         }
301 }
302
303 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
304         peer := m.peers.GetPeer(basePeer.ID())
305         if peer == nil {
306                 return
307         }
308
309         log.WithFields(log.Fields{
310                 "module":  logModule,
311                 "peer":    basePeer.Addr(),
312                 "type":    reflect.TypeOf(msg),
313                 "message": msg.String(),
314         }).Debug("receive message from peer")
315
316         switch msg := msg.(type) {
317         case *msgs.GetBlockMessage:
318                 m.handleGetBlockMsg(peer, msg)
319
320         case *msgs.BlockMessage:
321                 m.handleBlockMsg(peer, msg)
322
323         case *msgs.StatusMessage:
324                 m.handleStatusMsg(basePeer, msg)
325
326         case *msgs.TransactionMessage:
327                 m.handleTransactionMsg(peer, msg)
328
329         case *msgs.TransactionsMessage:
330                 m.handleTransactionsMsg(peer, msg)
331
332         case *msgs.GetHeadersMessage:
333                 m.handleGetHeadersMsg(peer, msg)
334
335         case *msgs.HeadersMessage:
336                 m.handleHeadersMsg(peer, msg)
337
338         case *msgs.GetBlocksMessage:
339                 m.handleGetBlocksMsg(peer, msg)
340
341         case *msgs.BlocksMessage:
342                 m.handleBlocksMsg(peer, msg)
343
344         case *msgs.FilterLoadMessage:
345                 m.handleFilterLoadMsg(peer, msg)
346
347         case *msgs.FilterAddMessage:
348                 m.handleFilterAddMsg(peer, msg)
349
350         case *msgs.FilterClearMessage:
351                 m.handleFilterClearMsg(peer)
352
353         case *msgs.GetMerkleBlockMessage:
354                 m.handleGetMerkleBlockMsg(peer, msg)
355
356         default:
357                 log.WithFields(log.Fields{
358                         "module":       logModule,
359                         "peer":         basePeer.Addr(),
360                         "message_type": reflect.TypeOf(msg),
361                 }).Error("unhandled message type")
362         }
363 }
364
365 // RemovePeer delete peer for peer set
366 func (m *Manager) RemovePeer(peerID string) {
367         m.peers.RemovePeer(peerID)
368 }
369
370 // SendStatus sent the current self status to remote peer
371 func (m *Manager) SendStatus(peer peers.BasePeer) error {
372         p := m.peers.GetPeer(peer.ID())
373         if p == nil {
374                 return errors.New("invalid peer")
375         }
376
377         if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
378                 m.peers.RemovePeer(p.ID())
379                 return err
380         }
381         return nil
382 }
383
384 // Start the network logic layer
385 func (m *Manager) Start() error {
386         var err error
387         m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
388         if err != nil {
389                 return err
390         }
391         m.blockKeeper.start()
392         go m.broadcastTxsLoop()
393         go m.syncMempoolLoop()
394
395         return nil
396 }
397
398 //Stop stop sync manager
399 func (m *Manager) Stop() {
400         m.blockKeeper.stop()
401         close(m.quit)
402 }