OSDN Git Service

init delete the pow related (#55)
[bytom/vapor.git] / netsync / handle.go
index d1b15d4..67dd8c2 100644 (file)
@@ -1,27 +1,19 @@
 package netsync
 
 import (
-       "encoding/hex"
        "errors"
-       "net"
-       "path"
        "reflect"
-       "strconv"
-       "strings"
 
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       cmn "github.com/tendermint/tmlibs/common"
 
-       "github.com/vapor/chain"
+       "github.com/tendermint/go-crypto"
        cfg "github.com/vapor/config"
        "github.com/vapor/consensus"
+       "github.com/vapor/event"
        "github.com/vapor/p2p"
-       "github.com/vapor/p2p/discover"
        core "github.com/vapor/protocol"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
-       "github.com/vapor/version"
 )
 
 const (
@@ -31,69 +23,90 @@ const (
        maxFilterAddressCount = 1000
 )
 
+var (
+       errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
+)
+
+// Chain is the interface for Bytom core
+type Chain interface {
+       BestBlockHeader() *types.BlockHeader
+       BestBlockHeight() uint64
+       GetBlockByHash(*bc.Hash) (*types.Block, error)
+       GetBlockByHeight(uint64) (*types.Block, error)
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+       GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+       GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
+       InMainChain(bc.Hash) bool
+       ProcessBlock(*types.Block) (bool, error)
+       ValidateTx(*types.Tx) (bool, error)
+}
+
+type Switch interface {
+       AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
+       NodeInfo() *p2p.NodeInfo
+       Start() (bool, error)
+       Stop() bool
+       IsListening() bool
+       DialPeerWithAddress(addr *p2p.NetAddress) error
+       Peers() *p2p.PeerSet
+}
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       sw          *p2p.Switch
-       genesisHash bc.Hash
-
-       privKey      crypto.PrivKeyEd25519 // local node's p2p key
-       chain        chain.Chain
+       sw           Switch
+       genesisHash  bc.Hash
+       chain        Chain
        txPool       *core.TxPool
        blockFetcher *blockFetcher
        blockKeeper  *blockKeeper
        peers        *peerSet
 
-       newTxCh    chan *types.Tx
-       newBlockCh chan *bc.Hash
-       txSyncCh   chan *txSyncMsg
-       quitSync   chan struct{}
-       config     *cfg.Config
+       txSyncCh chan *txSyncMsg
+       quitSync chan struct{}
+       config   *cfg.Config
+
+       eventDispatcher *event.Dispatcher
+       minedBlockSub   *event.Subscription
+       txMsgSub        *event.Subscription
+}
+
+// CreateSyncManager create sync manager and set switch.
+func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+       sw, err := p2p.NewSwitch(config)
+       if err != nil {
+               return nil, err
+       }
+
+       return newSyncManager(config, sw, chain, txPool, dispatcher)
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain chain.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
        genesisHeader, err := chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }
-
-       sw := p2p.NewSwitch(config)
        peers := newPeerSet(sw)
        manager := &SyncManager{
-               sw:           sw,
-               genesisHash:  genesisHeader.Hash(),
-               txPool:       txPool,
-               chain:        chain,
-               privKey:      crypto.GenPrivKeyEd25519(),
-               blockFetcher: newBlockFetcher(chain, peers),
-               blockKeeper:  newBlockKeeper(chain, peers),
-               peers:        peers,
-               newTxCh:      make(chan *types.Tx, maxTxChanSize),
-               newBlockCh:   newBlockCh,
-               txSyncCh:     make(chan *txSyncMsg),
-               quitSync:     make(chan struct{}),
-               config:       config,
-       }
-
-       protocolReactor := NewProtocolReactor(manager, manager.peers)
-       manager.sw.AddReactor("PROTOCOL", protocolReactor)
-
-       // Create & add listener
-       var listenerStatus bool
-       var l p2p.Listener
-       if !config.VaultMode {
-               p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
-               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
-               manager.sw.AddListener(l)
+               sw:              sw,
+               genesisHash:     genesisHeader.Hash(),
+               txPool:          txPool,
+               chain:           chain,
+               blockFetcher:    newBlockFetcher(chain, peers),
+               blockKeeper:     newBlockKeeper(chain, peers),
+               peers:           peers,
+               txSyncCh:        make(chan *txSyncMsg),
+               quitSync:        make(chan struct{}),
+               config:          config,
+               eventDispatcher: dispatcher,
+       }
 
-               discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
-               if err != nil {
-                       return nil, err
-               }
-               manager.sw.SetDiscv(discv)
+       if !config.VaultMode {
+               protocolReactor := NewProtocolReactor(manager, peers)
+               manager.sw.AddReactor("PROTOCOL", protocolReactor)
        }
-       manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
-       manager.sw.SetNodePrivKey(manager.privKey)
        return manager, nil
 }
 
