package netsync
import (
- "encoding/hex"
"errors"
- "net"
- "path"
"reflect"
- "strconv"
- "strings"
log "github.com/sirupsen/logrus"
- "github.com/tendermint/go-crypto"
- cmn "github.com/tendermint/tmlibs/common"
- "github.com/vapor/chain"
+ "github.com/tendermint/go-crypto"
cfg "github.com/vapor/config"
"github.com/vapor/consensus"
+ "github.com/vapor/event"
"github.com/vapor/p2p"
- "github.com/vapor/p2p/discover"
core "github.com/vapor/protocol"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
- "github.com/vapor/version"
)
const (
maxFilterAddressCount = 1000
)
+var (
+ errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
+)
+
+// Chain is the interface for Bytom core
+type Chain interface {
+ BestBlockHeader() *types.BlockHeader
+ BestBlockHeight() uint64
+ GetBlockByHash(*bc.Hash) (*types.Block, error)
+ GetBlockByHeight(uint64) (*types.Block, error)
+ GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+ GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+ GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
+ InMainChain(bc.Hash) bool
+ ProcessBlock(*types.Block) (bool, error)
+ ValidateTx(*types.Tx) (bool, error)
+}
+
+type Switch interface {
+ AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+ AddBannedPeer(string) error
+ StopPeerGracefully(string)
+ NodeInfo() *p2p.NodeInfo
+ Start() (bool, error)
+ Stop() bool
+ IsListening() bool
+ DialPeerWithAddress(addr *p2p.NetAddress) error
+ Peers() *p2p.PeerSet
+}
+
//SyncManager Sync Manager is responsible for the business layer information synchronization
type SyncManager struct {
- sw *p2p.Switch
- genesisHash bc.Hash
-
- privKey crypto.PrivKeyEd25519 // local node's p2p key
- chain chain.Chain
+ sw Switch
+ genesisHash bc.Hash
+ chain Chain
txPool *core.TxPool
blockFetcher *blockFetcher
blockKeeper *blockKeeper
peers *peerSet
- newTxCh chan *types.Tx
- newBlockCh chan *bc.Hash
- txSyncCh chan *txSyncMsg
- quitSync chan struct{}
- config *cfg.Config
+ txSyncCh chan *txSyncMsg
+ quitSync chan struct{}
+ config *cfg.Config
+
+ eventDispatcher *event.Dispatcher
+ minedBlockSub *event.Subscription
+ txMsgSub *event.Subscription
+}
+
+// CreateSyncManager create sync manager and set switch.
+func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+ sw, err := p2p.NewSwitch(config)
+ if err != nil {
+ return nil, err
+ }
+
+ return newSyncManager(config, sw, chain, txPool, dispatcher)
}
//NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain chain.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
genesisHeader, err := chain.GetHeaderByHeight(0)
if err != nil {
return nil, err
}
-
- sw := p2p.NewSwitch(config)
peers := newPeerSet(sw)
manager := &SyncManager{
- sw: sw,
- genesisHash: genesisHeader.Hash(),
- txPool: txPool,
- chain: chain,
- privKey: crypto.GenPrivKeyEd25519(),
- blockFetcher: newBlockFetcher(chain, peers),
- blockKeeper: newBlockKeeper(chain, peers),
- peers: peers,
- newTxCh: make(chan *types.Tx, maxTxChanSize),
- newBlockCh: newBlockCh,
- txSyncCh: make(chan *txSyncMsg),
- quitSync: make(chan struct{}),
- config: config,
- }
-
- protocolReactor := NewProtocolReactor(manager, manager.peers)
- manager.sw.AddReactor("PROTOCOL", protocolReactor)
-
- // Create & add listener
- var listenerStatus bool
- var l p2p.Listener
- if !config.VaultMode {
- p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
- l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
- manager.sw.AddListener(l)
+ sw: sw,
+ genesisHash: genesisHeader.Hash(),
+ txPool: txPool,
+ chain: chain,
+ blockFetcher: newBlockFetcher(chain, peers),
+ blockKeeper: newBlockKeeper(chain, peers),
+ peers: peers,
+ txSyncCh: make(chan *txSyncMsg),
+ quitSync: make(chan struct{}),
+ config: config,
+ eventDispatcher: dispatcher,
+ }
- discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
- if err != nil {
- return nil, err
- }
- manager.sw.SetDiscv(discv)
+ if !config.VaultMode {
+ protocolReactor := NewProtocolReactor(manager, peers)
+ manager.sw.AddReactor("PROTOCOL", protocolReactor)
}
- manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
- manager.sw.SetNodePrivKey(manager.privKey)
return manager, nil
}
return nil
}
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
- return sm.newTxCh
+func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
+ if sm.config.VaultMode {
+ return errVaultModeDialPeer
+ }
+
+ return sm.sw.DialPeerWithAddress(addr)
+}
+
+func (sm *SyncManager) GetNetwork() string {
+ return sm.config.ChainID
}
//GetPeerInfos return peer info of all peers
return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
}
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
- return sm.sw.NodeInfo()
-}
-
//StopPeer try to stop peer by given ID
func (sm *SyncManager) StopPeer(peerID string) error {
if peer := sm.peers.getPeer(peerID); peer == nil {
return nil
}
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
- return sm.sw
-}
-
func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
block, err := msg.GetBlock()
if err != nil {
}
if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
- log.WithFields(log.Fields{
- "module": logModule,
- "remote genesis": genesisHash.String(),
- "local genesis": sm.genesisHash.String(),
- }).Warn("fail hand shake due to differnt genesis")
+ log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
return
}
return
}
- if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
+ if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
}
}
+func (sm *SyncManager) IsListening() bool {
+ if sm.config.VaultMode {
+ return false
+ }
+ return sm.sw.IsListening()
+}
+
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+ if sm.config.VaultMode {
+ return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
+ }
+ return sm.sw.NodeInfo()
+}
+
+func (sm *SyncManager) PeerCount() int {
+ if sm.config.VaultMode {
+ return 0
+ }
+ return len(sm.sw.Peers().List())
+}
+
func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
peer := sm.peers.getPeer(basePeer.ID())
if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
case *GetMerkleBlockMessage:
sm.handleGetMerkleBlockMsg(peer, msg)
- // TODO PBFT消息
-
default:
log.WithFields(log.Fields{
"module": logModule,
}
}
-// Defaults to tcp
-func protocolAndAddress(listenAddr string) (string, string) {
- p, address := "tcp", listenAddr
- parts := strings.SplitN(address, "://", 2)
- if len(parts) == 2 {
- p, address = parts[0], parts[1]
- }
- return p, address
-}
-
-func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
- nodeInfo := &p2p.NodeInfo{
- PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
- Moniker: sm.config.Moniker,
- Network: sm.config.ChainID,
- Version: version.Version,
- Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
+func (sm *SyncManager) Start() error {
+ var err error
+ if _, err = sm.sw.Start(); err != nil {
+ log.Error("switch start err")
+ return err
}
- if !sm.sw.IsListening() {
- return nodeInfo
+ sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
+ if err != nil {
+ return err
}
- p2pListener := sm.sw.Listeners()[0]
-
- // We assume that the rpcListener has the same ExternalAddress.
- // This is probably true because both P2P and RPC listeners use UPnP,
- // except of course if the rpc is only bound to localhost
- if listenerStatus {
- nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
- } else {
- nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
+ sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+ if err != nil {
+ return err
}
- return nodeInfo
-}
-//Start start sync manager service
-func (sm *SyncManager) Start() {
- if _, err := sm.sw.Start(); err != nil {
- cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
- }
// broadcast transactions
go sm.txBroadcastLoop()
go sm.minedBroadcastLoop()
go sm.txSyncLoop()
+
+ return nil
}
//Stop stop sync manager
func (sm *SyncManager) Stop() {
close(sm.quitSync)
- sm.sw.Stop()
-}
-
-func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
- addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
- if err != nil {
- return nil, err
- }
-
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- return nil, err
+ sm.minedBlockSub.Unsubscribe()
+ if !sm.config.VaultMode {
+ sm.sw.Stop()
}
-
- realaddr := conn.LocalAddr().(*net.UDPAddr)
- ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
- if err != nil {
- return nil, err
- }
-
- // add the seeds node to the discover table
- if config.P2P.Seeds == "" {
- return ntab, nil
- }
- nodes := []*discover.Node{}
- for _, seed := range strings.Split(config.P2P.Seeds, ",") {
- version.Status.AddSeed(seed)
- url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
- nodes = append(nodes, discover.MustParseNode(url))
- }
- if err = ntab.SetFallbackNodes(nodes); err != nil {
- return nil, err
- }
- return ntab, nil
}
func (sm *SyncManager) minedBroadcastLoop() {
for {
select {
- case blockHash := <-sm.newBlockCh:
- block, err := sm.chain.GetBlockByHash(blockHash)
- if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on mined broadcast loop get block")
+ case obj, ok := <-sm.minedBlockSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
return
}
- if err := sm.peers.broadcastMinedBlock(block); err != nil {
+
+ ev, ok := obj.Data.(event.NewMinedBlockEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
- return
+ continue
}
+
case <-sm.quitSync:
return
}