OSDN Git Service

filter out known txs (#280)
[bytom/vapor.git] / netsync / chainmgr / handle.go
1 package chainmgr
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         dbm "github.com/vapor/database/leveldb"
12         "github.com/vapor/event"
13         msgs "github.com/vapor/netsync/messages"
14         "github.com/vapor/netsync/peers"
15         "github.com/vapor/p2p"
16         "github.com/vapor/p2p/security"
17         core "github.com/vapor/protocol"
18         "github.com/vapor/protocol/bc"
19         "github.com/vapor/protocol/bc/types"
20 )
21
22 const (
23         logModule = "netsync"
24 )
25
26 // Chain is the interface for Bytom core
27 type Chain interface {
28         BestBlockHeader() *types.BlockHeader
29         LastIrreversibleHeader() *types.BlockHeader
30         BestBlockHeight() uint64
31         GetBlockByHash(*bc.Hash) (*types.Block, error)
32         GetBlockByHeight(uint64) (*types.Block, error)
33         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
34         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
35         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
36         InMainChain(bc.Hash) bool
37         ProcessBlock(*types.Block) (bool, error)
38         ValidateTx(*types.Tx) (bool, error)
39 }
40
41 type Switch interface {
42         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
43         Start() (bool, error)
44         Stop() bool
45         IsListening() bool
46         DialPeerWithAddress(addr *p2p.NetAddress) error
47         Peers() *p2p.PeerSet
48 }
49
50 // Mempool is the interface for Bytom mempool
51 type Mempool interface {
52         GetTransactions() []*core.TxDesc
53 }
54
55 //Manager is responsible for the business layer information synchronization
56 type Manager struct {
57         sw          Switch
58         chain       Chain
59         mempool     Mempool
60         blockKeeper *blockKeeper
61         peers       *peers.PeerSet
62
63         txSyncCh chan *txSyncMsg
64         quit     chan struct{}
65         config   *cfg.Config
66
67         eventDispatcher *event.Dispatcher
68         txMsgSub        *event.Subscription
69 }
70
71 //NewManager create a chain sync manager.
72 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
73         manager := &Manager{
74                 sw:              sw,
75                 mempool:         mempool,
76                 chain:           chain,
77                 blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
78                 peers:           peers,
79                 txSyncCh:        make(chan *txSyncMsg),
80                 quit:            make(chan struct{}),
81                 config:          config,
82                 eventDispatcher: dispatcher,
83         }
84
85         if !config.VaultMode {
86                 protocolReactor := NewProtocolReactor(manager)
87                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
88         }
89         return manager, nil
90 }
91
92 func (m *Manager) AddPeer(peer peers.BasePeer) {
93         m.peers.AddPeer(peer)
94 }
95
96 //IsCaughtUp check wheather the peer finish the sync
97 func (m *Manager) IsCaughtUp() bool {
98         peer := m.peers.BestPeer(consensus.SFFullNode)
99         return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
100 }
101
102 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
103         block, err := msg.GetBlock()
104         if err != nil {
105                 return
106         }
107
108         m.blockKeeper.processBlock(peer.ID(), block)
109 }
110
111 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
112         blocks, err := msg.GetBlocks()
113         if err != nil {
114                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
115                 return
116         }
117
118         m.blockKeeper.processBlocks(peer.ID(), blocks)
119 }
120
121 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
122         peer.AddFilterAddress(msg.Address)
123 }
124
125 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
126         peer.FilterClear()
127 }
128
129 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
130         peer.AddFilterAddresses(msg.Addresses)
131 }
132
133 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
134         var block *types.Block
135         var err error
136         if msg.Height != 0 {
137                 block, err = m.chain.GetBlockByHeight(msg.Height)
138         } else {
139                 block, err = m.chain.GetBlockByHash(msg.GetHash())
140         }
141         if err != nil {
142                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
143                 return
144         }
145
146         ok, err := peer.SendBlock(block)
147         if !ok {
148                 m.peers.RemovePeer(peer.ID())
149         }
150         if err != nil {
151                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
152         }
153 }
154
155 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
156         blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
157         if err != nil || len(blocks) == 0 {
158                 return
159         }
160
161         totalSize := 0
162         sendBlocks := []*types.Block{}
163         for _, block := range blocks {
164                 rawData, err := block.MarshalText()
165                 if err != nil {
166                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
167                         return
168                 }
169
170                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
171                         break
172                 }
173                 totalSize += len(rawData)
174                 sendBlocks = append(sendBlocks, block)
175         }
176
177         ok, err := peer.SendBlocks(sendBlocks)
178         if !ok {
179                 m.peers.RemovePeer(peer.ID())
180         }
181         if err != nil {
182                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
183         }
184 }
185
186 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
187         headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
188         if err != nil || len(headers) == 0 {
189                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
190                 return
191         }
192
193         ok, err := peer.SendHeaders(headers)
194         if !ok {
195                 m.peers.RemovePeer(peer.ID())
196         }
197         if err != nil {
198                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
199         }
200 }
201
202 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
203         var err error
204         var block *types.Block
205         if msg.Height != 0 {
206                 block, err = m.chain.GetBlockByHeight(msg.Height)
207         } else {
208                 block, err = m.chain.GetBlockByHash(msg.GetHash())
209         }
210         if err != nil {
211                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
212                 return
213         }
214
215         blockHash := block.Hash()
216         txStatus, err := m.chain.GetTransactionStatus(&blockHash)
217         if err != nil {
218                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
219                 return
220         }
221
222         ok, err := peer.SendMerkleBlock(block, txStatus)
223         if err != nil {
224                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
225                 return
226         }
227
228         if !ok {
229                 m.peers.RemovePeer(peer.ID())
230         }
231 }
232
233 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
234         headers, err := msg.GetHeaders()
235         if err != nil {
236                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
237                 return
238         }
239
240         m.blockKeeper.processHeaders(peer.ID(), headers)
241 }
242
243 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
244         if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
245                 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
246                 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
247         }
248 }
249
250 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
251         tx, err := msg.GetTransaction()
252         if err != nil {
253                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
254                 return
255         }
256
257         m.peers.MarkTx(peer.ID(), tx.ID)
258         if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
259                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
260         }
261 }
262
263 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
264         txs, err := msg.GetTransactions()
265         if err != nil {
266                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
267                 return
268         }
269
270         if len(txs) > msgs.TxsMsgMaxTxNum {
271                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
272                 return
273         }
274
275         for _, tx := range txs {
276                 m.peers.MarkTx(peer.ID(), tx.ID)
277                 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
278                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
279                         return
280                 }
281         }
282 }
283
284 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
285         peer := m.peers.GetPeer(basePeer.ID())
286         if peer == nil {
287                 return
288         }
289
290         log.WithFields(log.Fields{
291                 "module":  logModule,
292                 "peer":    basePeer.Addr(),
293                 "type":    reflect.TypeOf(msg),
294                 "message": msg.String(),
295         }).Debug("receive message from peer")
296
297         switch msg := msg.(type) {
298         case *msgs.GetBlockMessage:
299                 m.handleGetBlockMsg(peer, msg)
300
301         case *msgs.BlockMessage:
302                 m.handleBlockMsg(peer, msg)
303
304         case *msgs.StatusMessage:
305                 m.handleStatusMsg(basePeer, msg)
306
307         case *msgs.TransactionMessage:
308                 m.handleTransactionMsg(peer, msg)
309
310         case *msgs.TransactionsMessage:
311                 m.handleTransactionsMsg(peer, msg)
312
313         case *msgs.GetHeadersMessage:
314                 m.handleGetHeadersMsg(peer, msg)
315
316         case *msgs.HeadersMessage:
317                 m.handleHeadersMsg(peer, msg)
318
319         case *msgs.GetBlocksMessage:
320                 m.handleGetBlocksMsg(peer, msg)
321
322         case *msgs.BlocksMessage:
323                 m.handleBlocksMsg(peer, msg)
324
325         case *msgs.FilterLoadMessage:
326                 m.handleFilterLoadMsg(peer, msg)
327
328         case *msgs.FilterAddMessage:
329                 m.handleFilterAddMsg(peer, msg)
330
331         case *msgs.FilterClearMessage:
332                 m.handleFilterClearMsg(peer)
333
334         case *msgs.GetMerkleBlockMessage:
335                 m.handleGetMerkleBlockMsg(peer, msg)
336
337         default:
338                 log.WithFields(log.Fields{
339                         "module":       logModule,
340                         "peer":         basePeer.Addr(),
341                         "message_type": reflect.TypeOf(msg),
342                 }).Error("unhandled message type")
343         }
344 }
345
346 func (m *Manager) RemovePeer(peerID string) {
347         m.peers.RemovePeer(peerID)
348 }
349
350 func (m *Manager) SendStatus(peer peers.BasePeer) error {
351         p := m.peers.GetPeer(peer.ID())
352         if p == nil {
353                 return errors.New("invalid peer")
354         }
355
356         if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
357                 m.peers.RemovePeer(p.ID())
358                 return err
359         }
360         return nil
361 }
362
363 func (m *Manager) Start() error {
364         var err error
365         m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
366         if err != nil {
367                 return err
368         }
369         m.blockKeeper.start()
370         go m.broadcastTxsLoop()
371         go m.syncMempoolLoop()
372
373         return nil
374 }
375
376 //Stop stop sync manager
377 func (m *Manager) Stop() {
378         m.blockKeeper.stop()
379         close(m.quit)
380 }