"crypto/tls"
"net"
"net/http"
- "os"
+ _ "net/http/pprof"
"strings"
"sync"
"time"
- bc "github.com/bytom/blockchain"
- cfg "github.com/bytom/config"
- "github.com/bytom/consensus"
- p2p "github.com/bytom/p2p"
- "github.com/bytom/protocol/bc/legacy"
- rpccore "github.com/bytom/rpc/core"
- grpccore "github.com/bytom/rpc/grpc"
- "github.com/bytom/types"
- "github.com/bytom/version"
+ "github.com/kr/secureheader"
+ log "github.com/sirupsen/logrus"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
- "github.com/tendermint/tmlibs/log"
- //rpc "github.com/blockchain/rpc/lib"
+
+ bc "github.com/bytom/blockchain"
+ "github.com/bytom/blockchain/accesstoken"
"github.com/bytom/blockchain/account"
"github.com/bytom/blockchain/asset"
"github.com/bytom/blockchain/pseudohsm"
"github.com/bytom/blockchain/txdb"
- "github.com/bytom/net/http/reqid"
- "github.com/bytom/protocol"
- rpcserver "github.com/bytom/rpc/lib/server"
- // "github.com/bytom/net/http/static"
- // "github.com/bytom/generated/dashboard"
-
+ "github.com/bytom/blockchain/txfeed"
+ w "github.com/bytom/blockchain/wallet"
+ cfg "github.com/bytom/config"
"github.com/bytom/env"
"github.com/bytom/errors"
- bytomlog "github.com/bytom/log"
- "github.com/kr/secureheader"
-
- _ "net/http/pprof"
+ "github.com/bytom/p2p"
+ "github.com/bytom/protocol"
+ "github.com/bytom/types"
+ "github.com/bytom/util/browser"
+ "github.com/bytom/version"
)
const (
- httpReadTimeout = 2 * time.Minute
- httpWriteTimeout = time.Hour
+ httpReadTimeout = 2 * time.Minute
+ httpWriteTimeout = time.Hour
+ webAddress = "http://127.0.0.1:9888"
+ expireReservationsPeriod = time.Second
)
type Node struct {
sw *p2p.Switch // p2p connections
addrBook *p2p.AddrBook // known peers
- // services
- evsw types.EventSwitch // pub/sub for services
- // blockStore *bc.MemStore
- blockStore *txdb.Store
- bcReactor *bc.BlockchainReactor
- accounts *account.Manager
- assets *asset.Registry
- rpcListeners []net.Listener // rpc servers
+ evsw types.EventSwitch // pub/sub for services
+ blockStore *txdb.Store
+ bcReactor *bc.BlockchainReactor
+ accounts *account.Manager
+ assets *asset.Registry
}
-var (
- // config vars
- rootCAs = env.String("ROOT_CA_CERTS", "") // file path
- splunkAddr = os.Getenv("SPLUNKADDR")
- logFile = os.Getenv("LOGFILE")
- logSize = env.Int("LOGSIZE", 5e6) // 5MB
- logCount = env.Int("LOGCOUNT", 9)
- logQueries = env.Bool("LOG_QUERIES", false)
- maxDBConns = env.Int("MAXDBCONNS", 10) // set to 100 in prod
- rpsToken = env.Int("RATELIMIT_TOKEN", 0) // reqs/sec
- rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
- indexTxs = env.Bool("INDEX_TRANSACTIONS", true)
- home = bc.HomeDirFromEnvironment()
- bootURL = env.String("BOOTURL", "")
- // build vars; initialized by the linker
- buildTag = "?"
- buildCommit = "?"
- buildDate = "?"
- race []interface{} // initialized in race.go
-)
-
-func NewNodeDefault(config *cfg.Config, logger log.Logger) *Node {
- return NewNode(config, logger)
+func NewNodeDefault(config *cfg.Config) *Node {
+ return NewNode(config)
}
func RedirectHandler(next http.Handler) http.Handler {
wh.h.ServeHTTP(w, req)
}
-func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
+func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
// The waitHandler accepts incoming requests, but blocks until its underlying
// handler is set, when the second phase is complete.
var coreHandler waitHandler
mux.Handle("/", &coreHandler)
var handler http.Handler = mux
- //handler = core.AuthHandler(handler, raftDB, accessTokens, tlsConfig)
+
+ if config.Auth.Disable == false {
+ handler = bc.AuthHandler(handler, accessTokens)
+ }
handler = RedirectHandler(handler)
- handler = reqid.Handler(handler)
secureheader.DefaultConfig.PermitClearLoopback = true
secureheader.DefaultConfig.HTTPSRedirect = false
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
}
listenAddr := env.String("LISTEN", config.ApiAddress)
- listener, _ := net.Listen("tcp", *listenAddr)
+ log.WithField("api address:", config.ApiAddress).Info("Rpc listen")
+ listener, err := net.Listen("tcp", *listenAddr)
+ if err != nil {
+ cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
+ }
// The `Serve` call has to happen in its own goroutine because
// it's blocking and we need to proceed to the rest of the core setup after
// we call it.
go func() {
- err := server.Serve(listener)
- bytomlog.Fatalkv(context.Background(), bytomlog.KeyError, errors.Wrap(err, "Serve"))
+ if err := server.Serve(listener); err != nil {
+ log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
+ }
}()
coreHandler.Set(h)
}
-func NewNode(config *cfg.Config, logger log.Logger) *Node {
+func NewNode(config *cfg.Config) *Node {
+ ctx := context.Background()
+
// Get store
- tx_db := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
- store := txdb.NewStore(tx_db)
+ txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
+ store := txdb.NewStore(txDB)
+
+ tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
+ accessTokens := accesstoken.NewStore(tokenDB)
privKey := crypto.GenPrivKeyEd25519()
// Make event switch
eventSwitch := types.NewEventSwitch()
- eventSwitch.SetLogger(logger.With("module", "types"))
_, err := eventSwitch.Start()
if err != nil {
cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
}
- p2pLogger := logger.With("module", "p2p")
-
sw := p2p.NewSwitch(config.P2P)
- sw.SetLogger(p2pLogger)
-
- fastSync := config.FastSync
- genesisBlock := &legacy.Block{
- BlockHeader: legacy.BlockHeader{},
- Transactions: []*legacy.Tx{},
- }
- genesisBlock.UnmarshalText(consensus.InitBlock())
+ genesisBlock := cfg.GenerateGenesisBlock()
- chain, err := protocol.NewChain(context.Background(), genesisBlock.Hash(), store, nil)
- genesisSnap, err := chain.ApplyValidBlock(genesisBlock)
+ txPool := protocol.NewTxPool()
+ chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
if err != nil {
- cmn.Exit(cmn.Fmt("Failed to apply valid block: %v", err))
+ cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
}
- if err := chain.CommitAppliedBlock(nil, genesisBlock, genesisSnap); err != nil {
- cmn.Exit(cmn.Fmt("Failed to commit applied block: %v", err))
+
+ if chain.BestBlockHash() == nil {
+ if err := chain.SaveBlock(genesisBlock); err != nil {
+ cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
+ }
+ if err := chain.ConnectBlock(genesisBlock); err != nil {
+ cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
+ }
}
- txPool := protocol.NewTxPool()
- /* if err != nil {
- cmn.Exit(cmn.Fmt("protocol new chain failed: %v", err))
- }
- err = chain.CommitAppliedBlock(context.Background(), block, state.Empty())
- if err != nil {
- cmn.Exit(cmn.Fmt("commit block failed: %v", err))
- }
- chain.MaxIssuanceWindow = bc.MillisDuration(c.MaxIssuanceWindowMs)
- */
-
- accounts_db := dbm.NewDB("account", config.DBBackend, config.DBDir())
- accounts := account.NewManager(accounts_db, chain)
- assets_db := dbm.NewDB("asset", config.DBBackend, config.DBDir())
- assets := asset.NewRegistry(assets_db, chain)
-
- //Todo HSM
- /*
- if config.HsmUrl != ""{
- // todo remoteHSM
- cmn.Exit(cmn.Fmt("not implement"))
-
- } else {
- hsm, err = pseudohsm.New(config.KeysDir())
- if err != nil {
- cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
- }
- }*/
+ var accounts *account.Manager = nil
+ var assets *asset.Registry = nil
+ var wallet *w.Wallet = nil
+ var txFeed *txfeed.Tracker = nil
+
+ txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
+ txFeed = txfeed.NewTracker(txFeedDB, chain)
+
+ if err = txFeed.Prepare(ctx); err != nil {
+ log.WithField("error", err).Error("start txfeed")
+ return nil
+ }
hsm, err := pseudohsm.New(config.KeysDir())
if err != nil {
cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
}
- bcReactor := bc.NewBlockchainReactor(store, chain, txPool, accounts, assets, hsm, fastSync)
- bcReactor.SetLogger(logger.With("module", "blockchain"))
+ if !config.Wallet.Disable {
+ xpubs, _ := hsm.ListKeys()
+ walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
+ accounts = account.NewManager(walletDB, chain)
+ assets = asset.NewRegistry(walletDB, chain)
+ wallet, err = w.NewWallet(walletDB, accounts, assets, chain, xpubs)
+ if err != nil {
+ log.WithField("error", err).Error("init NewWallet")
+ }
+ // Clean up expired UTXO reservations periodically.
+ go accounts.ExpireReservations(ctx, expireReservationsPeriod)
+ }
+
+ bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, wallet, txFeed, accessTokens, config.Mining)
+
sw.AddReactor("BLOCKCHAIN", bcReactor)
- rpcInit(bcReactor, config)
+ rpcInit(bcReactor, config, accessTokens)
// Optionally, start the pex reactor
var addrBook *p2p.AddrBook
if config.P2P.PexReactor {
addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
- addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
pexReactor := p2p.NewPEXReactor(addrBook)
- pexReactor.SetLogger(p2pLogger)
sw.AddReactor("PEX", pexReactor)
}
- // add the event switch to all services
- // they should all satisfy events.Eventable
- //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
-
// run the profile server
profileHost := config.ProfListenAddress
if profileHost != "" {
-
+ // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
+ // go tool pprof http://profileHose/debug/pprof/heap
go func() {
- logger.Error("Profile server", "error", http.ListenAndServe(profileHost, nil))
+ http.ListenAndServe(profileHost, nil)
}()
}
accounts: accounts,
assets: assets,
}
- node.BaseService = *cmn.NewBaseService(logger, "Node", node)
+ node.BaseService = *cmn.NewBaseService(nil, "Node", node)
+
return node
}
+// Lanch web broser or not
+func lanchWebBroser(lanch bool) {
+ if lanch {
+ log.Info("Launching System Browser with :", webAddress)
+ if err := browser.Open(webAddress); err != nil {
+ log.Error(err.Error())
+ return
+ }
+ }
+}
+
func (n *Node) OnStart() error {
// Create & add listener
- protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
- l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
+ p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
+ l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
n.sw.AddListener(l)
// Start the switch
return err
}
}
- // Run the RPC server
- if n.config.RPC.ListenAddress != "" {
- listeners, err := n.startRPC()
- if err != nil {
- return err
- }
- n.rpcListeners = listeners
- }
-
+ lanchWebBroser(!n.config.Web.Closed)
return nil
}
func (n *Node) OnStop() {
n.BaseService.OnStop()
- n.Logger.Info("Stopping Node")
+ log.Info("Stopping Node")
// TODO: gracefully disconnect from peers.
n.sw.Stop()
- for _, l := range n.rpcListeners {
- n.Logger.Info("Closing rpc listener", "listener", l)
- if err := l.Close(); err != nil {
- n.Logger.Error("Error closing listener", "listener", l, "error", err)
- }
- }
}
func (n *Node) RunForever() {
})
}
-// Add the event switch to reactors, mempool, etc.
-func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
- for _, e := range eventables {
- e.SetEventSwitch(evsw)
- }
-}
-
// Add a Listener to accept inbound peer connections.
// Add listeners before starting the Node.
// The first listener is the primary listener (in NodeInfo)
n.sw.AddListener(l)
}
-// ConfigureRPC sets all variables in rpccore so they will serve
-// rpc calls from this node
-func (n *Node) ConfigureRPC() {
- rpccore.SetEventSwitch(n.evsw)
- rpccore.SetBlockStore(n.blockStore)
- //rpccore.SetConsensusState(n.consensusState)
- //rpccore.SetMempool(n.mempoolReactor.Mempool)
- rpccore.SetSwitch(n.sw)
- //rpccore.SetGenesisDoc(n.genesisDoc)
- rpccore.SetAddrBook(n.addrBook)
- //rpccore.SetProxyAppQuery(n.proxyApp.Query())
- //rpccore.SetTxIndexer(n.txIndexer)
- rpccore.SetLogger(n.Logger.With("module", "rpc"))
-}
-
-func (n *Node) startRPC() ([]net.Listener, error) {
- n.ConfigureRPC()
- listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
-
- if n.config.RPC.Unsafe {
- rpccore.AddUnsafeRoutes()
- }
-
- // we may expose the rpc over both a unix and tcp socket
- listeners := make([]net.Listener, len(listenAddrs))
- for i, listenAddr := range listenAddrs {
- mux := http.NewServeMux()
- wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
- rpcLogger := n.Logger.With("module", "rpc-server")
- wm.SetLogger(rpcLogger)
- mux.HandleFunc("/websocket", wm.WebsocketHandler)
- rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
- listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
- if err != nil {
- return nil, err
- }
- listeners[i] = listener
- }
-
- // we expose a simplified api over grpc for convenience to app devs
- grpcListenAddr := n.config.RPC.GRPCListenAddress
- if grpcListenAddr != "" {
- listener, err := grpccore.StartGRPCServer(grpcListenAddr)
- if err != nil {
- return nil, err
- }
- listeners = append(listeners, listener)
- }
- return listeners, nil
-}
-
func (n *Node) Switch() *p2p.Switch {
return n.sw
}
nodeInfo := &p2p.NodeInfo{
PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
Moniker: n.config.Moniker,
- Network: "chain0",
+ Network: "bytom",
Version: version.Version,
Other: []string{
cmn.Fmt("wire_version=%v", wire.Version),
p2pListener := n.sw.Listeners()[0]
p2pHost := p2pListener.ExternalAddress().IP.String()
p2pPort := p2pListener.ExternalAddress().Port
- rpcListenAddr := n.config.RPC.ListenAddress
+ //rpcListenAddr := n.config.RPC.ListenAddress
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP,
// except of course if the rpc is only bound to localhost
nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
- nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
+ //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
return nodeInfo
}
// Defaults to tcp
func ProtocolAndAddress(listenAddr string) (string, string) {
- protocol, address := "tcp", listenAddr
+ p, address := "tcp", listenAddr
parts := strings.SplitN(address, "://", 2)
if len(parts) == 2 {
- protocol, address = parts[0], parts[1]
+ p, address = parts[0], parts[1]
}
- return protocol, address
+ return p, address
}
//------------------------------------------------------------------------------