OSDN Git Service

keep last irreversible block on the main chain (#245)
[bytom/vapor.git] / netsync / chainmgr / handle.go
1 package chainmgr
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         "github.com/vapor/event"
12         msgs "github.com/vapor/netsync/messages"
13         "github.com/vapor/netsync/peers"
14         "github.com/vapor/p2p"
15         "github.com/vapor/p2p/security"
16         core "github.com/vapor/protocol"
17         "github.com/vapor/protocol/bc"
18         "github.com/vapor/protocol/bc/types"
19 )
20
21 const (
22         logModule = "netsync"
23 )
24
25 // Chain is the interface for Bytom core
26 type Chain interface {
27         BestBlockHeader() *types.BlockHeader
28         LastIrreversibleHeader() *types.BlockHeader
29         BestBlockHeight() uint64
30         GetBlockByHash(*bc.Hash) (*types.Block, error)
31         GetBlockByHeight(uint64) (*types.Block, error)
32         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
33         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
34         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
35         InMainChain(bc.Hash) bool
36         ProcessBlock(*types.Block) (bool, error)
37         ValidateTx(*types.Tx) (bool, error)
38 }
39
40 type Switch interface {
41         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
42         Start() (bool, error)
43         Stop() bool
44         IsListening() bool
45         DialPeerWithAddress(addr *p2p.NetAddress) error
46         Peers() *p2p.PeerSet
47 }
48
49 // Mempool is the interface for Bytom mempool
50 type Mempool interface {
51         GetTransactions() []*core.TxDesc
52 }
53
54 //Manager is responsible for the business layer information synchronization
55 type Manager struct {
56         sw          Switch
57         chain       Chain
58         mempool     Mempool
59         blockKeeper *blockKeeper
60         peers       *peers.PeerSet
61
62         txSyncCh chan *txSyncMsg
63         quit     chan struct{}
64         config   *cfg.Config
65
66         eventDispatcher *event.Dispatcher
67         txMsgSub        *event.Subscription
68 }
69
70 //NewChainManager create a chain sync manager.
71 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*Manager, error) {
72         manager := &Manager{
73                 sw:              sw,
74                 mempool:         mempool,
75                 chain:           chain,
76                 blockKeeper:     newBlockKeeper(chain, peers),
77                 peers:           peers,
78                 txSyncCh:        make(chan *txSyncMsg),
79                 quit:            make(chan struct{}),
80                 config:          config,
81                 eventDispatcher: dispatcher,
82         }
83
84         if !config.VaultMode {
85                 protocolReactor := NewProtocolReactor(manager)
86                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
87         }
88         return manager, nil
89 }
90
91 func (m *Manager) AddPeer(peer peers.BasePeer) {
92         m.peers.AddPeer(peer)
93 }
94
95 //IsCaughtUp check wheather the peer finish the sync
96 func (m *Manager) IsCaughtUp() bool {
97         peer := m.peers.BestPeer(consensus.SFFullNode)
98         return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
99 }
100
101 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
102         block, err := msg.GetBlock()
103         if err != nil {
104                 return
105         }
106         m.blockKeeper.processBlock(peer.ID(), block)
107 }
108
109 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
110         blocks, err := msg.GetBlocks()
111         if err != nil {
112                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
113                 return
114         }
115
116         m.blockKeeper.processBlocks(peer.ID(), blocks)
117 }
118
119 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
120         peer.AddFilterAddress(msg.Address)
121 }
122
123 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
124         peer.FilterClear()
125 }
126
127 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
128         peer.AddFilterAddresses(msg.Addresses)
129 }
130
131 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
132         var block *types.Block
133         var err error
134         if msg.Height != 0 {
135                 block, err = m.chain.GetBlockByHeight(msg.Height)
136         } else {
137                 block, err = m.chain.GetBlockByHash(msg.GetHash())
138         }
139         if err != nil {
140                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
141                 return
142         }
143
144         ok, err := peer.SendBlock(block)
145         if !ok {
146                 m.peers.RemovePeer(peer.ID())
147         }
148         if err != nil {
149                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
150         }
151 }
152
153 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
154         blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
155         if err != nil || len(blocks) == 0 {
156                 return
157         }
158
159         totalSize := 0
160         sendBlocks := []*types.Block{}
161         for _, block := range blocks {
162                 rawData, err := block.MarshalText()
163                 if err != nil {
164                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
165                         return
166                 }
167
168                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
169                         break
170                 }
171                 totalSize += len(rawData)
172                 sendBlocks = append(sendBlocks, block)
173         }
174
175         ok, err := peer.SendBlocks(sendBlocks)
176         if !ok {
177                 m.peers.RemovePeer(peer.ID())
178         }
179         if err != nil {
180                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
181         }
182 }
183
184 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
185         headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxHeadersPerMsg)
186         if err != nil || len(headers) == 0 {
187                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
188                 return
189         }
190
191         ok, err := peer.SendHeaders(headers)
192         if !ok {
193                 m.peers.RemovePeer(peer.ID())
194         }
195         if err != nil {
196                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
197         }
198 }
199
200 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
201         var err error
202         var block *types.Block
203         if msg.Height != 0 {
204                 block, err = m.chain.GetBlockByHeight(msg.Height)
205         } else {
206                 block, err = m.chain.GetBlockByHash(msg.GetHash())
207         }
208         if err != nil {
209                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
210                 return
211         }
212
213         blockHash := block.Hash()
214         txStatus, err := m.chain.GetTransactionStatus(&blockHash)
215         if err != nil {
216                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
217                 return
218         }
219
220         ok, err := peer.SendMerkleBlock(block, txStatus)
221         if err != nil {
222                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
223                 return
224         }
225
226         if !ok {
227                 m.peers.RemovePeer(peer.ID())
228         }
229 }
230
231 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
232         headers, err := msg.GetHeaders()
233         if err != nil {
234                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
235                 return
236         }
237
238         m.blockKeeper.processHeaders(peer.ID(), headers)
239 }
240
241 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
242         if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
243                 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
244                 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
245         }
246 }
247
248 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
249         tx, err := msg.GetTransaction()
250         if err != nil {
251                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
252                 return
253         }
254
255         if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
256                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
257         }
258         m.peers.MarkTx(peer.ID(), tx.ID)
259 }
260
261 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
262         txs, err := msg.GetTransactions()
263         if err != nil {
264                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
265                 return
266         }
267
268         if len(txs) > msgs.TxsMsgMaxTxNum {
269                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
270                 return
271         }
272
273         for _, tx := range txs {
274                 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
275                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
276                         return
277                 }
278                 m.peers.MarkTx(peer.ID(), tx.ID)
279         }
280 }
281
282 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
283         peer := m.peers.GetPeer(basePeer.ID())
284         if peer == nil {
285                 return
286         }
287
288         log.WithFields(log.Fields{
289                 "module":  logModule,
290                 "peer":    basePeer.Addr(),
291                 "type":    reflect.TypeOf(msg),
292                 "message": msg.String(),
293         }).Debug("receive message from peer")
294
295         switch msg := msg.(type) {
296         case *msgs.GetBlockMessage:
297                 m.handleGetBlockMsg(peer, msg)
298
299         case *msgs.BlockMessage:
300                 m.handleBlockMsg(peer, msg)
301
302         case *msgs.StatusMessage:
303                 m.handleStatusMsg(basePeer, msg)
304
305         case *msgs.TransactionMessage:
306                 m.handleTransactionMsg(peer, msg)
307
308         case *msgs.TransactionsMessage:
309                 m.handleTransactionsMsg(peer, msg)
310
311         case *msgs.GetHeadersMessage:
312                 m.handleGetHeadersMsg(peer, msg)
313
314         case *msgs.HeadersMessage:
315                 m.handleHeadersMsg(peer, msg)
316
317         case *msgs.GetBlocksMessage:
318                 m.handleGetBlocksMsg(peer, msg)
319
320         case *msgs.BlocksMessage:
321                 m.handleBlocksMsg(peer, msg)
322
323         case *msgs.FilterLoadMessage:
324                 m.handleFilterLoadMsg(peer, msg)
325
326         case *msgs.FilterAddMessage:
327                 m.handleFilterAddMsg(peer, msg)
328
329         case *msgs.FilterClearMessage:
330                 m.handleFilterClearMsg(peer)
331
332         case *msgs.GetMerkleBlockMessage:
333                 m.handleGetMerkleBlockMsg(peer, msg)
334
335         default:
336                 log.WithFields(log.Fields{
337                         "module":       logModule,
338                         "peer":         basePeer.Addr(),
339                         "message_type": reflect.TypeOf(msg),
340                 }).Error("unhandled message type")
341         }
342 }
343
344 func (m *Manager) RemovePeer(peerID string) {
345         m.peers.RemovePeer(peerID)
346 }
347
348 func (m *Manager) SendStatus(peer peers.BasePeer) error {
349         p := m.peers.GetPeer(peer.ID())
350         if p == nil {
351                 return errors.New("invalid peer")
352         }
353
354         if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
355                 m.peers.RemovePeer(p.ID())
356                 return err
357         }
358         return nil
359 }
360
361 func (m *Manager) Start() error {
362         var err error
363         m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
364         if err != nil {
365                 return err
366         }
367         m.blockKeeper.start()
368         go m.broadcastTxsLoop()
369         go m.syncMempoolLoop()
370
371         return nil
372 }
373
374 //Stop stop sync manager
375 func (m *Manager) Stop() {
376         m.blockKeeper.stop()
377         close(m.quit)
378 }