OSDN Git Service

versoin1.1.9 (#594)
[bytom/vapor.git] / netsync / chainmgr / handle.go
1 package chainmgr
2
3 import (
4         "errors"
5         "reflect"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9
10         cfg "github.com/bytom/vapor/config"
11         "github.com/bytom/vapor/consensus"
12         dbm "github.com/bytom/vapor/database/leveldb"
13         "github.com/bytom/vapor/event"
14         msgs "github.com/bytom/vapor/netsync/messages"
15         "github.com/bytom/vapor/netsync/peers"
16         "github.com/bytom/vapor/p2p"
17         "github.com/bytom/vapor/p2p/security"
18         core "github.com/bytom/vapor/protocol"
19         "github.com/bytom/vapor/protocol/bc"
20         "github.com/bytom/vapor/protocol/bc/types"
21 )
22
23 const (
24         logModule = "netsync"
25 )
26
27 // Chain is the interface for Bytom core
28 type Chain interface {
29         BestBlockHeader() *types.BlockHeader
30         LastIrreversibleHeader() *types.BlockHeader
31         BestBlockHeight() uint64
32         GetBlockByHash(*bc.Hash) (*types.Block, error)
33         GetBlockByHeight(uint64) (*types.Block, error)
34         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
35         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
36         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
37         InMainChain(bc.Hash) bool
38         ProcessBlock(*types.Block) (bool, error)
39         ValidateTx(*types.Tx) (bool, error)
40 }
41
42 // Switch is the interface for network layer
43 type Switch interface {
44         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
45         Start() (bool, error)
46         Stop() bool
47         IsListening() bool
48         DialPeerWithAddress(addr *p2p.NetAddress) error
49         Peers() *p2p.PeerSet
50 }
51
52 // Mempool is the interface for Bytom mempool
53 type Mempool interface {
54         GetTransactions() []*core.TxDesc
55         IsDust(tx *types.Tx) bool
56 }
57
58 //Manager is responsible for the business layer information synchronization
59 type Manager struct {
60         sw          Switch
61         chain       Chain
62         mempool     Mempool
63         blockKeeper *blockKeeper
64         peers       *peers.PeerSet
65
66         txSyncCh chan *txSyncMsg
67         quit     chan struct{}
68         config   *cfg.Config
69
70         eventDispatcher *event.Dispatcher
71         txMsgSub        *event.Subscription
72 }
73
74 //NewManager create a chain sync manager.
75 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
76         manager := &Manager{
77                 sw:              sw,
78                 mempool:         mempool,
79                 chain:           chain,
80                 blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
81                 peers:           peers,
82                 txSyncCh:        make(chan *txSyncMsg),
83                 quit:            make(chan struct{}),
84                 config:          config,
85                 eventDispatcher: dispatcher,
86         }
87
88         if !config.VaultMode {
89                 protocolReactor := NewProtocolReactor(manager)
90                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
91         }
92         return manager, nil
93 }
94
95 // AddPeer add the network layer peer to logic layer
96 func (m *Manager) AddPeer(peer peers.BasePeer) {
97         m.peers.AddPeer(peer)
98 }
99
100 //IsCaughtUp check wheather the peer finish the sync
101 func (m *Manager) IsCaughtUp() bool {
102         peer := m.peers.BestPeer(consensus.SFFullNode)
103         return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
104 }
105
106 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
107         block, err := msg.GetBlock()
108         if err != nil {
109                 return
110         }
111
112         m.blockKeeper.processBlock(peer.ID(), block)
113 }
114
115 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
116         blocks, err := msg.GetBlocks()
117         if err != nil {
118                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
119                 return
120         }
121
122         m.blockKeeper.processBlocks(peer.ID(), blocks)
123 }
124
125 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
126         peer.AddFilterAddress(msg.Address)
127 }
128
129 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
130         peer.FilterClear()
131 }
132
133 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
134         peer.AddFilterAddresses(msg.Addresses)
135 }
136
137 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
138         var block *types.Block
139         var err error
140         if msg.Height != 0 {
141                 block, err = m.chain.GetBlockByHeight(msg.Height)
142         } else {
143                 block, err = m.chain.GetBlockByHash(msg.GetHash())
144         }
145         if err != nil {
146                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
147                 return
148         }
149
150         ok, err := peer.SendBlock(block)
151         if !ok {
152                 m.peers.RemovePeer(peer.ID())
153         }
154         if err != nil {
155                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
156         }
157 }
158
159 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
160         endTime := time.Now().Add(requireBlocksTimeout / 10)
161         isTimeout := func() bool {
162                 return time.Now().After(endTime)
163         }
164
165         blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout)
166         if err != nil || len(blocks) == 0 {
167                 log.WithFields(log.Fields{
168                         "module": logModule,
169                         "err":    err,
170                         "size":   len(blocks),
171                 }).Error("fail on handleGetBlocksMsg locateBlocks")
172                 return
173         }
174
175         totalSize := 0
176         sendBlocks := []*types.Block{}
177         for _, block := range blocks {
178                 rawData, err := block.MarshalText()
179                 if err != nil {
180                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
181                         return
182                 }
183
184                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
185                         break
186                 }
187                 totalSize += len(rawData)
188                 sendBlocks = append(sendBlocks, block)
189         }
190
191         ok, err := peer.SendBlocks(sendBlocks)
192         if !ok {
193                 m.peers.RemovePeer(peer.ID())
194         }
195         if err != nil {
196                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
197         }
198 }
199
200 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
201         headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
202         if err != nil || len(headers) == 0 {
203                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
204                 return
205         }
206
207         ok, err := peer.SendHeaders(headers)
208         if !ok {
209                 m.peers.RemovePeer(peer.ID())
210         }
211         if err != nil {
212                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
213         }
214 }
215
216 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
217         var err error
218         var block *types.Block
219         if msg.Height != 0 {
220                 block, err = m.chain.GetBlockByHeight(msg.Height)
221         } else {
222                 block, err = m.chain.GetBlockByHash(msg.GetHash())
223         }
224         if err != nil {
225                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
226                 return
227         }
228
229         blockHash := block.Hash()
230         txStatus, err := m.chain.GetTransactionStatus(&blockHash)
231         if err != nil {
232                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
233                 return
234         }
235
236         ok, err := peer.SendMerkleBlock(block, txStatus)
237         if err != nil {
238                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
239                 return
240         }
241
242         if !ok {
243                 m.peers.RemovePeer(peer.ID())
244         }
245 }
246
247 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
248         headers, err := msg.GetHeaders()
249         if err != nil {
250                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
251                 return
252         }
253
254         m.blockKeeper.processHeaders(peer.ID(), headers)
255 }
256
257 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
258         if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
259                 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
260                 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
261         }
262 }
263
264 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
265         tx, err := msg.GetTransaction()
266         if err != nil {
267                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
268                 return
269         }
270
271         if m.mempool.IsDust(tx) {
272                 log.WithFields(log.Fields{"tx_hash": tx.ID.String(), "peer": peer.Addr()}).Warn("receive dust tx msg")
273                 return
274         }
275
276         m.peers.MarkTx(peer.ID(), tx.ID)
277         if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
278                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
279         }
280 }
281
282 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
283         txs, err := msg.GetTransactions()
284         if err != nil {
285                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
286                 return
287         }
288
289         if len(txs) > msgs.TxsMsgMaxTxNum {
290                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
291                 return
292         }
293
294         for _, tx := range txs {
295                 if m.mempool.IsDust(tx) {
296                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
297                         continue
298                 }
299
300                 m.peers.MarkTx(peer.ID(), tx.ID)
301                 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
302                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
303                         return
304                 }
305         }
306 }
307
308 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
309         peer := m.peers.GetPeer(basePeer.ID())
310         if peer == nil {
311                 return
312         }
313
314         log.WithFields(log.Fields{
315                 "module":  logModule,
316                 "peer":    basePeer.Addr(),
317                 "type":    reflect.TypeOf(msg),
318                 "message": msg.String(),
319         }).Debug("receive message from peer")
320
321         switch msg := msg.(type) {
322         case *msgs.GetBlockMessage:
323                 m.handleGetBlockMsg(peer, msg)
324
325         case *msgs.BlockMessage:
326                 m.handleBlockMsg(peer, msg)
327
328         case *msgs.StatusMessage:
329                 m.handleStatusMsg(basePeer, msg)
330
331         case *msgs.TransactionMessage:
332                 m.handleTransactionMsg(peer, msg)
333
334         case *msgs.TransactionsMessage:
335                 m.handleTransactionsMsg(peer, msg)
336
337         case *msgs.GetHeadersMessage:
338                 m.handleGetHeadersMsg(peer, msg)
339
340         case *msgs.HeadersMessage:
341                 m.handleHeadersMsg(peer, msg)
342
343         case *msgs.GetBlocksMessage:
344                 m.handleGetBlocksMsg(peer, msg)
345
346         case *msgs.BlocksMessage:
347                 m.handleBlocksMsg(peer, msg)
348
349         case *msgs.FilterLoadMessage:
350                 m.handleFilterLoadMsg(peer, msg)
351
352         case *msgs.FilterAddMessage:
353                 m.handleFilterAddMsg(peer, msg)
354
355         case *msgs.FilterClearMessage:
356                 m.handleFilterClearMsg(peer)
357
358         case *msgs.GetMerkleBlockMessage:
359                 m.handleGetMerkleBlockMsg(peer, msg)
360
361         default:
362                 log.WithFields(log.Fields{
363                         "module":       logModule,
364                         "peer":         basePeer.Addr(),
365                         "message_type": reflect.TypeOf(msg),
366                 }).Error("unhandled message type")
367         }
368 }
369
370 // RemovePeer delete peer for peer set
371 func (m *Manager) RemovePeer(peerID string) {
372         m.peers.RemovePeer(peerID)
373 }
374
375 // SendStatus sent the current self status to remote peer
376 func (m *Manager) SendStatus(peer peers.BasePeer) error {
377         p := m.peers.GetPeer(peer.ID())
378         if p == nil {
379                 return errors.New("invalid peer")
380         }
381
382         if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
383                 m.peers.RemovePeer(p.ID())
384                 return err
385         }
386         return nil
387 }
388
389 // Start the network logic layer
390 func (m *Manager) Start() error {
391         var err error
392         m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
393         if err != nil {
394                 return err
395         }
396         m.blockKeeper.start()
397         go m.broadcastTxsLoop()
398         go m.syncMempoolLoop()
399
400         return nil
401 }
402
403 //Stop stop sync manager
404 func (m *Manager) Stop() {
405         m.blockKeeper.stop()
406         close(m.quit)
407 }