X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=netsync%2Fhandle.go;h=b412bee7e28ffaa38bb9a84eb17a7cd0b0f60bf3;hb=ae829f102dfd5e16a10a683f8704ff3680f9ee30;hp=9da1da03102ea693e6392c3f26fded9372fee1e0;hpb=018f92dc731ec4cc0569b784c959e2177a911c67;p=bytom%2Fbytom.git diff --git a/netsync/handle.go b/netsync/handle.go index 9da1da03..b412bee7 100644 --- a/netsync/handle.go +++ b/netsync/handle.go @@ -1,215 +1,492 @@ package netsync import ( - "strings" + "errors" + "reflect" log "github.com/sirupsen/logrus" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" - cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" cfg "github.com/bytom/config" + "github.com/bytom/consensus" + "github.com/bytom/event" "github.com/bytom/p2p" core "github.com/bytom/protocol" "github.com/bytom/protocol/bc" - "github.com/bytom/version" + "github.com/bytom/protocol/bc/types" + "github.com/tendermint/go-crypto" +) + +const ( + logModule = "netsync" + maxTxChanSize = 10000 + maxFilterAddressSize = 50 + maxFilterAddressCount = 1000 +) + +var ( + errVaultModeDialPeer = errors.New("can't dial peer in vault mode") ) +// Chain is the interface for Bytom core +type Chain interface { + BestBlockHeader() *types.BlockHeader + BestBlockHeight() uint64 + CalcNextSeed(*bc.Hash) (*bc.Hash, error) + GetBlockByHash(*bc.Hash) (*types.Block, error) + GetBlockByHeight(uint64) (*types.Block, error) + GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) + GetHeaderByHeight(uint64) (*types.BlockHeader, error) + GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error) + InMainChain(bc.Hash) bool + ProcessBlock(*types.Block) (bool, error) + ValidateTx(*types.Tx) (bool, error) +} + +type Switch interface { + AddReactor(name string, reactor p2p.Reactor) p2p.Reactor + AddBannedPeer(string) error + StopPeerGracefully(string) + NodeInfo() *p2p.NodeInfo + Start() (bool, error) + Stop() bool + IsListening() bool + DialPeerWithAddress(addr *p2p.NetAddress) error + Peers() *p2p.PeerSet +} + //SyncManager Sync Manager is responsible for the business layer information synchronization type SyncManager struct { - networkID uint64 - sw *p2p.Switch - addrBook *p2p.AddrBook // known peers - - privKey crypto.PrivKeyEd25519 // local node's p2p key - chain *core.Chain - txPool *core.TxPool - fetcher *Fetcher - blockKeeper *blockKeeper - peers *peerSet - - newBlockCh chan *bc.Hash - newPeerCh chan struct{} - txSyncCh chan *txsync - dropPeerCh chan *string - quitSync chan struct{} - config *cfg.Config - synchronising int32 + sw Switch + genesisHash bc.Hash + chain Chain + txPool *core.TxPool + blockFetcher *blockFetcher + blockKeeper *blockKeeper + peers *peerSet + + txSyncCh chan *txSyncMsg + quitSync chan struct{} + config *cfg.Config + + eventDispatcher *event.Dispatcher + minedBlockSub *event.Subscription + txMsgSub *event.Subscription +} + +// CreateSyncManager create sync manager and set switch. +func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) { + sw, err := p2p.NewSwitch(config) + if err != nil { + return nil, err + } + + return newSyncManager(config, sw, chain, txPool, dispatcher) } //NewSyncManager create a sync manager -func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) { - // Create the protocol manager with the base fields +func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) { + genesisHeader, err := chain.GetHeaderByHeight(0) + if err != nil { + return nil, err + } + peers := newPeerSet(sw) manager := &SyncManager{ - txPool: txPool, - chain: chain, - privKey: crypto.GenPrivKeyEd25519(), - config: config, - quitSync: make(chan struct{}), - newBlockCh: newBlockCh, - newPeerCh: make(chan struct{}), - txSyncCh: make(chan *txsync), - dropPeerCh: make(chan *string, maxQuitReq), - peers: newPeerSet(), + sw: sw, + genesisHash: genesisHeader.Hash(), + txPool: txPool, + chain: chain, + blockFetcher: newBlockFetcher(chain, peers), + blockKeeper: newBlockKeeper(chain, peers), + peers: peers, + txSyncCh: make(chan *txSyncMsg), + quitSync: make(chan struct{}), + config: config, + eventDispatcher: dispatcher, } - trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()) - manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB) - - manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh) - manager.fetcher = NewFetcher(chain, manager.sw, manager.peers) + if !config.VaultMode { + protocolReactor := NewProtocolReactor(manager, peers) + manager.sw.AddReactor("PROTOCOL", protocolReactor) + } + return manager, nil +} - protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh) - manager.sw.AddReactor("PROTOCOL", protocolReactor) +//BestPeer return the highest p2p peerInfo +func (sm *SyncManager) BestPeer() *PeerInfo { + bestPeer := sm.peers.bestPeer(consensus.SFFullNode) + if bestPeer != nil { + return bestPeer.getPeerInfo() + } + return nil +} - // Optionally, start the pex reactor - //var addrBook *p2p.AddrBook - if config.P2P.PexReactor { - manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) - pexReactor := p2p.NewPEXReactor(manager.addrBook) - manager.sw.AddReactor("PEX", pexReactor) +func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error { + if sm.config.VaultMode { + return errVaultModeDialPeer } - return manager, nil + return sm.sw.DialPeerWithAddress(addr) +} + +func (sm *SyncManager) GetNetwork() string { + return sm.config.ChainID +} + +//GetPeerInfos return peer info of all peers +func (sm *SyncManager) GetPeerInfos() []*PeerInfo { + return sm.peers.getPeerInfos() +} + +//IsCaughtUp check wheather the peer finish the sync +func (sm *SyncManager) IsCaughtUp() bool { + peer := sm.peers.bestPeer(consensus.SFFullNode) + return peer == nil || peer.Height() <= sm.chain.BestBlockHeight() } -// Defaults to tcp -func protocolAndAddress(listenAddr string) (string, string) { - p, address := "tcp", listenAddr - parts := strings.SplitN(address, "://", 2) - if len(parts) == 2 { - p, address = parts[0], parts[1] +//StopPeer try to stop peer by given ID +func (sm *SyncManager) StopPeer(peerID string) error { + if peer := sm.peers.getPeer(peerID); peer == nil { + return errors.New("peerId not exist") } - return p, address + sm.peers.removePeer(peerID) + return nil } -func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo { - nodeInfo := &p2p.NodeInfo{ - PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519), - Moniker: sm.config.Moniker, - Network: sm.config.ChainID, - Version: version.Version, - Other: []string{ - cmn.Fmt("wire_version=%v", wire.Version), - cmn.Fmt("p2p_version=%v", p2p.Version), - }, +func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) { + block, err := msg.GetBlock() + if err != nil { + return } + sm.blockKeeper.processBlock(peer.ID(), block) +} - if !sm.sw.IsListening() { - return nodeInfo +func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) { + blocks, err := msg.GetBlocks() + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks") + return } - p2pListener := sm.sw.Listeners()[0] - p2pHost := p2pListener.ExternalAddress().IP.String() - p2pPort := p2pListener.ExternalAddress().Port + sm.blockKeeper.processBlocks(peer.ID(), blocks) +} - // We assume that the rpcListener has the same ExternalAddress. - // This is probably true because both P2P and RPC listeners use UPnP, - // except of course if the rpc is only bound to localhost - nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort) - return nodeInfo +func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) { + peer.addFilterAddress(msg.Address) } -func (sm *SyncManager) netStart() error { - // Create & add listener - p, address := protocolAndAddress(sm.config.P2P.ListenAddress) +func (sm *SyncManager) handleFilterClearMsg(peer *peer) { + peer.filterAdds.Clear() +} - l := p2p.NewDefaultListener(p, address, sm.config.P2P.SkipUPNP, nil) +func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) { + peer.addFilterAddresses(msg.Addresses) +} - sm.sw.AddListener(l) +func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) { + var block *types.Block + var err error + if msg.Height != 0 { + block, err = sm.chain.GetBlockByHeight(msg.Height) + } else { + block, err = sm.chain.GetBlockByHash(msg.GetHash()) + } + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain") + return + } - // Start the switch - sm.sw.SetNodeInfo(sm.makeNodeInfo()) - sm.sw.SetNodePrivKey(sm.privKey) - _, err := sm.sw.Start() + ok, err := peer.sendBlock(block) + if !ok { + sm.peers.removePeer(peer.ID()) + } if err != nil { - return err + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock") + } +} + +func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) { + blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash()) + if err != nil || len(blocks) == 0 { + return } - // If seeds exist, add them to the address book and dial out - if sm.config.P2P.Seeds != "" { - // dial out - seeds := strings.Split(sm.config.P2P.Seeds, ",") - if err := sm.DialSeeds(seeds); err != nil { - return err + totalSize := 0 + sendBlocks := []*types.Block{} + for _, block := range blocks { + rawData, err := block.MarshalText() + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block") + continue } + + if totalSize+len(rawData) > maxBlockchainResponseSize/2 { + break + } + totalSize += len(rawData) + sendBlocks = append(sendBlocks, block) } - return nil + ok, err := peer.sendBlocks(sendBlocks) + if !ok { + sm.peers.removePeer(peer.ID()) + } + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock") + } } -//Start start sync manager service -func (sm *SyncManager) Start() { - go sm.netStart() - // broadcast transactions - go sm.txBroadcastLoop() +func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) { + headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash()) + if err != nil || len(headers) == 0 { + log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders") + return + } - // broadcast mined blocks - go sm.minedBroadcastLoop() + ok, err := peer.sendHeaders(headers) + if !ok { + sm.peers.removePeer(peer.ID()) + } + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock") + } +} - // start sync handlers - go sm.syncer() +func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) { + var err error + var block *types.Block + if msg.Height != 0 { + block, err = sm.chain.GetBlockByHeight(msg.Height) + } else { + block, err = sm.chain.GetBlockByHash(msg.GetHash()) + } + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain") + return + } + + blockHash := block.Hash() + txStatus, err := sm.chain.GetTransactionStatus(&blockHash) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status") + return + } + + ok, err := peer.sendMerkleBlock(block, txStatus) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock") + return + } - go sm.txsyncLoop() + if !ok { + sm.peers.removePeer(peer.ID()) + } } -//Stop stop sync manager -func (sm *SyncManager) Stop() { - close(sm.quitSync) - sm.sw.Stop() +func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) { + headers, err := msg.GetHeaders() + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders") + return + } + + sm.blockKeeper.processHeaders(peer.ID(), headers) } -func (sm *SyncManager) txBroadcastLoop() { - newTxCh := sm.txPool.GetNewTxCh() - for { - select { - case newTx := <-newTxCh: - sm.peers.BroadcastTx(newTx) +func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) { + block, err := msg.GetMineBlock() + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock") + return + } - case <-sm.quitSync: - return - } + hash := block.Hash() + peer.markBlock(&hash) + sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block}) + peer.setStatus(block.Height, &hash) +} + +func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) { + bestHeader := sm.chain.BestBlockHeader() + genesisBlock, err := sm.chain.GetBlockByHeight(0) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis") + } + + genesisHash := genesisBlock.Hash() + msg := NewStatusResponseMessage(bestHeader, &genesisHash) + if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok { + sm.peers.removePeer(peer.ID()) } } -func (sm *SyncManager) minedBroadcastLoop() { - for { - select { - case blockHash := <-sm.newBlockCh: - block, err := sm.chain.GetBlockByHash(blockHash) - if err != nil { - log.Errorf("Failed on mined broadcast loop get block %v", err) - return - } - sm.peers.BroadcastMinedBlock(block) - case <-sm.quitSync: - return - } +func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) { + if peer := sm.peers.getPeer(basePeer.ID()); peer != nil { + peer.setStatus(msg.Height, msg.GetHash()) + return + } + + if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash { + log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis") + return } + + sm.peers.addPeer(basePeer, msg.Height, msg.GetHash()) +} + +func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) { + tx, err := msg.GetTransaction() + if err != nil { + sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message") + return + } + + if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan { + sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction") + } +} + +func (sm *SyncManager) IsListening() bool { + if sm.config.VaultMode { + return false + } + return sm.sw.IsListening() } -//NodeInfo get P2P peer node info func (sm *SyncManager) NodeInfo() *p2p.NodeInfo { + if sm.config.VaultMode { + return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "") + } return sm.sw.NodeInfo() } -//BlockKeeper get block keeper -func (sm *SyncManager) BlockKeeper() *blockKeeper { - return sm.blockKeeper +func (sm *SyncManager) PeerCount() int { + if sm.config.VaultMode { + return 0 + } + return len(sm.sw.Peers().List()) +} + +func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) { + peer := sm.peers.getPeer(basePeer.ID()) + if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte { + return + } + + log.WithFields(log.Fields{ + "module": logModule, + "peer": basePeer.Addr(), + "type": reflect.TypeOf(msg), + "message": msg.String(), + }).Info("receive message from peer") + + switch msg := msg.(type) { + case *GetBlockMessage: + sm.handleGetBlockMsg(peer, msg) + + case *BlockMessage: + sm.handleBlockMsg(peer, msg) + + case *StatusRequestMessage: + sm.handleStatusRequestMsg(basePeer) + + case *StatusResponseMessage: + sm.handleStatusResponseMsg(basePeer, msg) + + case *TransactionMessage: + sm.handleTransactionMsg(peer, msg) + + case *MineBlockMessage: + sm.handleMineBlockMsg(peer, msg) + + case *GetHeadersMessage: + sm.handleGetHeadersMsg(peer, msg) + + case *HeadersMessage: + sm.handleHeadersMsg(peer, msg) + + case *GetBlocksMessage: + sm.handleGetBlocksMsg(peer, msg) + + case *BlocksMessage: + sm.handleBlocksMsg(peer, msg) + + case *FilterLoadMessage: + sm.handleFilterLoadMsg(peer, msg) + + case *FilterAddMessage: + sm.handleFilterAddMsg(peer, msg) + + case *FilterClearMessage: + sm.handleFilterClearMsg(peer) + + case *GetMerkleBlockMessage: + sm.handleGetMerkleBlockMsg(peer, msg) + + default: + log.WithFields(log.Fields{ + "module": logModule, + "peer": basePeer.Addr(), + "message_type": reflect.TypeOf(msg), + }).Error("unhandled message type") + } } -//Peers get sync manager peer set -func (sm *SyncManager) Peers() *peerSet { - return sm.peers +func (sm *SyncManager) Start() error { + var err error + if _, err = sm.sw.Start(); err != nil { + log.Error("switch start err") + return err + } + + sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{}) + if err != nil { + return err + } + + sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{}) + if err != nil { + return err + } + + // broadcast transactions + go sm.txBroadcastLoop() + go sm.minedBroadcastLoop() + go sm.txSyncLoop() + + return nil } -//DialSeeds dial seed peers -func (sm *SyncManager) DialSeeds(seeds []string) error { - return sm.sw.DialSeeds(sm.addrBook, seeds) +//Stop stop sync manager +func (sm *SyncManager) Stop() { + close(sm.quitSync) + sm.minedBlockSub.Unsubscribe() + if !sm.config.VaultMode { + sm.sw.Stop() + } } -//Switch get sync manager switch -func (sm *SyncManager) Switch() *p2p.Switch { - return sm.sw +func (sm *SyncManager) minedBroadcastLoop() { + for { + select { + case obj, ok := <-sm.minedBlockSub.Chan(): + if !ok { + log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed") + return + } + + ev, ok := obj.Data.(event.NewMinedBlockEvent) + if !ok { + log.WithFields(log.Fields{"module": logModule}).Error("event type error") + continue + } + + if err := sm.peers.broadcastMinedBlock(ev.Block); err != nil { + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block") + continue + } + + case <-sm.quitSync: + return + } + } }