OSDN Git Service

Format netsync module code directory (#88)
[bytom/vapor.git] / netsync / handle.go
diff --git a/netsync/handle.go b/netsync/handle.go
deleted file mode 100644 (file)
index 7dc35b9..0000000
+++ /dev/null
@@ -1,500 +0,0 @@
-package netsync
-
-import (
-       "errors"
-       "reflect"
-
-       log "github.com/sirupsen/logrus"
-
-       cfg "github.com/vapor/config"
-       "github.com/vapor/consensus"
-       "github.com/vapor/event"
-       "github.com/vapor/p2p"
-       core "github.com/vapor/protocol"
-       "github.com/vapor/protocol/bc"
-       "github.com/vapor/protocol/bc/types"
-)
-
-const (
-       logModule             = "netsync"
-       maxTxChanSize         = 10000
-       maxFilterAddressSize  = 50
-       maxFilterAddressCount = 1000
-)
-
-var (
-       errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
-)
-
-// Chain is the interface for Bytom core
-type Chain interface {
-       BestBlockHeader() *types.BlockHeader
-       BestBlockHeight() uint64
-       GetBlockByHash(*bc.Hash) (*types.Block, error)
-       GetBlockByHeight(uint64) (*types.Block, error)
-       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
-       GetHeaderByHeight(uint64) (*types.BlockHeader, error)
-       GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
-       InMainChain(bc.Hash) bool
-       ProcessBlock(*types.Block) (bool, error)
-       ValidateTx(*types.Tx) (bool, error)
-}
-
-type Switch interface {
-       AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
-       AddBannedPeer(string) error
-       StopPeerGracefully(string)
-       Start() (bool, error)
-       Stop() bool
-       IsListening() bool
-       DialPeerWithAddress(addr *p2p.NetAddress) error
-       Peers() *p2p.PeerSet
-}
-
-//SyncManager Sync Manager is responsible for the business layer information synchronization
-type SyncManager struct {
-       sw           Switch
-       genesisHash  bc.Hash
-       chain        Chain
-       txPool       *core.TxPool
-       blockFetcher *blockFetcher
-       blockKeeper  *blockKeeper
-       peers        *peerSet
-
-       txSyncCh chan *txSyncMsg
-       quitSync chan struct{}
-       config   *cfg.Config
-
-       eventDispatcher *event.Dispatcher
-       minedBlockSub   *event.Subscription
-       txMsgSub        *event.Subscription
-}
-
-// CreateSyncManager create sync manager and set switch.
-func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
-       sw, err := p2p.NewSwitch(config)
-       if err != nil {
-               return nil, err
-       }
-
-       return newSyncManager(config, sw, chain, txPool, dispatcher)
-}
-
-//NewSyncManager create a sync manager
-func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
-       genesisHeader, err := chain.GetHeaderByHeight(0)
-       if err != nil {
-               return nil, err
-       }
-       peers := newPeerSet(sw)
-       manager := &SyncManager{
-               sw:              sw,
-               genesisHash:     genesisHeader.Hash(),
-               txPool:          txPool,
-               chain:           chain,
-               blockFetcher:    newBlockFetcher(chain, peers),
-               blockKeeper:     newBlockKeeper(chain, peers),
-               peers:           peers,
-               txSyncCh:        make(chan *txSyncMsg),
-               quitSync:        make(chan struct{}),
-               config:          config,
-               eventDispatcher: dispatcher,
-       }
-
-       if !config.VaultMode {
-               protocolReactor := NewProtocolReactor(manager, peers)
-               manager.sw.AddReactor("PROTOCOL", protocolReactor)
-       }
-       return manager, nil
-}
-
-func (sm *SyncManager) AddPeer(peer BasePeer) {
-       sm.peers.addPeer(peer)
-}
-
-//BestPeer return the highest p2p peerInfo
-func (sm *SyncManager) BestPeer() *PeerInfo {
-       bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
-       if bestPeer != nil {
-               return bestPeer.getPeerInfo()
-       }
-       return nil
-}
-
-func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
-       if sm.config.VaultMode {
-               return errVaultModeDialPeer
-       }
-
-       return sm.sw.DialPeerWithAddress(addr)
-}
-
-func (sm *SyncManager) GetNetwork() string {
-       return sm.config.ChainID
-}
-
-//GetPeerInfos return peer info of all peers
-func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
-       return sm.peers.getPeerInfos()
-}
-
-//IsCaughtUp check wheather the peer finish the sync
-func (sm *SyncManager) IsCaughtUp() bool {
-       peer := sm.peers.bestPeer(consensus.SFFullNode)
-       return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
-}
-
-//StopPeer try to stop peer by given ID
-func (sm *SyncManager) StopPeer(peerID string) error {
-       if peer := sm.peers.getPeer(peerID); peer == nil {
-               return errors.New("peerId not exist")
-       }
-       sm.peers.removePeer(peerID)
-       return nil
-}
-
-func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
-       block, err := msg.GetBlock()
-       if err != nil {
-               return
-       }
-       sm.blockKeeper.processBlock(peer.ID(), block)
-}
-
-func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
-       blocks, err := msg.GetBlocks()
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
-               return
-       }
-
-       sm.blockKeeper.processBlocks(peer.ID(), blocks)
-}
-
-func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
-       peer.addFilterAddress(msg.Address)
-}
-
-func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
-       peer.filterAdds.Clear()
-}
-
-func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
-       peer.addFilterAddresses(msg.Addresses)
-}
-
-func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
-       var block *types.Block
-       var err error
-       if msg.Height != 0 {
-               block, err = sm.chain.GetBlockByHeight(msg.Height)
-       } else {
-               block, err = sm.chain.GetBlockByHash(msg.GetHash())
-       }
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
-               return
-       }
-
-       ok, err := peer.sendBlock(block)
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
-       }
-}
-
-func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
-       blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
-       if err != nil || len(blocks) == 0 {
-               return
-       }
-
-       totalSize := 0
-       sendBlocks := []*types.Block{}
-       for _, block := range blocks {
-               rawData, err := block.MarshalText()
-               if err != nil {
-                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
-                       continue
-               }
-
-               if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
-                       break
-               }
-               totalSize += len(rawData)
-               sendBlocks = append(sendBlocks, block)
-       }
-
-       ok, err := peer.sendBlocks(sendBlocks)
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
-       }
-}
-
-func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
-       headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
-       if err != nil || len(headers) == 0 {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
-               return
-       }
-
-       ok, err := peer.sendHeaders(headers)
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
-       }
-}
-
-func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
-       var err error
-       var block *types.Block
-       if msg.Height != 0 {
-               block, err = sm.chain.GetBlockByHeight(msg.Height)
-       } else {
-               block, err = sm.chain.GetBlockByHash(msg.GetHash())
-       }
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
-               return
-       }
-
-       blockHash := block.Hash()
-       txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
-               return
-       }
-
-       ok, err := peer.sendMerkleBlock(block, txStatus)
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
-               return
-       }
-
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
-}
-
-func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
-       headers, err := msg.GetHeaders()
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
-               return
-       }
-
-       sm.blockKeeper.processHeaders(peer.ID(), headers)
-}
-
-func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
-       block, err := msg.GetMineBlock()
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
-               return
-       }
-
-       hash := block.Hash()
-       peer.markBlock(&hash)
-       sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
-       peer.setStatus(block.Height, &hash)
-}
-
-func (sm *SyncManager) handleStatusMsg(basePeer BasePeer, msg *StatusMessage) {
-       if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
-               peer.setStatus(msg.Height, msg.GetHash())
-               return
-       }
-}
-
-func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
-       tx, err := msg.GetTransaction()
-       if err != nil {
-               sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
-               return
-       }
-
-       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
-               sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
-       }
-       sm.peers.markTx(peer.ID(), tx.ID)
-}
-
-func (sm *SyncManager) handleTransactionsMsg(peer *peer, msg *TransactionsMessage) {
-       txs, err := msg.GetTransactions()
-       if err != nil {
-               sm.peers.addBanScore(peer.ID(), 0, 20, "fail on get txs from message")
-               return
-       }
-
-       if len(txs) > txsMsgMaxTxNum {
-               sm.peers.addBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
-               return
-       }
-
-       for _, tx := range txs {
-               if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan {
-                       sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
-                       return
-               }
-               sm.peers.markTx(peer.ID(), tx.ID)
-       }
-}
-
-func (sm *SyncManager) IsListening() bool {
-       if sm.config.VaultMode {
-               return false
-       }
-       return sm.sw.IsListening()
-}
-
-func (sm *SyncManager) PeerCount() int {
-       if sm.config.VaultMode {
-               return 0
-       }
-       return len(sm.sw.Peers().List())
-}
-
-func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
-       peer := sm.peers.getPeer(basePeer.ID())
-       if peer == nil {
-               return
-       }
-
-       log.WithFields(log.Fields{
-               "module":  logModule,
-               "peer":    basePeer.Addr(),
-               "type":    reflect.TypeOf(msg),
-               "message": msg.String(),
-       }).Info("receive message from peer")
-
-       switch msg := msg.(type) {
-       case *GetBlockMessage:
-               sm.handleGetBlockMsg(peer, msg)
-
-       case *BlockMessage:
-               sm.handleBlockMsg(peer, msg)
-
-       case *StatusMessage:
-               sm.handleStatusMsg(basePeer, msg)
-
-       case *TransactionMessage:
-               sm.handleTransactionMsg(peer, msg)
-
-       case *TransactionsMessage:
-               sm.handleTransactionsMsg(peer, msg)
-
-       case *MineBlockMessage:
-               sm.handleMineBlockMsg(peer, msg)
-
-       case *GetHeadersMessage:
-               sm.handleGetHeadersMsg(peer, msg)
-
-       case *HeadersMessage:
-               sm.handleHeadersMsg(peer, msg)
-
-       case *GetBlocksMessage:
-               sm.handleGetBlocksMsg(peer, msg)
-
-       case *BlocksMessage:
-               sm.handleBlocksMsg(peer, msg)
-
-       case *FilterLoadMessage:
-               sm.handleFilterLoadMsg(peer, msg)
-
-       case *FilterAddMessage:
-               sm.handleFilterAddMsg(peer, msg)
-
-       case *FilterClearMessage:
-               sm.handleFilterClearMsg(peer)
-
-       case *GetMerkleBlockMessage:
-               sm.handleGetMerkleBlockMsg(peer, msg)
-
-       default:
-               log.WithFields(log.Fields{
-                       "module":       logModule,
-                       "peer":         basePeer.Addr(),
-                       "message_type": reflect.TypeOf(msg),
-               }).Error("unhandled message type")
-       }
-}
-
-func (sm *SyncManager) SendStatus(peer BasePeer) error {
-       p := sm.peers.getPeer(peer.ID())
-       if p == nil {
-               return errors.New("invalid peer")
-       }
-
-       if err := p.sendStatus(sm.chain.BestBlockHeader()); err != nil {
-               sm.peers.removePeer(p.ID())
-               return err
-       }
-       return nil
-}
-
-func (sm *SyncManager) Start() error {
-       var err error
-       if _, err = sm.sw.Start(); err != nil {
-               log.Error("switch start err")
-               return err
-       }
-
-       sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
-       if err != nil {
-               return err
-       }
-
-       sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
-       if err != nil {
-               return err
-       }
-
-       // broadcast transactions
-       go sm.txBroadcastLoop()
-       go sm.minedBroadcastLoop()
-       go sm.txSyncLoop()
-
-       return nil
-}
-
-//Stop stop sync manager
-func (sm *SyncManager) Stop() {
-       close(sm.quitSync)
-       sm.minedBlockSub.Unsubscribe()
-       if !sm.config.VaultMode {
-               sm.sw.Stop()
-       }
-}
-
-func (sm *SyncManager) minedBroadcastLoop() {
-       for {
-               select {
-               case obj, ok := <-sm.minedBlockSub.Chan():
-                       if !ok {
-                               log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
-                               return
-                       }
-
-                       ev, ok := obj.Data.(event.NewMinedBlockEvent)
-                       if !ok {
-                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
-                               continue
-                       }
-
-                       if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
-                               continue
-                       }
-
-               case <-sm.quitSync:
-                       return
-               }
-       }
-}