OSDN Git Service

Merge pull request #16 from Bytom/dev
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/vapor/config"
17         "github.com/vapor/consensus"
18         "github.com/vapor/p2p"
19         "github.com/vapor/p2p/discover"
20         core "github.com/vapor/protocol"
21         "github.com/vapor/protocol/bc"
22         "github.com/vapor/protocol/bc/types"
23         "github.com/vapor/version"
24 )
25
26 const (
27         logModule             = "netsync"
28         maxTxChanSize         = 10000
29         maxFilterAddressSize  = 50
30         maxFilterAddressCount = 1000
31 )
32
33 // Chain is the interface for Bytom core
34 type Chain interface {
35         BestBlockHeader() *types.BlockHeader
36         BestBlockHeight() uint64
37         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
38         GetBlockByHash(*bc.Hash) (*types.Block, error)
39         GetBlockByHeight(uint64) (*types.Block, error)
40         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
41         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
42         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
43         InMainChain(bc.Hash) bool
44         ProcessBlock(*types.Block) (bool, error)
45         ValidateTx(*types.Tx) (bool, error)
46 }
47
48 //SyncManager Sync Manager is responsible for the business layer information synchronization
49 type SyncManager struct {
50         sw          *p2p.Switch
51         genesisHash bc.Hash
52
53         privKey      crypto.PrivKeyEd25519 // local node's p2p key
54         chain        Chain
55         txPool       *core.TxPool
56         blockFetcher *blockFetcher
57         blockKeeper  *blockKeeper
58         peers        *peerSet
59
60         newTxCh    chan *types.Tx
61         newBlockCh chan *bc.Hash
62         txSyncCh   chan *txSyncMsg
63         quitSync   chan struct{}
64         config     *cfg.Config
65 }
66
67 //NewSyncManager create a sync manager
68 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
69         genesisHeader, err := chain.GetHeaderByHeight(0)
70         if err != nil {
71                 return nil, err
72         }
73
74         sw := p2p.NewSwitch(config)
75         peers := newPeerSet(sw)
76         manager := &SyncManager{
77                 sw:           sw,
78                 genesisHash:  genesisHeader.Hash(),
79                 txPool:       txPool,
80                 chain:        chain,
81                 privKey:      crypto.GenPrivKeyEd25519(),
82                 blockFetcher: newBlockFetcher(chain, peers),
83                 blockKeeper:  newBlockKeeper(chain, peers),
84                 peers:        peers,
85                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
86                 newBlockCh:   newBlockCh,
87                 txSyncCh:     make(chan *txSyncMsg),
88                 quitSync:     make(chan struct{}),
89                 config:       config,
90         }
91
92         protocolReactor := NewProtocolReactor(manager, manager.peers)
93         manager.sw.AddReactor("PROTOCOL", protocolReactor)
94
95         // Create & add listener
96         var listenerStatus bool
97         var l p2p.Listener
98         if !config.VaultMode {
99                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
100                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
101                 manager.sw.AddListener(l)
102
103                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
104                 if err != nil {
105                         return nil, err
106                 }
107                 manager.sw.SetDiscv(discv)
108         }
109         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
110         manager.sw.SetNodePrivKey(manager.privKey)
111         return manager, nil
112 }
113
114 //BestPeer return the highest p2p peerInfo
115 func (sm *SyncManager) BestPeer() *PeerInfo {
116         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
117         if bestPeer != nil {
118                 return bestPeer.getPeerInfo()
119         }
120         return nil
121 }
122
123 // GetNewTxCh return a unconfirmed transaction feed channel
124 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
125         return sm.newTxCh
126 }
127
128 //GetPeerInfos return peer info of all peers
129 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
130         return sm.peers.getPeerInfos()
131 }
132
133 //IsCaughtUp check wheather the peer finish the sync
134 func (sm *SyncManager) IsCaughtUp() bool {
135         peer := sm.peers.bestPeer(consensus.SFFullNode)
136         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
137 }
138
139 //NodeInfo get P2P peer node info
140 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
141         return sm.sw.NodeInfo()
142 }
143
144 //StopPeer try to stop peer by given ID
145 func (sm *SyncManager) StopPeer(peerID string) error {
146         if peer := sm.peers.getPeer(peerID); peer == nil {
147                 return errors.New("peerId not exist")
148         }
149         sm.peers.removePeer(peerID)
150         return nil
151 }
152
153 //Switch get sync manager switch
154 func (sm *SyncManager) Switch() *p2p.Switch {
155         return sm.sw
156 }
157
158 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
159         block, err := msg.GetBlock()
160         if err != nil {
161                 return
162         }
163         sm.blockKeeper.processBlock(peer.ID(), block)
164 }
165
166 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
167         blocks, err := msg.GetBlocks()
168         if err != nil {
169                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
170                 return
171         }
172
173         sm.blockKeeper.processBlocks(peer.ID(), blocks)
174 }
175
176 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
177         peer.addFilterAddress(msg.Address)
178 }
179
180 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
181         peer.filterAdds.Clear()
182 }
183
184 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
185         peer.addFilterAddresses(msg.Addresses)
186 }
187
188 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
189         var block *types.Block
190         var err error
191         if msg.Height != 0 {
192                 block, err = sm.chain.GetBlockByHeight(msg.Height)
193         } else {
194                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
195         }
196         if err != nil {
197                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
198                 return
199         }
200
201         ok, err := peer.sendBlock(block)
202         if !ok {
203                 sm.peers.removePeer(peer.ID())
204         }
205         if err != nil {
206                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
207         }
208 }
209
210 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
211         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
212         if err != nil || len(blocks) == 0 {
213                 return
214         }
215
216         totalSize := 0
217         sendBlocks := []*types.Block{}
218         for _, block := range blocks {
219                 rawData, err := block.MarshalText()
220                 if err != nil {
221                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
222                         continue
223                 }
224
225                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
226                         break
227                 }
228                 totalSize += len(rawData)
229                 sendBlocks = append(sendBlocks, block)
230         }
231
232         ok, err := peer.sendBlocks(sendBlocks)
233         if !ok {
234                 sm.peers.removePeer(peer.ID())
235         }
236         if err != nil {
237                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
238         }
239 }
240
241 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
242         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
243         if err != nil || len(headers) == 0 {
244                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
245                 return
246         }
247
248         ok, err := peer.sendHeaders(headers)
249         if !ok {
250                 sm.peers.removePeer(peer.ID())
251         }
252         if err != nil {
253                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
254         }
255 }
256
257 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
258         var err error
259         var block *types.Block
260         if msg.Height != 0 {
261                 block, err = sm.chain.GetBlockByHeight(msg.Height)
262         } else {
263                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
264         }
265         if err != nil {
266                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
267                 return
268         }
269
270         blockHash := block.Hash()
271         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
272         if err != nil {
273                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
274                 return
275         }
276
277         ok, err := peer.sendMerkleBlock(block, txStatus)
278         if err != nil {
279                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
280                 return
281         }
282
283         if !ok {
284                 sm.peers.removePeer(peer.ID())
285         }
286 }
287
288 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
289         headers, err := msg.GetHeaders()
290         if err != nil {
291                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
292                 return
293         }
294
295         sm.blockKeeper.processHeaders(peer.ID(), headers)
296 }
297
298 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
299         block, err := msg.GetMineBlock()
300         if err != nil {
301                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
302                 return
303         }
304
305         hash := block.Hash()
306         peer.markBlock(&hash)
307         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
308         peer.setStatus(block.Height, &hash)
309 }
310
311 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
312         bestHeader := sm.chain.BestBlockHeader()
313         genesisBlock, err := sm.chain.GetBlockByHeight(0)
314         if err != nil {
315                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
316         }
317
318         genesisHash := genesisBlock.Hash()
319         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
320         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
321                 sm.peers.removePeer(peer.ID())
322         }
323 }
324
325 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
326         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
327                 peer.setStatus(msg.Height, msg.GetHash())
328                 return
329         }
330
331         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
332                 log.WithFields(log.Fields{
333                         "module":         logModule,
334                         "remote genesis": genesisHash.String(),
335                         "local genesis":  sm.genesisHash.String(),
336                 }).Warn("fail hand shake due to differnt genesis")
337                 return
338         }
339
340         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
341 }
342
343 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
344         tx, err := msg.GetTransaction()
345         if err != nil {
346                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
347                 return
348         }
349
350         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
351                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
352         }
353 }
354
355 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
356         peer := sm.peers.getPeer(basePeer.ID())
357         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
358                 return
359         }
360
361         log.WithFields(log.Fields{
362                 "module":  logModule,
363                 "peer":    basePeer.Addr(),
364                 "type":    reflect.TypeOf(msg),
365                 "message": msg.String(),
366         }).Info("receive message from peer")
367
368         switch msg := msg.(type) {
369         case *GetBlockMessage:
370                 sm.handleGetBlockMsg(peer, msg)
371
372         case *BlockMessage:
373                 sm.handleBlockMsg(peer, msg)
374
375         case *StatusRequestMessage:
376                 sm.handleStatusRequestMsg(basePeer)
377
378         case *StatusResponseMessage:
379                 sm.handleStatusResponseMsg(basePeer, msg)
380
381         case *TransactionMessage:
382                 sm.handleTransactionMsg(peer, msg)
383
384         case *MineBlockMessage:
385                 sm.handleMineBlockMsg(peer, msg)
386
387         case *GetHeadersMessage:
388                 sm.handleGetHeadersMsg(peer, msg)
389
390         case *HeadersMessage:
391                 sm.handleHeadersMsg(peer, msg)
392
393         case *GetBlocksMessage:
394                 sm.handleGetBlocksMsg(peer, msg)
395
396         case *BlocksMessage:
397                 sm.handleBlocksMsg(peer, msg)
398
399         case *FilterLoadMessage:
400                 sm.handleFilterLoadMsg(peer, msg)
401
402         case *FilterAddMessage:
403                 sm.handleFilterAddMsg(peer, msg)
404
405         case *FilterClearMessage:
406                 sm.handleFilterClearMsg(peer)
407
408         case *GetMerkleBlockMessage:
409                 sm.handleGetMerkleBlockMsg(peer, msg)
410
411         default:
412                 log.WithFields(log.Fields{
413                         "module":       logModule,
414                         "peer":         basePeer.Addr(),
415                         "message_type": reflect.TypeOf(msg),
416                 }).Error("unhandled message type")
417         }
418 }
419
420 // Defaults to tcp
421 func protocolAndAddress(listenAddr string) (string, string) {
422         p, address := "tcp", listenAddr
423         parts := strings.SplitN(address, "://", 2)
424         if len(parts) == 2 {
425                 p, address = parts[0], parts[1]
426         }
427         return p, address
428 }
429
430 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
431         nodeInfo := &p2p.NodeInfo{
432                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
433                 Moniker: sm.config.Moniker,
434                 Network: sm.config.ChainID,
435                 Version: version.Version,
436                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
437         }
438
439         if !sm.sw.IsListening() {
440                 return nodeInfo
441         }
442
443         p2pListener := sm.sw.Listeners()[0]
444
445         // We assume that the rpcListener has the same ExternalAddress.
446         // This is probably true because both P2P and RPC listeners use UPnP,
447         // except of course if the rpc is only bound to localhost
448         if listenerStatus {
449                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
450         } else {
451                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
452         }
453         return nodeInfo
454 }
455
456 //Start start sync manager service
457 func (sm *SyncManager) Start() {
458         if _, err := sm.sw.Start(); err != nil {
459                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
460         }
461         // broadcast transactions
462         go sm.txBroadcastLoop()
463         go sm.minedBroadcastLoop()
464         go sm.txSyncLoop()
465 }
466
467 //Stop stop sync manager
468 func (sm *SyncManager) Stop() {
469         close(sm.quitSync)
470         sm.sw.Stop()
471 }
472
473 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
474         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
475         if err != nil {
476                 return nil, err
477         }
478
479         conn, err := net.ListenUDP("udp", addr)
480         if err != nil {
481                 return nil, err
482         }
483
484         realaddr := conn.LocalAddr().(*net.UDPAddr)
485         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
486         if err != nil {
487                 return nil, err
488         }
489
490         // add the seeds node to the discover table
491         if config.P2P.Seeds == "" {
492                 return ntab, nil
493         }
494         nodes := []*discover.Node{}
495         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
496                 version.Status.AddSeed(seed)
497                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
498                 nodes = append(nodes, discover.MustParseNode(url))
499         }
500         if err = ntab.SetFallbackNodes(nodes); err != nil {
501                 return nil, err
502         }
503         return ntab, nil
504 }
505
506 func (sm *SyncManager) minedBroadcastLoop() {
507         for {
508                 select {
509                 case blockHash := <-sm.newBlockCh:
510                         block, err := sm.chain.GetBlockByHash(blockHash)
511                         if err != nil {
512                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on mined broadcast loop get block")
513                                 return
514                         }
515                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
516                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
517                                 return
518                         }
519                 case <-sm.quitSync:
520                         return
521                 }
522         }
523 }