OSDN Git Service

mark tx before validation so it won't be sent to source again (#278)
[bytom/vapor.git] / netsync / chainmgr / handle.go
1 package chainmgr
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         dbm "github.com/vapor/database/leveldb"
12         "github.com/vapor/event"
13         msgs "github.com/vapor/netsync/messages"
14         "github.com/vapor/netsync/peers"
15         "github.com/vapor/p2p"
16         "github.com/vapor/p2p/security"
17         core "github.com/vapor/protocol"
18         "github.com/vapor/protocol/bc"
19         "github.com/vapor/protocol/bc/types"
20 )
21
22 const (
23         logModule = "netsync"
24 )
25
26 // Chain is the interface for Bytom core
27 type Chain interface {
28         BestBlockHeader() *types.BlockHeader
29         LastIrreversibleHeader() *types.BlockHeader
30         BestBlockHeight() uint64
31         GetBlockByHash(*bc.Hash) (*types.Block, error)
32         GetBlockByHeight(uint64) (*types.Block, error)
33         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
34         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
35         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
36         InMainChain(bc.Hash) bool
37         ProcessBlock(*types.Block) (bool, error)
38         ValidateTx(*types.Tx) (bool, error)
39 }
40
41 type Switch interface {
42         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
43         Start() (bool, error)
44         Stop() bool
45         IsListening() bool
46         DialPeerWithAddress(addr *p2p.NetAddress) error
47         Peers() *p2p.PeerSet
48 }
49
50 // Mempool is the interface for Bytom mempool
51 type Mempool interface {
52         GetTransactions() []*core.TxDesc
53 }
54
55 //Manager is responsible for the business layer information synchronization
56 type Manager struct {
57         sw          Switch
58         chain       Chain
59         mempool     Mempool
60         blockKeeper *blockKeeper
61         peers       *peers.PeerSet
62
63         txSyncCh chan *txSyncMsg
64         quit     chan struct{}
65         config   *cfg.Config
66
67         eventDispatcher *event.Dispatcher
68         txMsgSub        *event.Subscription
69 }
70
71 //NewChainManager create a chain sync manager.
72 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
73         manager := &Manager{
74                 sw:              sw,
75                 mempool:         mempool,
76                 chain:           chain,
77                 blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
78                 peers:           peers,
79                 txSyncCh:        make(chan *txSyncMsg),
80                 quit:            make(chan struct{}),
81                 config:          config,
82                 eventDispatcher: dispatcher,
83         }
84
85         if !config.VaultMode {
86                 protocolReactor := NewProtocolReactor(manager)
87                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
88         }
89         return manager, nil
90 }
91
92 func (m *Manager) AddPeer(peer peers.BasePeer) {
93         m.peers.AddPeer(peer)
94 }
95
96 //IsCaughtUp check wheather the peer finish the sync
97 func (m *Manager) IsCaughtUp() bool {
98         peer := m.peers.BestPeer(consensus.SFFullNode)
99         return peer == nil || peer.Height() <= m.chain.BestBlockHeight()
100 }
101
102 func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
103         block, err := msg.GetBlock()
104         if err != nil {
105                 return
106         }
107         m.blockKeeper.processBlock(peer.ID(), block)
108 }
109
110 func (m *Manager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
111         blocks, err := msg.GetBlocks()
112         if err != nil {
113                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
114                 return
115         }
116
117         m.blockKeeper.processBlocks(peer.ID(), blocks)
118 }
119
120 func (m *Manager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
121         peer.AddFilterAddress(msg.Address)
122 }
123
124 func (m *Manager) handleFilterClearMsg(peer *peers.Peer) {
125         peer.FilterClear()
126 }
127
128 func (m *Manager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
129         peer.AddFilterAddresses(msg.Addresses)
130 }
131
132 func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
133         var block *types.Block
134         var err error
135         if msg.Height != 0 {
136                 block, err = m.chain.GetBlockByHeight(msg.Height)
137         } else {
138                 block, err = m.chain.GetBlockByHash(msg.GetHash())
139         }
140         if err != nil {
141                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
142                 return
143         }
144
145         ok, err := peer.SendBlock(block)
146         if !ok {
147                 m.peers.RemovePeer(peer.ID())
148         }
149         if err != nil {
150                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
151         }
152 }
153
154 func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
155         blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
156         if err != nil || len(blocks) == 0 {
157                 return
158         }
159
160         totalSize := 0
161         sendBlocks := []*types.Block{}
162         for _, block := range blocks {
163                 rawData, err := block.MarshalText()
164                 if err != nil {
165                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
166                         return
167                 }
168
169                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
170                         break
171                 }
172                 totalSize += len(rawData)
173                 sendBlocks = append(sendBlocks, block)
174         }
175
176         ok, err := peer.SendBlocks(sendBlocks)
177         if !ok {
178                 m.peers.RemovePeer(peer.ID())
179         }
180         if err != nil {
181                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
182         }
183 }
184
185 func (m *Manager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
186         headers, err := m.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash(), msg.GetSkip(), maxNumOfHeadersPerMsg)
187         if err != nil || len(headers) == 0 {
188                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
189                 return
190         }
191
192         ok, err := peer.SendHeaders(headers)
193         if !ok {
194                 m.peers.RemovePeer(peer.ID())
195         }
196         if err != nil {
197                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
198         }
199 }
200
201 func (m *Manager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
202         var err error
203         var block *types.Block
204         if msg.Height != 0 {
205                 block, err = m.chain.GetBlockByHeight(msg.Height)
206         } else {
207                 block, err = m.chain.GetBlockByHash(msg.GetHash())
208         }
209         if err != nil {
210                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
211                 return
212         }
213
214         blockHash := block.Hash()
215         txStatus, err := m.chain.GetTransactionStatus(&blockHash)
216         if err != nil {
217                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
218                 return
219         }
220
221         ok, err := peer.SendMerkleBlock(block, txStatus)
222         if err != nil {
223                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
224                 return
225         }
226
227         if !ok {
228                 m.peers.RemovePeer(peer.ID())
229         }
230 }
231
232 func (m *Manager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
233         headers, err := msg.GetHeaders()
234         if err != nil {
235                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
236                 return
237         }
238
239         m.blockKeeper.processHeaders(peer.ID(), headers)
240 }
241
242 func (m *Manager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
243         if peer := m.peers.GetPeer(basePeer.ID()); peer != nil {
244                 peer.SetBestStatus(msg.BestHeight, msg.GetBestHash())
245                 peer.SetIrreversibleStatus(msg.IrreversibleHeight, msg.GetIrreversibleHash())
246         }
247 }
248
249 func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
250         tx, err := msg.GetTransaction()
251         if err != nil {
252                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get tx from message")
253                 return
254         }
255
256         m.peers.MarkTx(peer.ID(), tx.ID)
257         if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
258                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
259         }
260 }
261
262 func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
263         txs, err := msg.GetTransactions()
264         if err != nil {
265                 m.peers.ProcessIllegal(peer.ID(), security.LevelConnException, "fail on get txs from message")
266                 return
267         }
268
269         if len(txs) > msgs.TxsMsgMaxTxNum {
270                 m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "exceeded the maximum tx number limit")
271                 return
272         }
273
274         for _, tx := range txs {
275                 m.peers.MarkTx(peer.ID(), tx.ID)
276                 if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
277                         m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
278                         return
279                 }
280         }
281 }
282
283 func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
284         peer := m.peers.GetPeer(basePeer.ID())
285         if peer == nil {
286                 return
287         }
288
289         log.WithFields(log.Fields{
290                 "module":  logModule,
291                 "peer":    basePeer.Addr(),
292                 "type":    reflect.TypeOf(msg),
293                 "message": msg.String(),
294         }).Debug("receive message from peer")
295
296         switch msg := msg.(type) {
297         case *msgs.GetBlockMessage:
298                 m.handleGetBlockMsg(peer, msg)
299
300         case *msgs.BlockMessage:
301                 m.handleBlockMsg(peer, msg)
302
303         case *msgs.StatusMessage:
304                 m.handleStatusMsg(basePeer, msg)
305
306         case *msgs.TransactionMessage:
307                 m.handleTransactionMsg(peer, msg)
308
309         case *msgs.TransactionsMessage:
310                 m.handleTransactionsMsg(peer, msg)
311
312         case *msgs.GetHeadersMessage:
313                 m.handleGetHeadersMsg(peer, msg)
314
315         case *msgs.HeadersMessage:
316                 m.handleHeadersMsg(peer, msg)
317
318         case *msgs.GetBlocksMessage:
319                 m.handleGetBlocksMsg(peer, msg)
320
321         case *msgs.BlocksMessage:
322                 m.handleBlocksMsg(peer, msg)
323
324         case *msgs.FilterLoadMessage:
325                 m.handleFilterLoadMsg(peer, msg)
326
327         case *msgs.FilterAddMessage:
328                 m.handleFilterAddMsg(peer, msg)
329
330         case *msgs.FilterClearMessage:
331                 m.handleFilterClearMsg(peer)
332
333         case *msgs.GetMerkleBlockMessage:
334                 m.handleGetMerkleBlockMsg(peer, msg)
335
336         default:
337                 log.WithFields(log.Fields{
338                         "module":       logModule,
339                         "peer":         basePeer.Addr(),
340                         "message_type": reflect.TypeOf(msg),
341                 }).Error("unhandled message type")
342         }
343 }
344
345 func (m *Manager) RemovePeer(peerID string) {
346         m.peers.RemovePeer(peerID)
347 }
348
349 func (m *Manager) SendStatus(peer peers.BasePeer) error {
350         p := m.peers.GetPeer(peer.ID())
351         if p == nil {
352                 return errors.New("invalid peer")
353         }
354
355         if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
356                 m.peers.RemovePeer(p.ID())
357                 return err
358         }
359         return nil
360 }
361
362 func (m *Manager) Start() error {
363         var err error
364         m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
365         if err != nil {
366                 return err
367         }
368         m.blockKeeper.start()
369         go m.broadcastTxsLoop()
370         go m.syncMempoolLoop()
371
372         return nil
373 }
374
375 //Stop stop sync manager
376 func (m *Manager) Stop() {
377         m.blockKeeper.stop()
378         close(m.quit)
379 }