OSDN Git Service

:sparkles: Add version notification (#1258)
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/p2p"
19         "github.com/bytom/p2p/discover"
20         core "github.com/bytom/protocol"
21         "github.com/bytom/protocol/bc"
22         "github.com/bytom/protocol/bc/types"
23         "github.com/bytom/version"
24 )
25
26 const (
27         maxTxChanSize = 10000
28 )
29
30 // Chain is the interface for Bytom core
31 type Chain interface {
32         BestBlockHeader() *types.BlockHeader
33         BestBlockHeight() uint64
34         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
35         GetBlockByHash(*bc.Hash) (*types.Block, error)
36         GetBlockByHeight(uint64) (*types.Block, error)
37         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
38         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
39         InMainChain(bc.Hash) bool
40         ProcessBlock(*types.Block) (bool, error)
41         ValidateTx(*types.Tx) (bool, error)
42 }
43
44 //SyncManager Sync Manager is responsible for the business layer information synchronization
45 type SyncManager struct {
46         sw          *p2p.Switch
47         genesisHash bc.Hash
48
49         privKey      crypto.PrivKeyEd25519 // local node's p2p key
50         chain        Chain
51         txPool       *core.TxPool
52         blockFetcher *blockFetcher
53         blockKeeper  *blockKeeper
54         peers        *peerSet
55
56         newTxCh    chan *types.Tx
57         newBlockCh chan *bc.Hash
58         txSyncCh   chan *txSyncMsg
59         quitSync   chan struct{}
60         config     *cfg.Config
61 }
62
63 //NewSyncManager create a sync manager
64 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
65         genesisHeader, err := chain.GetHeaderByHeight(0)
66         if err != nil {
67                 return nil, err
68         }
69
70         sw := p2p.NewSwitch(config)
71         peers := newPeerSet(sw)
72         manager := &SyncManager{
73                 sw:           sw,
74                 genesisHash:  genesisHeader.Hash(),
75                 txPool:       txPool,
76                 chain:        chain,
77                 privKey:      crypto.GenPrivKeyEd25519(),
78                 blockFetcher: newBlockFetcher(chain, peers),
79                 blockKeeper:  newBlockKeeper(chain, peers),
80                 peers:        peers,
81                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
82                 newBlockCh:   newBlockCh,
83                 txSyncCh:     make(chan *txSyncMsg),
84                 quitSync:     make(chan struct{}),
85                 config:       config,
86         }
87
88         protocolReactor := NewProtocolReactor(manager, manager.peers)
89         manager.sw.AddReactor("PROTOCOL", protocolReactor)
90
91         // Create & add listener
92         var listenerStatus bool
93         var l p2p.Listener
94         if !config.VaultMode {
95                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
96                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
97                 manager.sw.AddListener(l)
98
99                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
100                 if err != nil {
101                         return nil, err
102                 }
103                 manager.sw.SetDiscv(discv)
104         }
105         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
106         manager.sw.SetNodePrivKey(manager.privKey)
107         return manager, nil
108 }
109
110 //BestPeer return the highest p2p peerInfo
111 func (sm *SyncManager) BestPeer() *PeerInfo {
112         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
113         if bestPeer != nil {
114                 return bestPeer.getPeerInfo()
115         }
116         return nil
117 }
118
119 // GetNewTxCh return a unconfirmed transaction feed channel
120 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
121         return sm.newTxCh
122 }
123
124 //GetPeerInfos return peer info of all peers
125 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
126         return sm.peers.getPeerInfos()
127 }
128
129 //IsCaughtUp check wheather the peer finish the sync
130 func (sm *SyncManager) IsCaughtUp() bool {
131         peer := sm.peers.bestPeer(consensus.SFFullNode)
132         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
133 }
134
135 //NodeInfo get P2P peer node info
136 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
137         return sm.sw.NodeInfo()
138 }
139
140 //StopPeer try to stop peer by given ID
141 func (sm *SyncManager) StopPeer(peerID string) error {
142         if peer := sm.peers.getPeer(peerID); peer == nil {
143                 return errors.New("peerId not exist")
144         }
145         sm.peers.removePeer(peerID)
146         return nil
147 }
148
149 //Switch get sync manager switch
150 func (sm *SyncManager) Switch() *p2p.Switch {
151         return sm.sw
152 }
153
154 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
155         sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
156 }
157
158 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
159         blocks, err := msg.GetBlocks()
160         if err != nil {
161                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
162                 return
163         }
164
165         sm.blockKeeper.processBlocks(peer.ID(), blocks)
166 }
167
168 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
169         peer.filterAdds.Clear()
170 }
171
172 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
173         if len(msg.Addresses) == 0 {
174                 log.Info("the addresses is empty from filter load message")
175                 return
176         }
177         peer.addFilterAddresses(msg.Addresses)
178 }
179
180 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
181         var block *types.Block
182         var err error
183         if msg.Height != 0 {
184                 block, err = sm.chain.GetBlockByHeight(msg.Height)
185         } else {
186                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
187         }
188         if err != nil {
189                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
190                 return
191         }
192
193         ok, err := peer.sendBlock(block)
194         if !ok {
195                 sm.peers.removePeer(peer.ID())
196         }
197         if err != nil {
198                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
199         }
200 }
201
202 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
203         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
204         if err != nil || len(blocks) == 0 {
205                 return
206         }
207
208         totalSize := 0
209         sendBlocks := []*types.Block{}
210         for _, block := range blocks {
211                 rawData, err := block.MarshalText()
212                 if err != nil {
213                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
214                         continue
215                 }
216
217                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
218                         break
219                 }
220                 totalSize += len(rawData)
221                 sendBlocks = append(sendBlocks, block)
222         }
223
224         ok, err := peer.sendBlocks(sendBlocks)
225         if !ok {
226                 sm.peers.removePeer(peer.ID())
227         }
228         if err != nil {
229                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
230         }
231 }
232
233 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
234         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
235         if err != nil || len(headers) == 0 {
236                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
237                 return
238         }
239
240         ok, err := peer.sendHeaders(headers)
241         if !ok {
242                 sm.peers.removePeer(peer.ID())
243         }
244         if err != nil {
245                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
246         }
247 }
248
249 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {}
250
251 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
252         headers, err := msg.GetHeaders()
253         if err != nil {
254                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
255                 return
256         }
257
258         sm.blockKeeper.processHeaders(peer.ID(), headers)
259 }
260
261 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
262         block, err := msg.GetMineBlock()
263         if err != nil {
264                 log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
265                 return
266         }
267
268         hash := block.Hash()
269         peer.markBlock(&hash)
270         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
271         peer.setStatus(block.Height, &hash)
272 }
273
274 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
275         bestHeader := sm.chain.BestBlockHeader()
276         genesisBlock, err := sm.chain.GetBlockByHeight(0)
277         if err != nil {
278                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
279         }
280
281         genesisHash := genesisBlock.Hash()
282         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
283         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
284                 sm.peers.removePeer(peer.ID())
285         }
286 }
287
288 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
289         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
290                 peer.setStatus(msg.Height, msg.GetHash())
291                 return
292         }
293
294         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
295                 log.WithFields(log.Fields{
296                         "remote genesis": genesisHash.String(),
297                         "local genesis":  sm.genesisHash.String(),
298                 }).Warn("fail hand shake due to differnt genesis")
299                 return
300         }
301
302         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
303 }
304
305 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
306         tx, err := msg.GetTransaction()
307         if err != nil {
308                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
309                 return
310         }
311
312         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
313                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
314         }
315 }
316
317 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
318         peer := sm.peers.getPeer(basePeer.ID())
319         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
320                 return
321         }
322
323         switch msg := msg.(type) {
324         case *GetBlockMessage:
325                 sm.handleGetBlockMsg(peer, msg)
326
327         case *BlockMessage:
328                 sm.handleBlockMsg(peer, msg)
329
330         case *StatusRequestMessage:
331                 sm.handleStatusRequestMsg(basePeer)
332
333         case *StatusResponseMessage:
334                 sm.handleStatusResponseMsg(basePeer, msg)
335
336         case *TransactionMessage:
337                 sm.handleTransactionMsg(peer, msg)
338
339         case *MineBlockMessage:
340                 sm.handleMineBlockMsg(peer, msg)
341
342         case *GetHeadersMessage:
343                 sm.handleGetHeadersMsg(peer, msg)
344
345         case *HeadersMessage:
346                 sm.handleHeadersMsg(peer, msg)
347
348         case *GetBlocksMessage:
349                 sm.handleGetBlocksMsg(peer, msg)
350
351         case *BlocksMessage:
352                 sm.handleBlocksMsg(peer, msg)
353
354         case *FilterLoadMessage:
355                 sm.handleFilterLoadMsg(peer, msg)
356
357         case *FilterClearMessage:
358                 sm.handleFilterClearMsg(peer)
359
360         case *GetMerkleBlockMessage:
361                 sm.handleGetMerkleBlockMsg(peer, msg)
362
363         default:
364                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
365         }
366 }
367
368 // Defaults to tcp
369 func protocolAndAddress(listenAddr string) (string, string) {
370         p, address := "tcp", listenAddr
371         parts := strings.SplitN(address, "://", 2)
372         if len(parts) == 2 {
373                 p, address = parts[0], parts[1]
374         }
375         return p, address
376 }
377
378 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
379         nodeInfo := &p2p.NodeInfo{
380                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
381                 Moniker: sm.config.Moniker,
382                 Network: sm.config.ChainID,
383                 Version: version.Version,
384                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
385         }
386
387         if !sm.sw.IsListening() {
388                 return nodeInfo
389         }
390
391         p2pListener := sm.sw.Listeners()[0]
392
393         // We assume that the rpcListener has the same ExternalAddress.
394         // This is probably true because both P2P and RPC listeners use UPnP,
395         // except of course if the rpc is only bound to localhost
396         if listenerStatus {
397                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
398         } else {
399                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
400         }
401         return nodeInfo
402 }
403
404 //Start start sync manager service
405 func (sm *SyncManager) Start() {
406         if _, err := sm.sw.Start(); err != nil {
407                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
408         }
409         // broadcast transactions
410         go sm.txBroadcastLoop()
411         go sm.minedBroadcastLoop()
412         go sm.txSyncLoop()
413 }
414
415 //Stop stop sync manager
416 func (sm *SyncManager) Stop() {
417         close(sm.quitSync)
418         sm.sw.Stop()
419 }
420
421 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
422         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
423         if err != nil {
424                 return nil, err
425         }
426
427         conn, err := net.ListenUDP("udp", addr)
428         if err != nil {
429                 return nil, err
430         }
431
432         realaddr := conn.LocalAddr().(*net.UDPAddr)
433         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
434         if err != nil {
435                 return nil, err
436         }
437
438         // add the seeds node to the discover table
439         if config.P2P.Seeds == "" {
440                 return ntab, nil
441         }
442         nodes := []*discover.Node{}
443         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
444                 version.Status.AddSeed(seed)
445                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
446                 nodes = append(nodes, discover.MustParseNode(url))
447         }
448         if err = ntab.SetFallbackNodes(nodes); err != nil {
449                 return nil, err
450         }
451         return ntab, nil
452 }
453
454 func (sm *SyncManager) minedBroadcastLoop() {
455         for {
456                 select {
457                 case blockHash := <-sm.newBlockCh:
458                         block, err := sm.chain.GetBlockByHash(blockHash)
459                         if err != nil {
460                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
461                                 return
462                         }
463                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
464                                 log.Errorf("Broadcast mine block error. %v", err)
465                                 return
466                         }
467                 case <-sm.quitSync:
468                         return
469                 }
470         }
471 }