OSDN Git Service

Merge pull request #1386 from Bytom/dev
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/p2p"
19         "github.com/bytom/p2p/discover"
20         core "github.com/bytom/protocol"
21         "github.com/bytom/protocol/bc"
22         "github.com/bytom/protocol/bc/types"
23         "github.com/bytom/version"
24 )
25
26 const (
27         maxTxChanSize         = 10000
28         maxFilterAddressSize  = 50
29         maxFilterAddressCount = 1000
30 )
31
32 // Chain is the interface for Bytom core
33 type Chain interface {
34         BestBlockHeader() *types.BlockHeader
35         BestBlockHeight() uint64
36         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
37         GetBlockByHash(*bc.Hash) (*types.Block, error)
38         GetBlockByHeight(uint64) (*types.Block, error)
39         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
40         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
41         GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
42         InMainChain(bc.Hash) bool
43         ProcessBlock(*types.Block) (bool, error)
44         ValidateTx(*types.Tx) (bool, error)
45 }
46
47 //SyncManager Sync Manager is responsible for the business layer information synchronization
48 type SyncManager struct {
49         sw          *p2p.Switch
50         genesisHash bc.Hash
51
52         privKey      crypto.PrivKeyEd25519 // local node's p2p key
53         chain        Chain
54         txPool       *core.TxPool
55         blockFetcher *blockFetcher
56         blockKeeper  *blockKeeper
57         peers        *peerSet
58
59         newTxCh    chan *types.Tx
60         newBlockCh chan *bc.Hash
61         txSyncCh   chan *txSyncMsg
62         quitSync   chan struct{}
63         config     *cfg.Config
64 }
65
66 //NewSyncManager create a sync manager
67 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
68         genesisHeader, err := chain.GetHeaderByHeight(0)
69         if err != nil {
70                 return nil, err
71         }
72
73         sw := p2p.NewSwitch(config)
74         peers := newPeerSet(sw)
75         manager := &SyncManager{
76                 sw:           sw,
77                 genesisHash:  genesisHeader.Hash(),
78                 txPool:       txPool,
79                 chain:        chain,
80                 privKey:      crypto.GenPrivKeyEd25519(),
81                 blockFetcher: newBlockFetcher(chain, peers),
82                 blockKeeper:  newBlockKeeper(chain, peers),
83                 peers:        peers,
84                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
85                 newBlockCh:   newBlockCh,
86                 txSyncCh:     make(chan *txSyncMsg),
87                 quitSync:     make(chan struct{}),
88                 config:       config,
89         }
90
91         protocolReactor := NewProtocolReactor(manager, manager.peers)
92         manager.sw.AddReactor("PROTOCOL", protocolReactor)
93
94         // Create & add listener
95         var listenerStatus bool
96         var l p2p.Listener
97         if !config.VaultMode {
98                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
99                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
100                 manager.sw.AddListener(l)
101
102                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
103                 if err != nil {
104                         return nil, err
105                 }
106                 manager.sw.SetDiscv(discv)
107         }
108         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
109         manager.sw.SetNodePrivKey(manager.privKey)
110         return manager, nil
111 }
112
113 //BestPeer return the highest p2p peerInfo
114 func (sm *SyncManager) BestPeer() *PeerInfo {
115         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
116         if bestPeer != nil {
117                 return bestPeer.getPeerInfo()
118         }
119         return nil
120 }
121
122 // GetNewTxCh return a unconfirmed transaction feed channel
123 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
124         return sm.newTxCh
125 }
126
127 //GetPeerInfos return peer info of all peers
128 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
129         return sm.peers.getPeerInfos()
130 }
131
132 //IsCaughtUp check wheather the peer finish the sync
133 func (sm *SyncManager) IsCaughtUp() bool {
134         peer := sm.peers.bestPeer(consensus.SFFullNode)
135         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
136 }
137
138 //NodeInfo get P2P peer node info
139 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
140         return sm.sw.NodeInfo()
141 }
142
143 //StopPeer try to stop peer by given ID
144 func (sm *SyncManager) StopPeer(peerID string) error {
145         if peer := sm.peers.getPeer(peerID); peer == nil {
146                 return errors.New("peerId not exist")
147         }
148         sm.peers.removePeer(peerID)
149         return nil
150 }
151
152 //Switch get sync manager switch
153 func (sm *SyncManager) Switch() *p2p.Switch {
154         return sm.sw
155 }
156
157 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
158         block, err := msg.GetBlock()
159         if err != nil {
160                 return
161         }
162         sm.blockKeeper.processBlock(peer.ID(), block)
163 }
164
165 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
166         blocks, err := msg.GetBlocks()
167         if err != nil {
168                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
169                 return
170         }
171
172         sm.blockKeeper.processBlocks(peer.ID(), blocks)
173 }
174
175 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
176         peer.addFilterAddress(msg.Address)
177 }
178
179 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
180         peer.filterAdds.Clear()
181 }
182
183 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
184         peer.addFilterAddresses(msg.Addresses)
185 }
186
187 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
188         var block *types.Block
189         var err error
190         if msg.Height != 0 {
191                 block, err = sm.chain.GetBlockByHeight(msg.Height)
192         } else {
193                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
194         }
195         if err != nil {
196                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
197                 return
198         }
199
200         ok, err := peer.sendBlock(block)
201         if !ok {
202                 sm.peers.removePeer(peer.ID())
203         }
204         if err != nil {
205                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
206         }
207 }
208
209 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
210         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
211         if err != nil || len(blocks) == 0 {
212                 return
213         }
214
215         totalSize := 0
216         sendBlocks := []*types.Block{}
217         for _, block := range blocks {
218                 rawData, err := block.MarshalText()
219                 if err != nil {
220                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
221                         continue
222                 }
223
224                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
225                         break
226                 }
227                 totalSize += len(rawData)
228                 sendBlocks = append(sendBlocks, block)
229         }
230
231         ok, err := peer.sendBlocks(sendBlocks)
232         if !ok {
233                 sm.peers.removePeer(peer.ID())
234         }
235         if err != nil {
236                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
237         }
238 }
239
240 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
241         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
242         if err != nil || len(headers) == 0 {
243                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
244                 return
245         }
246
247         ok, err := peer.sendHeaders(headers)
248         if !ok {
249                 sm.peers.removePeer(peer.ID())
250         }
251         if err != nil {
252                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
253         }
254 }
255
256 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
257         var err error
258         var block *types.Block
259         if msg.Height != 0 {
260                 block, err = sm.chain.GetBlockByHeight(msg.Height)
261         } else {
262                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
263         }
264         if err != nil {
265                 log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get block from chain")
266                 return
267         }
268
269         blockHash := block.Hash()
270         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
271         if err != nil {
272                 log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get transaction status")
273                 return
274         }
275
276         ok, err := peer.sendMerkleBlock(block, txStatus)
277         if err != nil {
278                 log.WithField("err", err).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
279                 return
280         }
281
282         if !ok {
283                 sm.peers.removePeer(peer.ID())
284         }
285 }
286
287 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
288         headers, err := msg.GetHeaders()
289         if err != nil {
290                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
291                 return
292         }
293
294         sm.blockKeeper.processHeaders(peer.ID(), headers)
295 }
296
297 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
298         block, err := msg.GetMineBlock()
299         if err != nil {
300                 log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
301                 return
302         }
303
304         hash := block.Hash()
305         peer.markBlock(&hash)
306         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
307         peer.setStatus(block.Height, &hash)
308 }
309
310 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
311         bestHeader := sm.chain.BestBlockHeader()
312         genesisBlock, err := sm.chain.GetBlockByHeight(0)
313         if err != nil {
314                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
315         }
316
317         genesisHash := genesisBlock.Hash()
318         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
319         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
320                 sm.peers.removePeer(peer.ID())
321         }
322 }
323
324 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
325         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
326                 peer.setStatus(msg.Height, msg.GetHash())
327                 return
328         }
329
330         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
331                 log.WithFields(log.Fields{
332                         "remote genesis": genesisHash.String(),
333                         "local genesis":  sm.genesisHash.String(),
334                 }).Warn("fail hand shake due to differnt genesis")
335                 return
336         }
337
338         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
339 }
340
341 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
342         tx, err := msg.GetTransaction()
343         if err != nil {
344                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
345                 return
346         }
347
348         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
349                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
350         }
351 }
352
353 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
354         peer := sm.peers.getPeer(basePeer.ID())
355         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
356                 return
357         }
358
359         switch msg := msg.(type) {
360         case *GetBlockMessage:
361                 sm.handleGetBlockMsg(peer, msg)
362
363         case *BlockMessage:
364                 sm.handleBlockMsg(peer, msg)
365
366         case *StatusRequestMessage:
367                 sm.handleStatusRequestMsg(basePeer)
368
369         case *StatusResponseMessage:
370                 sm.handleStatusResponseMsg(basePeer, msg)
371
372         case *TransactionMessage:
373                 sm.handleTransactionMsg(peer, msg)
374
375         case *MineBlockMessage:
376                 sm.handleMineBlockMsg(peer, msg)
377
378         case *GetHeadersMessage:
379                 sm.handleGetHeadersMsg(peer, msg)
380
381         case *HeadersMessage:
382                 sm.handleHeadersMsg(peer, msg)
383
384         case *GetBlocksMessage:
385                 sm.handleGetBlocksMsg(peer, msg)
386
387         case *BlocksMessage:
388                 sm.handleBlocksMsg(peer, msg)
389
390         case *FilterLoadMessage:
391                 sm.handleFilterLoadMsg(peer, msg)
392
393         case *FilterAddMessage:
394                 sm.handleFilterAddMsg(peer, msg)
395
396         case *FilterClearMessage:
397                 sm.handleFilterClearMsg(peer)
398
399         case *GetMerkleBlockMessage:
400                 sm.handleGetMerkleBlockMsg(peer, msg)
401
402         default:
403                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
404         }
405 }
406
407 // Defaults to tcp
408 func protocolAndAddress(listenAddr string) (string, string) {
409         p, address := "tcp", listenAddr
410         parts := strings.SplitN(address, "://", 2)
411         if len(parts) == 2 {
412                 p, address = parts[0], parts[1]
413         }
414         return p, address
415 }
416
417 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
418         nodeInfo := &p2p.NodeInfo{
419                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
420                 Moniker: sm.config.Moniker,
421                 Network: sm.config.ChainID,
422                 Version: version.Version,
423                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
424         }
425
426         if !sm.sw.IsListening() {
427                 return nodeInfo
428         }
429
430         p2pListener := sm.sw.Listeners()[0]
431
432         // We assume that the rpcListener has the same ExternalAddress.
433         // This is probably true because both P2P and RPC listeners use UPnP,
434         // except of course if the rpc is only bound to localhost
435         if listenerStatus {
436                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
437         } else {
438                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
439         }
440         return nodeInfo
441 }
442
443 //Start start sync manager service
444 func (sm *SyncManager) Start() {
445         if _, err := sm.sw.Start(); err != nil {
446                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
447         }
448         // broadcast transactions
449         go sm.txBroadcastLoop()
450         go sm.minedBroadcastLoop()
451         go sm.txSyncLoop()
452 }
453
454 //Stop stop sync manager
455 func (sm *SyncManager) Stop() {
456         close(sm.quitSync)
457         sm.sw.Stop()
458 }
459
460 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
461         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
462         if err != nil {
463                 return nil, err
464         }
465
466         conn, err := net.ListenUDP("udp", addr)
467         if err != nil {
468                 return nil, err
469         }
470
471         realaddr := conn.LocalAddr().(*net.UDPAddr)
472         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
473         if err != nil {
474                 return nil, err
475         }
476
477         // add the seeds node to the discover table
478         if config.P2P.Seeds == "" {
479                 return ntab, nil
480         }
481         nodes := []*discover.Node{}
482         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
483                 version.Status.AddSeed(seed)
484                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
485                 nodes = append(nodes, discover.MustParseNode(url))
486         }
487         if err = ntab.SetFallbackNodes(nodes); err != nil {
488                 return nil, err
489         }
490         return ntab, nil
491 }
492
493 func (sm *SyncManager) minedBroadcastLoop() {
494         for {
495                 select {
496                 case blockHash := <-sm.newBlockCh:
497                         block, err := sm.chain.GetBlockByHash(blockHash)
498                         if err != nil {
499                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
500                                 return
501                         }
502                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
503                                 log.Errorf("Broadcast mine block error. %v", err)
504                                 return
505                         }
506                 case <-sm.quitSync:
507                         return
508                 }
509         }
510 }