OSDN Git Service

Merge pull request #473 from Bytom/master
[bytom/vapor.git] / netsync / chainmgr / handle.go
1 package chainmgr
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/bytom/vapor/config"
10         "github.com/bytom/vapor/consensus"
11         dbm "github.com/bytom/vapor/database/leveldb"
12         "github.com/bytom/vapor/event"
13         msgs "github.com/bytom/vapor/netsync/messages"
14         "github.com/bytom/vapor/netsync/peers"
15         "github.com/bytom/vapor/p2p"
16         "github.com/bytom/vapor/p2p/security"
17         core "github.com/bytom/vapor/protocol"
18         "github.com/bytom/vapor/protocol/bc"
19         "github.com/bytom/vapor/protocol/bc/types"
20 )
21
22 const (
23         logModule = "netsync"
24 )
25
26 // Chain is the interface for Bytom core
27 type Chain interface {
28         BestBlockHeader() *types.BlockHeader
29         LastIrreversibleHeader() *types.BlockHeader
30         BestBlockHeight() uint64
31         GetBlockByHash(*bc.Hash) (*types.Block, error)
32         GetBlockByHeight(uint64) (*types.Block, error)
33         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
34         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
35         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
36         InMainChain(bc.Hash) bool
37         ProcessBlock(*types.Block) (bool, error)
38         ValidateTx(*types.Tx) (bool, error)
39 }
40
41 type Switch interface {
42         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
43         Start() (bool, error)
44         Stop() bool
45         IsListening() bool
46         DialPeerWithAddress(addr *p2p.NetAddress) error
47         Peers() *p2p.PeerSet
48 }
49
50 // Mempool is the interface for Bytom mempool
51 type Mempool interface {
52         GetTransactions() []*core.TxDesc
53         IsDust(tx *types.Tx) bool
54 }
55
56 //Manager is responsible for the business layer information synchronization
57 type Manager struct {
58         sw          Switch
59         chain       Chain
60         mempool     Mempool
61         blockKeeper *blockKeeper
62         peers       *peers.PeerSet
63
64         txSyncCh chan *txSyncMsg
65         quit     chan struct{}
66         config   *cfg.Config
67
68         eventDispatcher *event.Dispatcher
69         txMsgSub        *event.Subscription
70 }
71
72 //NewManager create a chain sync manager.
73 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
74         manager := &Manager{
75                 sw:              sw,
76                 mempool:         mempool,
77                 chain:           chain,
78                 blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
79                 peers:           peers,
80                 txSyncCh:        make(chan *txSyncMsg),
81                 quit:            make(chan struct{}),
82                 config:          config,
83                 eventDispatcher: dispatcher,
84         }
85
86         if !config.VaultMode {
87                 protocolReactor := NewProtocolReactor(manager)
88                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
89         }
90         return manager, nil
91 }
92
93 func (m *Manager) AddPeer(peer peers.BasePeer) {
94         m.peers.AddPeer(peer)
95 }
96
97 //IsCaughtUp check wheather the peer finish the sync
98 func (m *Manager) IsCaughtUp() bool {
99         peer := m.peers.BestPeer(consensus.SFFullNode)
100         return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
101 }
102
103 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
104         block, err := msg.GetBlock()
105         if err != nil {
106                 return
107         }
108
109         m.blockKeeper.processBlock(peer.ID(), block)
110 }
111
112 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
113         blocks, err := msg.GetBlocks()
114         if err != nil {
115                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
116                 return
117         }
118
119         m.blockKeeper.processBlocks(peer.ID(), blocks)
120 }
121
122 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
123         peer.AddFilterAddress(msg.Address)
124 }
125
126 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
127         peer.FilterClear()
128 }
129
130 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
131         peer.AddFilterAddresses(msg.Addresses)
132 }
133
134 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
135         var block *types.Block
136         var err error
137         if msg.Height != 0 {
138                 block, err = m.chain.GetBlockByHeight(msg.Height)
139         } else {
140                 block, err = m.chain.GetBlockByHash(msg.GetHash())
141         }
142         if err != nil {
143                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
144                 return
145         }
146
147         ok, err := peer.SendBlock(block)
148         if !ok {
149                 m.peers.RemovePeer(peer.ID())
150         }
151         if err != nil {
152                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
153         }
154 }
155
156 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
157         blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
158         if err != nil || len(blocks) == 0 {
159                 return
160         }
161
162         totalSize := 0
163         sendBlocks := []*types.Block{}
164         for _, block := range blocks {
165                 rawData, err := block.MarshalText()
166                 if err != nil {
167                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
168                         return
169                 }
170
171                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
172                         break
173                 }
174                 totalSize += len(rawData)
175                 sendBlocks = append(sendBlocks, block)
176         }
177
178         ok, err := peer.SendBlocks(sendBlocks)
179         if !ok {
180                 m.peers.RemovePeer(peer.ID())
181         }
182         if err != nil {
183                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
184         }
185 }
186
187 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
188         headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
189         if err != nil || len(headers) == 0 {
190                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
191                 return
192         }
193
194         ok, err := peer.SendHeaders(headers)
195         if !ok {
196                 m.peers.RemovePeer(peer.ID())
197         }
198         if err != nil {
199                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
200         }
201 }
202
203 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
204         var err error
205         var block *types.Block
206         if msg.Height != 0 {
207                 block, err = m.chain.GetBlockByHeight(msg.Height)
208         } else {
209                 block, err = m.chain.GetBlockByHash(msg.GetHash())
210         }
211         if err != nil {
212                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
213                 return
214         }
215
216         blockHash := block.Hash()
217         txStatus, err := m.chain.GetTransactionStatus(&blockHash)
218         if err != nil {
219                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
220                 return
221         }
222
223         ok, err := peer.SendMerkleBlock(block, txStatus)
224         if err != nil {
225                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
226                 return
227         }
228
229         if !ok {
230                 m.peers.RemovePeer(peer.ID())
231         }
232 }
233
234 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
235         headers, err := msg.GetHeaders()
236         if err != nil {
237                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
238                 return
239         }
240
241         m.blockKeeper.processHeaders(peer.ID(), headers)
242 }
243
244 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
245         if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
246                 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
247                 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
248         }
249 }
250
251 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
252         tx, err := msg.GetTransaction()
253         if err != nil {
254                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
255                 return
256         }
257
258         if m.mempool.IsDust(tx) {
259                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust tx msg")
260                 return
261         }
262
263         m.peers.MarkTx(peer.ID(), tx.ID)
264         if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
265                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
266         }
267 }
268
269 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
270         txs, err := msg.GetTransactions()
271         if err != nil {
272                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
273                 return
274         }
275
276         if len(txs) > msgs.TxsMsgMaxTxNum {
277                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
278                 return
279         }
280
281         for _, tx := range txs {
282                 if m.mempool.IsDust(tx) {
283                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
284                         continue
285                 }
286
287                 m.peers.MarkTx(peer.ID(), tx.ID)
288                 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
289                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
290                         return
291                 }
292         }
293 }
294
295 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
296         peer := m.peers.GetPeer(basePeer.ID())
297         if peer == nil {
298                 return
299         }
300
301         log.WithFields(log.Fields{
302                 "module":  logModule,
303                 "peer":    basePeer.Addr(),
304                 "type":    reflect.TypeOf(msg),
305                 "message": msg.String(),
306         }).Debug("receive message from peer")
307
308         switch msg := msg.(type) {
309         case *msgs.GetBlockMessage:
310                 m.handleGetBlockMsg(peer, msg)
311
312         case *msgs.BlockMessage:
313                 m.handleBlockMsg(peer, msg)
314
315         case *msgs.StatusMessage:
316                 m.handleStatusMsg(basePeer, msg)
317
318         case *msgs.TransactionMessage:
319                 m.handleTransactionMsg(peer, msg)
320
321         case *msgs.TransactionsMessage:
322                 m.handleTransactionsMsg(peer, msg)
323
324         case *msgs.GetHeadersMessage:
325                 m.handleGetHeadersMsg(peer, msg)
326
327         case *msgs.HeadersMessage:
328                 m.handleHeadersMsg(peer, msg)
329
330         case *msgs.GetBlocksMessage:
331                 m.handleGetBlocksMsg(peer, msg)
332
333         case *msgs.BlocksMessage:
334                 m.handleBlocksMsg(peer, msg)
335
336         case *msgs.FilterLoadMessage:
337                 m.handleFilterLoadMsg(peer, msg)
338
339         case *msgs.FilterAddMessage:
340                 m.handleFilterAddMsg(peer, msg)
341
342         case *msgs.FilterClearMessage:
343                 m.handleFilterClearMsg(peer)
344
345         case *msgs.GetMerkleBlockMessage:
346                 m.handleGetMerkleBlockMsg(peer, msg)
347
348         default:
349                 log.WithFields(log.Fields{
350                         "module":       logModule,
351                         "peer":         basePeer.Addr(),
352                         "message_type": reflect.TypeOf(msg),
353                 }).Error("unhandled message type")
354         }
355 }
356
357 func (m *Manager) RemovePeer(peerID string) {
358         m.peers.RemovePeer(peerID)
359 }
360
361 func (m *Manager) SendStatus(peer peers.BasePeer) error {
362         p := m.peers.GetPeer(peer.ID())
363         if p == nil {
364                 return errors.New("invalid peer")
365         }
366
367         if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
368                 m.peers.RemovePeer(p.ID())
369                 return err
370         }
371         return nil
372 }
373
374 func (m *Manager) Start() error {
375         var err error
376         m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
377         if err != nil {
378                 return err
379         }
380         m.blockKeeper.start()
381         go m.broadcastTxsLoop()
382         go m.syncMempoolLoop()
383
384         return nil
385 }
386
387 //Stop stop sync manager
388 func (m *Manager) Stop() {
389         m.blockKeeper.stop()
390         close(m.quit)
391 }