@@ -106,9 +119,16 @@ func (sm *SyncManager) BestPeer() *PeerInfo {
        return nil
 }
 
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
-       return sm.newTxCh
+func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
+       if sm.config.VaultMode {
+               return errVaultModeDialPeer
+       }
+
+       return sm.sw.DialPeerWithAddress(addr)
+}
+
+func (sm *SyncManager) GetNetwork() string {
+       return sm.config.ChainID
 }
 
 //GetPeerInfos return peer info of all peers
@@ -122,11 +142,6 @@ func (sm *SyncManager) IsCaughtUp() bool {
        return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
 }
 
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
-       return sm.sw.NodeInfo()
-}
-
 //StopPeer try to stop peer by given ID
 func (sm *SyncManager) StopPeer(peerID string) error {
        if peer := sm.peers.getPeer(peerID); peer == nil {
@@ -136,11 +151,6 @@ func (sm *SyncManager) StopPeer(peerID string) error {
        return nil
 }
 
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
-       return sm.sw
-}
-
 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
        block, err := msg.GetBlock()
        if err != nil {
@@ -315,11 +325,7 @@ func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusRes
        }
 
        if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
-               log.WithFields(log.Fields{
-                       "module":         logModule,
-                       "remote genesis": genesisHash.String(),
-                       "local genesis":  sm.genesisHash.String(),
-               }).Warn("fail hand shake due to differnt genesis")
+               log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
                return
        }
 
@@ -333,11 +339,32 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
                return
        }
 
-       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
+       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
                sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
        }
 }
 
+func (sm *SyncManager) IsListening() bool {
+       if sm.config.VaultMode {
+               return false
+       }
+       return sm.sw.IsListening()
+}
+
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       if sm.config.VaultMode {
+               return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
+       }
+       return sm.sw.NodeInfo()
+}
+
+func (sm *SyncManager) PeerCount() int {
+       if sm.config.VaultMode {
+               return 0
+       }
+       return len(sm.sw.Peers().List())
+}
+
 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
        peer := sm.peers.getPeer(basePeer.ID())
        if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
@@ -394,8 +421,6 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        case *GetMerkleBlockMessage:
                sm.handleGetMerkleBlockMsg(peer, msg)
 
-       // TODO PBFT消息
-
        default:
                log.WithFields(log.Fields{
                        "module":       logModule,
@@ -405,105 +430,60 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        }
 }
 
-// Defaults to tcp
-func protocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
-       }
-       return p, address
-}
-
-func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: sm.config.Moniker,
-               Network: sm.config.ChainID,
-               Version: version.Version,
-               Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
+func (sm *SyncManager) Start() error {
+       var err error
+       if _, err = sm.sw.Start(); err != nil {
+               log.Error("switch start err")
+               return err
        }
 
-       if !sm.sw.IsListening() {
-               return nodeInfo
+       sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
+       if err != nil {
+               return err
        }
 
-       p2pListener := sm.sw.Listeners()[0]
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       if listenerStatus {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
-       } else {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
+       sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       if err != nil {
+               return err
        }
-       return nodeInfo
-}
 
-//Start start sync manager service
-func (sm *SyncManager) Start() {
-       if _, err := sm.sw.Start(); err != nil {
-               cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
-       }
        // broadcast transactions
        go sm.txBroadcastLoop()
        go sm.minedBroadcastLoop()
        go sm.txSyncLoop()
+
+       return nil
 }
 
 //Stop stop sync manager
 func (sm *SyncManager) Stop() {
        close(sm.quitSync)
-       sm.sw.Stop()
-}
-
-func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
-       addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
-       if err != nil {
-               return nil, err
-       }
-
-       conn, err := net.ListenUDP("udp", addr)
-       if err != nil {
-               return nil, err
+       sm.minedBlockSub.Unsubscribe()
+       if !sm.config.VaultMode {
+               sm.sw.Stop()
        }
-
-       realaddr := conn.LocalAddr().(*net.UDPAddr)
-       ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
-       if err != nil {
-               return nil, err
-       }
-
-       // add the seeds node to the discover table
-       if config.P2P.Seeds == "" {
-               return ntab, nil
-       }
-       nodes := []*discover.Node{}
-       for _, seed := range strings.Split(config.P2P.Seeds, ",") {
-               version.Status.AddSeed(seed)
-               url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
-               nodes = append(nodes, discover.MustParseNode(url))
-       }
-       if err = ntab.SetFallbackNodes(nodes); err != nil {
-               return nil, err
-       }
-       return ntab, nil
 }
 
 func (sm *SyncManager) minedBroadcastLoop() {
        for {
                select {
-               case blockHash := <-sm.newBlockCh:
-                       block, err := sm.chain.GetBlockByHash(blockHash)
-                       if err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on mined broadcast loop get block")
+               case obj, ok := <-sm.minedBlockSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
                                return
                        }
-                       if err := sm.peers.broadcastMinedBlock(block); err != nil {
+
+                       ev, ok := obj.Data.(event.NewMinedBlockEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
                                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
-                               return
+                               continue
                        }
+
                case <-sm.quitSync:
                        return
                }