OSDN Git Service

modify dpos
[bytom/vapor.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         "github.com/vapor/chain"
17         cfg "github.com/vapor/config"
18         "github.com/vapor/consensus"
19         "github.com/vapor/p2p"
20         "github.com/vapor/p2p/discover"
21         core "github.com/vapor/protocol"
22         "github.com/vapor/protocol/bc"
23         "github.com/vapor/protocol/bc/types"
24         "github.com/vapor/version"
25 )
26
27 const (
28         logModule             = "netsync"
29         maxTxChanSize         = 10000
30         maxFilterAddressSize  = 50
31         maxFilterAddressCount = 1000
32 )
33
34 //SyncManager Sync Manager is responsible for the business layer information synchronization
35 type SyncManager struct {
36         sw          *p2p.Switch
37         genesisHash bc.Hash
38
39         privKey      crypto.PrivKeyEd25519 // local node's p2p key
40         chain        chain.Chain
41         txPool       *core.TxPool
42         blockFetcher *blockFetcher
43         blockKeeper  *blockKeeper
44         peers        *peerSet
45
46         newTxCh    chan *types.Tx
47         newBlockCh chan *bc.Hash
48         txSyncCh   chan *txSyncMsg
49         quitSync   chan struct{}
50         config     *cfg.Config
51 }
52
53 //NewSyncManager create a sync manager
54 func NewSyncManager(config *cfg.Config, chain chain.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
55         genesisHeader, err := chain.GetHeaderByHeight(0)
56         if err != nil {
57                 return nil, err
58         }
59
60         sw := p2p.NewSwitch(config)
61         peers := newPeerSet(sw)
62         manager := &SyncManager{
63                 sw:           sw,
64                 genesisHash:  genesisHeader.Hash(),
65                 txPool:       txPool,
66                 chain:        chain,
67                 privKey:      crypto.GenPrivKeyEd25519(),
68                 blockFetcher: newBlockFetcher(chain, peers),
69                 blockKeeper:  newBlockKeeper(chain, peers),
70                 peers:        peers,
71                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
72                 newBlockCh:   newBlockCh,
73                 txSyncCh:     make(chan *txSyncMsg),
74                 quitSync:     make(chan struct{}),
75                 config:       config,
76         }
77
78         protocolReactor := NewProtocolReactor(manager, manager.peers)
79         manager.sw.AddReactor("PROTOCOL", protocolReactor)
80
81         // Create & add listener
82         var listenerStatus bool
83         var l p2p.Listener
84         if !config.VaultMode {
85                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
86                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
87                 manager.sw.AddListener(l)
88
89                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
90                 if err != nil {
91                         return nil, err
92                 }
93                 manager.sw.SetDiscv(discv)
94         }
95         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
96         manager.sw.SetNodePrivKey(manager.privKey)
97         return manager, nil
98 }
99
100 //BestPeer return the highest p2p peerInfo
101 func (sm *SyncManager) BestPeer() *PeerInfo {
102         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
103         if bestPeer != nil {
104                 return bestPeer.getPeerInfo()
105         }
106         return nil
107 }
108
109 // GetNewTxCh return a unconfirmed transaction feed channel
110 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
111         return sm.newTxCh
112 }
113
114 //GetPeerInfos return peer info of all peers
115 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
116         return sm.peers.getPeerInfos()
117 }
118
119 //IsCaughtUp check wheather the peer finish the sync
120 func (sm *SyncManager) IsCaughtUp() bool {
121         peer := sm.peers.bestPeer(consensus.SFFullNode)
122         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
123 }
124
125 //NodeInfo get P2P peer node info
126 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
127         return sm.sw.NodeInfo()
128 }
129
130 //StopPeer try to stop peer by given ID
131 func (sm *SyncManager) StopPeer(peerID string) error {
132         if peer := sm.peers.getPeer(peerID); peer == nil {
133                 return errors.New("peerId not exist")
134         }
135         sm.peers.removePeer(peerID)
136         return nil
137 }
138
139 //Switch get sync manager switch
140 func (sm *SyncManager) Switch() *p2p.Switch {
141         return sm.sw
142 }
143
144 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
145         block, err := msg.GetBlock()
146         if err != nil {
147                 return
148         }
149         sm.blockKeeper.processBlock(peer.ID(), block)
150 }
151
152 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
153         blocks, err := msg.GetBlocks()
154         if err != nil {
155                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
156                 return
157         }
158
159         sm.blockKeeper.processBlocks(peer.ID(), blocks)
160 }
161
162 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
163         peer.addFilterAddress(msg.Address)
164 }
165
166 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
167         peer.filterAdds.Clear()
168 }
169
170 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
171         peer.addFilterAddresses(msg.Addresses)
172 }
173
174 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
175         var block *types.Block
176         var err error
177         if msg.Height != 0 {
178                 block, err = sm.chain.GetBlockByHeight(msg.Height)
179         } else {
180                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
181         }
182         if err != nil {
183                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
184                 return
185         }
186
187         ok, err := peer.sendBlock(block)
188         if !ok {
189                 sm.peers.removePeer(peer.ID())
190         }
191         if err != nil {
192                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
193         }
194 }
195
196 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
197         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
198         if err != nil || len(blocks) == 0 {
199                 return
200         }
201
202         totalSize := 0
203         sendBlocks := []*types.Block{}
204         for _, block := range blocks {
205                 rawData, err := block.MarshalText()
206                 if err != nil {
207                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
208                         continue
209                 }
210
211                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
212                         break
213                 }
214                 totalSize += len(rawData)
215                 sendBlocks = append(sendBlocks, block)
216         }
217
218         ok, err := peer.sendBlocks(sendBlocks)
219         if !ok {
220                 sm.peers.removePeer(peer.ID())
221         }
222         if err != nil {
223                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
224         }
225 }
226
227 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
228         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
229         if err != nil || len(headers) == 0 {
230                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
231                 return
232         }
233
234         ok, err := peer.sendHeaders(headers)
235         if !ok {
236                 sm.peers.removePeer(peer.ID())
237         }
238         if err != nil {
239                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
240         }
241 }
242
243 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
244         var err error
245         var block *types.Block
246         if msg.Height != 0 {
247                 block, err = sm.chain.GetBlockByHeight(msg.Height)
248         } else {
249                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
250         }
251         if err != nil {
252                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
253                 return
254         }
255
256         blockHash := block.Hash()
257         txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
258         if err != nil {
259                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
260                 return
261         }
262
263         ok, err := peer.sendMerkleBlock(block, txStatus)
264         if err != nil {
265                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
266                 return
267         }
268
269         if !ok {
270                 sm.peers.removePeer(peer.ID())
271         }
272 }
273
274 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
275         headers, err := msg.GetHeaders()
276         if err != nil {
277                 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
278                 return
279         }
280
281         sm.blockKeeper.processHeaders(peer.ID(), headers)
282 }
283
284 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
285         block, err := msg.GetMineBlock()
286         if err != nil {
287                 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
288                 return
289         }
290
291         hash := block.Hash()
292         peer.markBlock(&hash)
293         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
294         peer.setStatus(block.Height, &hash)
295 }
296
297 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
298         bestHeader := sm.chain.BestBlockHeader()
299         genesisBlock, err := sm.chain.GetBlockByHeight(0)
300         if err != nil {
301                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
302         }
303
304         genesisHash := genesisBlock.Hash()
305         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
306         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
307                 sm.peers.removePeer(peer.ID())
308         }
309 }
310
311 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
312         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
313                 peer.setStatus(msg.Height, msg.GetHash())
314                 return
315         }
316
317         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
318                 log.WithFields(log.Fields{
319                         "module":         logModule,
320                         "remote genesis": genesisHash.String(),
321                         "local genesis":  sm.genesisHash.String(),
322                 }).Warn("fail hand shake due to differnt genesis")
323                 return
324         }
325
326         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
327 }
328
329 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
330         tx, err := msg.GetTransaction()
331         if err != nil {
332                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
333                 return
334         }
335
336         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
337                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
338         }
339 }
340
341 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
342         peer := sm.peers.getPeer(basePeer.ID())
343         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
344                 return
345         }
346
347         log.WithFields(log.Fields{
348                 "module":  logModule,
349                 "peer":    basePeer.Addr(),
350                 "type":    reflect.TypeOf(msg),
351                 "message": msg.String(),
352         }).Info("receive message from peer")
353
354         switch msg := msg.(type) {
355         case *GetBlockMessage:
356                 sm.handleGetBlockMsg(peer, msg)
357
358         case *BlockMessage:
359                 sm.handleBlockMsg(peer, msg)
360
361         case *StatusRequestMessage:
362                 sm.handleStatusRequestMsg(basePeer)
363
364         case *StatusResponseMessage:
365                 sm.handleStatusResponseMsg(basePeer, msg)
366
367         case *TransactionMessage:
368                 sm.handleTransactionMsg(peer, msg)
369
370         case *MineBlockMessage:
371                 sm.handleMineBlockMsg(peer, msg)
372
373         case *GetHeadersMessage:
374                 sm.handleGetHeadersMsg(peer, msg)
375
376         case *HeadersMessage:
377                 sm.handleHeadersMsg(peer, msg)
378
379         case *GetBlocksMessage:
380                 sm.handleGetBlocksMsg(peer, msg)
381
382         case *BlocksMessage:
383                 sm.handleBlocksMsg(peer, msg)
384
385         case *FilterLoadMessage:
386                 sm.handleFilterLoadMsg(peer, msg)
387
388         case *FilterAddMessage:
389                 sm.handleFilterAddMsg(peer, msg)
390
391         case *FilterClearMessage:
392                 sm.handleFilterClearMsg(peer)
393
394         case *GetMerkleBlockMessage:
395                 sm.handleGetMerkleBlockMsg(peer, msg)
396
397         // TODO PBFT消息
398
399         default:
400                 log.WithFields(log.Fields{
401                         "module":       logModule,
402                         "peer":         basePeer.Addr(),
403                         "message_type": reflect.TypeOf(msg),
404                 }).Error("unhandled message type")
405         }
406 }
407
408 // Defaults to tcp
409 func protocolAndAddress(listenAddr string) (string, string) {
410         p, address := "tcp", listenAddr
411         parts := strings.SplitN(address, "://", 2)
412         if len(parts) == 2 {
413                 p, address = parts[0], parts[1]
414         }
415         return p, address
416 }
417
418 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
419         nodeInfo := &p2p.NodeInfo{
420                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
421                 Moniker: sm.config.Moniker,
422                 Network: sm.config.ChainID,
423                 Version: version.Version,
424                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
425         }
426
427         if !sm.sw.IsListening() {
428                 return nodeInfo
429         }
430
431         p2pListener := sm.sw.Listeners()[0]
432
433         // We assume that the rpcListener has the same ExternalAddress.
434         // This is probably true because both P2P and RPC listeners use UPnP,
435         // except of course if the rpc is only bound to localhost
436         if listenerStatus {
437                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
438         } else {
439                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
440         }
441         return nodeInfo
442 }
443
444 //Start start sync manager service
445 func (sm *SyncManager) Start() {
446         if _, err := sm.sw.Start(); err != nil {
447                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
448         }
449         // broadcast transactions
450         go sm.txBroadcastLoop()
451         go sm.minedBroadcastLoop()
452         go sm.txSyncLoop()
453 }
454
455 //Stop stop sync manager
456 func (sm *SyncManager) Stop() {
457         close(sm.quitSync)
458         sm.sw.Stop()
459 }
460
461 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
462         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
463         if err != nil {
464                 return nil, err
465         }
466
467         conn, err := net.ListenUDP("udp", addr)
468         if err != nil {
469                 return nil, err
470         }
471
472         realaddr := conn.LocalAddr().(*net.UDPAddr)
473         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
474         if err != nil {
475                 return nil, err
476         }
477
478         // add the seeds node to the discover table
479         if config.P2P.Seeds == "" {
480                 return ntab, nil
481         }
482         nodes := []*discover.Node{}
483         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
484                 version.Status.AddSeed(seed)
485                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
486                 nodes = append(nodes, discover.MustParseNode(url))
487         }
488         if err = ntab.SetFallbackNodes(nodes); err != nil {
489                 return nil, err
490         }
491         return ntab, nil
492 }
493
494 func (sm *SyncManager) minedBroadcastLoop() {
495         for {
496                 select {
497                 case blockHash := <-sm.newBlockCh:
498                         block, err := sm.chain.GetBlockByHash(blockHash)
499                         if err != nil {
500                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on mined broadcast loop get block")
501                                 return
502                         }
503                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
504                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
505                                 return
506                         }
507                 case <-sm.quitSync:
508                         return
509                 }
510         }
511 }