OSDN Git Service

d1bb41ee17a8b7f3dc1cf2fd629b64878318ebdb
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "errors"
5         "reflect"
6
7         log "github.com/sirupsen/logrus"
8
9         cfg "github.com/vapor/config"
10         "github.com/vapor/consensus"
11         "github.com/vapor/event"
12         msgs "github.com/vapor/netsync/messages"
13         "github.com/vapor/netsync/peers"
14         "github.com/vapor/p2p"
15         core "github.com/vapor/protocol"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         logModule = "netsync"
22 )
23
24 var (
25         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
26 )
27
28 // Chain is the interface for Bytom core
29 type Chain interface {
30         BestBlockHeader() *types.BlockHeader
31         BestBlockHeight() uint64
32         GetBlockByHash(*bc.Hash) (*types.Block, error)
33         GetBlockByHeight(uint64) (*types.Block, error)
34         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
35         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
36         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
37         InMainChain(bc.Hash) bool
38         ProcessBlock(*types.Block) (bool, error)
39         ValidateTx(*types.Tx) (bool, error)
40 }
41
42 type Switch interface {
43         AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
44         AddBannedPeer(string) error
45         Start() (bool, error)
46         Stop() bool
47         IsListening() bool
48         DialPeerWithAddress(addr *p2p.NetAddress) error
49         Peers() *p2p.PeerSet
50 }
51
52 //SyncManager Sync Manager is responsible for the business layer information synchronization
53 type ChainManager struct {
54         sw          Switch
55         chain       Chain
56         txPool      *core.TxPool
57         blockKeeper *blockKeeper
58         peers       *peers.PeerSet
59
60         txSyncCh chan *txSyncMsg
61         quitSync chan struct{}
62         config   *cfg.Config
63
64         eventDispatcher *event.Dispatcher
65         txMsgSub        *event.Subscription
66 }
67
68 //NewSyncManager create a sync manager
69 func NewChainManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher, peers *peers.PeerSet) (*ChainManager, error) {
70         manager := &ChainManager{
71                 sw:              sw,
72                 txPool:          txPool,
73                 chain:           chain,
74                 blockKeeper:     newBlockKeeper(chain, peers),
75                 peers:           peers,
76                 txSyncCh:        make(chan *txSyncMsg),
77                 quitSync:        make(chan struct{}),
78                 config:          config,
79                 eventDispatcher: dispatcher,
80         }
81
82         if !config.VaultMode {
83                 protocolReactor := NewProtocolReactor(manager)
84                 manager.sw.AddReactor("PROTOCOL", protocolReactor)
85         }
86         return manager, nil
87 }
88
89 func (cm *ChainManager) AddPeer(peer peers.BasePeer) {
90         cm.peers.AddPeer(peer)
91 }
92
93 //IsCaughtUp check wheather the peer finish the sync
94 func (cm *ChainManager) IsCaughtUp() bool {
95         peer := cm.peers.BestPeer(consensus.SFFullNode)
96         return peer == nil || peer.Height() <= cm.chain.BestBlockHeight()
97 }
98
99 func (cm *ChainManager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
100         block, err := msg.GetBlock()
101         if err != nil {
102                 return
103         }
104         cm.blockKeeper.processBlock(peer.ID(), block)
105 }
106
107 func (cm *ChainManager) handleBlocksMsg(peer *peers.Peer, msg *msgs.BlocksMessage) {
108         blocks, err := msg.GetBlocks()
109         if err != nil {
110                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
111                 return
112         }
113
114         cm.blockKeeper.processBlocks(peer.ID(), blocks)
115 }
116
117 func (cm *ChainManager) handleFilterAddMsg(peer *peers.Peer, msg *msgs.FilterAddMessage) {
118         peer.AddFilterAddress(msg.Address)
119 }
120
121 func (cm *ChainManager) handleFilterClearMsg(peer *peers.Peer) {
122         peer.FilterClear()
123 }
124
125 func (cm *ChainManager) handleFilterLoadMsg(peer *peers.Peer, msg *msgs.FilterLoadMessage) {
126         peer.AddFilterAddresses(msg.Addresses)
127 }
128
129 func (cm *ChainManager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage) {
130         var block *types.Block
131         var err error
132         if msg.Height != 0 {
133                 block, err = cm.chain.GetBlockByHeight(msg.Height)
134         } else {
135                 block, err = cm.chain.GetBlockByHash(msg.GetHash())
136         }
137         if err != nil {
138                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
139                 return
140         }
141
142         ok, err := peer.SendBlock(block)
143         if !ok {
144                 cm.peers.RemovePeer(peer.ID())
145         }
146         if err != nil {
147                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
148         }
149 }
150
151 func (cm *ChainManager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
152         blocks, err := cm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
153         if err != nil || len(blocks) == 0 {
154                 return
155         }
156
157         totalSize := 0
158         sendBlocks := []*types.Block{}
159         for _, block := range blocks {
160                 rawData, err := block.MarshalText()
161                 if err != nil {
162                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
163                         continue
164                 }
165
166                 if totalSize+len(rawData) > msgs.MaxBlockchainResponseSize/2 {
167                         break
168                 }
169                 totalSize += len(rawData)
170                 sendBlocks = append(sendBlocks, block)
171         }
172
173         ok, err := peer.SendBlocks(sendBlocks)
174         if !ok {
175                 cm.peers.RemovePeer(peer.ID())
176         }
177         if err != nil {
178                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
179         }
180 }
181
182 func (cm *ChainManager) handleGetHeadersMsg(peer *peers.Peer, msg *msgs.GetHeadersMessage) {
183         headers, err := cm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
184         if err != nil || len(headers) == 0 {
185                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
186                 return
187         }
188
189         ok, err := peer.SendHeaders(headers)
190         if !ok {
191                 cm.peers.RemovePeer(peer.ID())
192         }
193         if err != nil {
194                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
195         }
196 }
197
198 func (cm *ChainManager) handleGetMerkleBlockMsg(peer *peers.Peer, msg *msgs.GetMerkleBlockMessage) {
199         var err error
200         var block *types.Block
201         if msg.Height != 0 {
202                 block, err = cm.chain.GetBlockByHeight(msg.Height)
203         } else {
204                 block, err = cm.chain.GetBlockByHash(msg.GetHash())
205         }
206         if err != nil {
207                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
208                 return
209         }
210
211         blockHash := block.Hash()
212         txStatus, err := cm.chain.GetTransactionStatus(&blockHash)
213         if err != nil {
214                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
215                 return
216         }
217
218         ok, err := peer.SendMerkleBlock(block, txStatus)
219         if err != nil {
220                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
221                 return
222         }
223
224         if !ok {
225                 cm.peers.RemovePeer(peer.ID())
226         }
227 }
228
229 func (cm *ChainManager) handleHeadersMsg(peer *peers.Peer, msg *msgs.HeadersMessage) {
230         headers, err := msg.GetHeaders()
231         if err != nil {
232                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
233                 return
234         }
235
236         cm.blockKeeper.processHeaders(peer.ID(), headers)
237 }
238
239 func (cm *ChainManager) handleStatusMsg(basePeer peers.BasePeer, msg *msgs.StatusMessage) {
240         if peer := cm.peers.GetPeer(basePeer.ID()); peer != nil {
241                 peer.SetStatus(msg.Height, msg.GetHash())
242                 return
243         }
244 }
245
246 func (cm *ChainManager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMessage) {
247         tx, err := msg.GetTransaction()
248         if err != nil {
249                 cm.peers.AddBanScore(peer.ID(), 0, 10, "fail on get tx from message")
250                 return
251         }
252
253         if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
254                 cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
255         }
256         cm.peers.MarkTx(peer.ID(), tx.ID)
257 }
258
259 func (cm *ChainManager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.TransactionsMessage) {
260         txs, err := msg.GetTransactions()
261         if err != nil {
262                 cm.peers.AddBanScore(peer.ID(), 0, 20, "fail on get txs from message")
263                 return
264         }
265
266         if len(txs) > msgs.TxsMsgMaxTxNum {
267                 cm.peers.AddBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
268                 return
269         }
270
271         for _, tx := range txs {
272                 if isOrphan, err := cm.chain.ValidateTx(tx); err != nil && !isOrphan {
273                         cm.peers.AddBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
274                         return
275                 }
276                 cm.peers.MarkTx(peer.ID(), tx.ID)
277         }
278 }
279
280 func (cm *ChainManager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.BlockchainMessage) {
281         peer := cm.peers.GetPeer(basePeer.ID())
282         if peer == nil {
283                 return
284         }
285
286         log.WithFields(log.Fields{
287                 "module":  logModule,
288                 "peer":    basePeer.Addr(),
289                 "type":    reflect.TypeOf(msg),
290                 "message": msg.String(),
291         }).Info("receive message from peer")
292
293         switch msg := msg.(type) {
294         case *msgs.GetBlockMessage:
295                 cm.handleGetBlockMsg(peer, msg)
296
297         case *msgs.BlockMessage:
298                 cm.handleBlockMsg(peer, msg)
299
300         case *msgs.StatusMessage:
301                 cm.handleStatusMsg(basePeer, msg)
302
303         case *msgs.TransactionMessage:
304                 cm.handleTransactionMsg(peer, msg)
305
306         case *msgs.TransactionsMessage:
307                 cm.handleTransactionsMsg(peer, msg)
308
309         case *msgs.GetHeadersMessage:
310                 cm.handleGetHeadersMsg(peer, msg)
311
312         case *msgs.HeadersMessage:
313                 cm.handleHeadersMsg(peer, msg)
314
315         case *msgs.GetBlocksMessage:
316                 cm.handleGetBlocksMsg(peer, msg)
317
318         case *msgs.BlocksMessage:
319                 cm.handleBlocksMsg(peer, msg)
320
321         case *msgs.FilterLoadMessage:
322                 cm.handleFilterLoadMsg(peer, msg)
323
324         case *msgs.FilterAddMessage:
325                 cm.handleFilterAddMsg(peer, msg)
326
327         case *msgs.FilterClearMessage:
328                 cm.handleFilterClearMsg(peer)
329
330         case *msgs.GetMerkleBlockMessage:
331                 cm.handleGetMerkleBlockMsg(peer, msg)
332
333         default:
334                 log.WithFields(log.Fields{
335                         "module":       logModule,
336                         "peer":         basePeer.Addr(),
337                         "message_type": reflect.TypeOf(msg),
338                 }).Error("unhandled message type")
339         }
340 }
341
342 func (cm *ChainManager) RemovePeer(peerID string) {
343         cm.peers.RemovePeer(peerID)
344 }
345
346 func (cm *ChainManager) SendStatus(peer peers.BasePeer) error {
347         p := cm.peers.GetPeer(peer.ID())
348         if p == nil {
349                 return errors.New("invalid peer")
350         }
351
352         if err := p.SendStatus(cm.chain.BestBlockHeader()); err != nil {
353                 cm.peers.RemovePeer(p.ID())
354                 return err
355         }
356         return nil
357 }
358
359 func (cm *ChainManager) Start() error {
360         var err error
361         cm.txMsgSub, err = cm.eventDispatcher.Subscribe(core.TxMsgEvent{})
362         if err != nil {
363                 return err
364         }
365
366         // broadcast transactions
367         go cm.txBroadcastLoop()
368         go cm.txSyncLoop()
369
370         return nil
371 }
372
373 //Stop stop sync manager
374 func (cm *ChainManager) Stop() {
375         close(cm.quitSync)
376 }