import (
"context"
+ "errors"
"net/http"
_ "net/http/pprof"
+ "os"
+ "path/filepath"
"strings"
- "time"
+ "github.com/prometheus/prometheus/util/flock"
log "github.com/sirupsen/logrus"
- "github.com/tendermint/go-crypto"
- "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
+ browser "github.com/toqueteos/webbrowser"
- "github.com/bytom/api"
- "github.com/bytom/crypto/ed25519/chainkd"
- bc "github.com/bytom/blockchain"
"github.com/bytom/accesstoken"
"github.com/bytom/account"
+ "github.com/bytom/api"
"github.com/bytom/asset"
"github.com/bytom/blockchain/pseudohsm"
"github.com/bytom/blockchain/txfeed"
cfg "github.com/bytom/config"
+ "github.com/bytom/consensus"
"github.com/bytom/database/leveldb"
"github.com/bytom/env"
- "github.com/bytom/p2p"
+ "github.com/bytom/mining/cpuminer"
+ "github.com/bytom/mining/miningpool"
+ "github.com/bytom/mining/tensority"
+ "github.com/bytom/netsync"
"github.com/bytom/protocol"
- "github.com/bytom/types"
- "github.com/bytom/util/browser"
- "github.com/bytom/version"
+ "github.com/bytom/protocol/bc"
w "github.com/bytom/wallet"
)
const (
- webAddress = "http://127.0.0.1:9888"
- expireReservationsPeriod = time.Second
+ webHost = "http://127.0.0.1"
+ maxNewBlockChSize = 1024
)
type Node struct {
// config
config *cfg.Config
- // network
- privKey crypto.PrivKeyEd25519 // local node's p2p key
- sw *p2p.Switch // p2p connections
- addrBook *p2p.AddrBook // known peers
+ syncManager *netsync.SyncManager
- evsw types.EventSwitch // pub/sub for services
- bcReactor *bc.BlockchainReactor
+ //bcReactor *bc.BlockchainReactor
wallet *w.Wallet
accessTokens *accesstoken.CredentialStore
api *api.API
chain *protocol.Chain
+ txfeed *txfeed.Tracker
+ cpuMiner *cpuminer.CPUMiner
+ miningPool *miningpool.MiningPool
+ miningEnable bool
}
func NewNode(config *cfg.Config) *Node {
ctx := context.Background()
-
+ if err := lockDataDirectory(config); err != nil {
+ cmn.Exit("Error: " + err.Error())
+ }
+ initLogFile(config)
+ initActiveNetParams(config)
// Get store
- txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
- store := leveldb.NewStore(txDB)
+ coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
+ store := leveldb.NewStore(coreDB)
tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
accessTokens := accesstoken.NewStore(tokenDB)
- privKey := crypto.GenPrivKeyEd25519()
-
- // Make event switch
- eventSwitch := types.NewEventSwitch()
- _, err := eventSwitch.Start()
- if err != nil {
- cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
- }
-
- trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-
- sw := p2p.NewSwitch(config.P2P, trustHistoryDB)
-
- genesisBlock := cfg.GenerateGenesisBlock()
-
- txPool := protocol.NewTxPool()
- chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
+ txPool := protocol.NewTxPool(store)
+ chain, err := protocol.NewChain(store, txPool)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
}
- if chain.BestBlockHash() == nil {
- if err := chain.SaveBlock(genesisBlock); err != nil {
- cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
- }
- if err := chain.ConnectBlock(genesisBlock); err != nil {
- cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
- }
- }
-
var accounts *account.Manager = nil
var assets *asset.Registry = nil
var wallet *w.Wallet = nil
walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
accounts = account.NewManager(walletDB, chain)
assets = asset.NewRegistry(walletDB, chain)
- wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, accessTokens, chain)
+ wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
if err != nil {
log.WithField("error", err).Error("init NewWallet")
}
- if err := initOrRecoverAccount(hsm, wallet); err != nil {
- log.WithField("error", err).Error("initialize or recover account")
+ // trigger rescan wallet
+ if config.Wallet.Rescan {
+ wallet.RescanBlocks()
}
-
- // Clean up expired UTXO reservations periodically.
- go accounts.ExpireReservations(ctx, expireReservationsPeriod)
}
+ newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
- bcReactor := bc.NewBlockchainReactor(chain, txPool, sw, wallet, txFeed, config.Mining)
-
- sw.AddReactor("BLOCKCHAIN", bcReactor)
+ syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
- // Optionally, start the pex reactor
- var addrBook *p2p.AddrBook
- if config.P2P.PexReactor {
- addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
- pexReactor := p2p.NewPEXReactor(addrBook)
- sw.AddReactor("PEX", pexReactor)
- }
+ // get transaction from txPool and send it to syncManager and wallet
+ go newPoolTxListener(txPool, syncManager, wallet)
// run the profile server
profileHost := config.ProfListenAddress
}
node := &Node{
- config: config,
-
- privKey: privKey,
- sw: sw,
- addrBook: addrBook,
-
- evsw: eventSwitch,
- bcReactor: bcReactor,
+ config: config,
+ syncManager: syncManager,
accessTokens: accessTokens,
wallet: wallet,
chain: chain,
+ txfeed: txFeed,
+ miningEnable: config.Mining,
}
+
+ node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
+ node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
+
node.BaseService = *cmn.NewBaseService(nil, "Node", node)
+ if config.Simd.Enable {
+ tensority.UseSIMD = true
+ }
+
return node
}
-func initOrRecoverAccount(hsm *pseudohsm.HSM, wallet *w.Wallet) error {
- xpubs := hsm.ListKeys()
-
- if len(xpubs) == 0 {
- xpub, err := hsm.XCreate("default", "123456")
- if err != nil {
- return err
+// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
+func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+ txMsgCh := txPool.GetMsgCh()
+ syncManagerTxCh := syncManager.GetNewTxCh()
+
+ for {
+ msg := <-txMsgCh
+ switch msg.MsgType {
+ case protocol.MsgNewTx:
+ syncManagerTxCh <- msg.Tx
+ if wallet != nil {
+ wallet.AddUnconfirmedTx(msg.TxDesc)
+ }
+ case protocol.MsgRemoveTx:
+ if wallet != nil {
+ wallet.RemoveUnconfirmedTx(msg.TxDesc)
+ }
+ default:
+ log.Warn("got unknow message type from the txPool channel")
}
-
- wallet.AccountMgr.Create(nil, []chainkd.XPub{xpub.XPub}, 1, "default", nil)
- return nil
}
+}
- accounts, err := wallet.AccountMgr.ListAccounts("")
+// Lock data directory after daemonization
+func lockDataDirectory(config *cfg.Config) error {
+ _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
if err != nil {
- return err
+ return errors.New("datadir already used by another process")
}
+ return nil
+}
- if len(accounts) > 0 {
- return nil
+func initActiveNetParams(config *cfg.Config) {
+ var exist bool
+ consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
+ if !exist {
+ cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
}
+}
- for i, xPub := range xpubs {
- if err := wallet.ImportAccountXpubKey(i, xPub, w.RecoveryIndex); err != nil {
- return err
- }
+func initLogFile(config *cfg.Config) {
+ if config.LogFile == "" {
+ return
}
- return nil
+ cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
+ file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+ if err == nil {
+ log.SetOutput(file)
+ } else {
+ log.WithField("err", err).Info("using default")
+ }
+
}
// Lanch web broser or not
-func lanchWebBroser(lanch bool) {
- if lanch {
- log.Info("Launching System Browser with :", webAddress)
- if err := browser.Open(webAddress); err != nil {
- log.Error(err.Error())
- return
- }
+func launchWebBrowser(port string) {
+ webAddress := webHost + ":" + port
+ log.Info("Launching System Browser with :", webAddress)
+ if err := browser.Open(webAddress); err != nil {
+ log.Error(err.Error())
+ return
}
}
func (n *Node) initAndstartApiServer() {
- n.api = api.NewAPI(n.bcReactor, n.wallet, n.chain, n.config)
+ n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
listenAddr := env.String("LISTEN", n.config.ApiAddress)
+ env.Parse()
n.api.StartServer(*listenAddr)
}
func (n *Node) OnStart() error {
- // Create & add listener
- p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
- l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
- n.sw.AddListener(l)
-
- // Start the switch
- n.sw.SetNodeInfo(n.makeNodeInfo())
- n.sw.SetNodePrivKey(n.privKey)
- _, err := n.sw.Start()
- if err != nil {
- return err
- }
-
- // If seeds exist, add them to the address book and dial out
- if n.config.P2P.Seeds != "" {
- // dial out
- seeds := strings.Split(n.config.P2P.Seeds, ",")
- if err := n.DialSeeds(seeds); err != nil {
- return err
+ if n.miningEnable {
+ if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
+ n.miningEnable = false
+ log.Error(err)
+ } else {
+ n.cpuMiner.Start()
}
}
-
+ if !n.config.VaultMode {
+ n.syncManager.Start()
+ }
n.initAndstartApiServer()
- lanchWebBroser(!n.config.Web.Closed)
-
+ if !n.config.Web.Closed {
+ s := strings.Split(n.config.ApiAddress, ":")
+ if len(s) != 2 {
+ log.Error("Invalid api address")
+ }
+ launchWebBrowser(s[1])
+ }
return nil
}
func (n *Node) OnStop() {
n.BaseService.OnStop()
-
- log.Info("Stopping Node")
- // TODO: gracefully disconnect from peers.
- n.sw.Stop()
-
+ if n.miningEnable {
+ n.cpuMiner.Stop()
+ }
+ if !n.config.VaultMode {
+ n.syncManager.Stop()
+ }
}
func (n *Node) RunForever() {
})
}
-// Add a Listener to accept inbound peer connections.
-// Add listeners before starting the Node.
-// The first listener is the primary listener (in NodeInfo)
-func (n *Node) AddListener(l p2p.Listener) {
- n.sw.AddListener(l)
+func (n *Node) SyncManager() *netsync.SyncManager {
+ return n.syncManager
}
-func (n *Node) Switch() *p2p.Switch {
- return n.sw
+func (n *Node) MiningPool() *miningpool.MiningPool {
+ return n.miningPool
}
-
-func (n *Node) EventSwitch() types.EventSwitch {
- return n.evsw
-}
-
-func (n *Node) makeNodeInfo() *p2p.NodeInfo {
- nodeInfo := &p2p.NodeInfo{
- PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
- Moniker: n.config.Moniker,
- Network: "bytom",
- Version: version.Version,
- Other: []string{
- cmn.Fmt("wire_version=%v", wire.Version),
- cmn.Fmt("p2p_version=%v", p2p.Version),
- },
- }
-
- if !n.sw.IsListening() {
- return nodeInfo
- }
-
- p2pListener := n.sw.Listeners()[0]
- p2pHost := p2pListener.ExternalAddress().IP.String()
- p2pPort := p2pListener.ExternalAddress().Port
- //rpcListenAddr := n.config.RPC.ListenAddress
-
- // We assume that the rpcListener has the same ExternalAddress.
- // This is probably true because both P2P and RPC listeners use UPnP,
- // except of course if the rpc is only bound to localhost
- nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
- //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
- return nodeInfo
-}
-
-//------------------------------------------------------------------------------
-
-func (n *Node) NodeInfo() *p2p.NodeInfo {
- return n.sw.NodeInfo()
-}
-
-func (n *Node) DialSeeds(seeds []string) error {
- return n.sw.DialSeeds(n.addrBook, seeds)
-}
-
-// Defaults to tcp
-func ProtocolAndAddress(listenAddr string) (string, string) {
- p, address := "tcp", listenAddr
- parts := strings.SplitN(address, "://", 2)
- if len(parts) == 2 {
- p, address = parts[0], parts[1]
- }
- return p, address
-}
-
-//------------------------------------------------------------------------------