14 "github.com/kr/secureheader"
15 log "github.com/sirupsen/logrus"
16 crypto "github.com/tendermint/go-crypto"
17 wire "github.com/tendermint/go-wire"
18 cmn "github.com/tendermint/tmlibs/common"
19 dbm "github.com/tendermint/tmlibs/db"
21 bc "github.com/bytom/blockchain"
22 "github.com/bytom/blockchain/account"
23 "github.com/bytom/blockchain/asset"
24 "github.com/bytom/blockchain/pin"
25 "github.com/bytom/blockchain/pseudohsm"
26 "github.com/bytom/blockchain/txdb"
27 "github.com/bytom/blockchain/txfeed"
28 cfg "github.com/bytom/config"
29 "github.com/bytom/consensus"
30 "github.com/bytom/env"
31 "github.com/bytom/errors"
32 "github.com/bytom/p2p"
33 "github.com/bytom/protocol"
34 "github.com/bytom/protocol/bc/legacy"
35 "github.com/bytom/types"
36 "github.com/bytom/version"
// Timeouts for the HTTP server built in rpcInit, where they are assigned to
// http.Server.ReadTimeout and http.Server.WriteTimeout respectively.
40 httpReadTimeout = 2 * time.Minute
41 httpWriteTimeout = time.Hour
51 privKey crypto.PrivKeyEd25519 // local node's p2p key
52 sw *p2p.Switch // p2p connections
53 addrBook *p2p.AddrBook // known peers
55 evsw types.EventSwitch // pub/sub for services
56 blockStore *txdb.Store // NOTE(review): presumably the txdb store created in NewNode — confirm
57 bcReactor *bc.BlockchainReactor // blockchain reactor; populated from NewNode's bcReactor
58 accounts *account.Manager // NOTE(review): presumably nil when config.Wallet.Enable is false — confirm
59 assets *asset.Registry // NOTE(review): presumably nil when config.Wallet.Enable is false — confirm
// Settings taken from the process environment, either through the env.*
// helpers (environment variable name plus a default value) or directly through
// os.Getenv.
// NOTE(review): none of these variables is referenced in the visible lines of
// this file — check whether they are still used elsewhere.
64 rootCAs = env.String("ROOT_CA_CERTS", "") // file path
65 splunkAddr = os.Getenv("SPLUNKADDR")
66 logFile = os.Getenv("LOGFILE")
67 logSize = env.Int("LOGSIZE", 5e6) // 5MB
68 logCount = env.Int("LOGCOUNT", 9)
69 logQueries = env.Bool("LOG_QUERIES", false)
70 maxDBConns = env.Int("MAXDBCONNS", 10) // set to 100 in prod
71 rpsToken = env.Int("RATELIMIT_TOKEN", 0) // reqs/sec
72 rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
73 indexTxs = env.Bool("INDEX_TRANSACTIONS", true)
74 home = bc.HomeDirFromEnvironment()
75 bootURL = env.String("BOOTURL", "")
76 // build vars; initialized by the linker
80 race []interface{} // initialized in race.go
// NewNodeDefault builds a Node from config by delegating directly to NewNode.
83 func NewNodeDefault(config *cfg.Config) *Node {
84 return NewNode(config)
// RedirectHandler wraps next in a handler that answers a request whose URL
// path is exactly "/" with an http.StatusFound redirect to "/dashboard/", and
// hands requests to next.ServeHTTP.
// NOTE(review): the lines between the redirect and the call to next are not
// visible in this excerpt; presumably the redirect branch returns before next
// is invoked — confirm.
87 func RedirectHandler(next http.Handler) http.Handler {
88 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
89 if req.URL.Path == "/" {
90 http.Redirect(w, req, "/dashboard/", http.StatusFound)
93 next.ServeHTTP(w, req)
// waitHandler is the placeholder HTTP handler that rpcInit mounts at "/".
// Its fields are not visible in this excerpt; other code in this file uses a
// wg member (wg.Add in rpcInit) and an h member (h.ServeHTTP in ServeHTTP).
97 type waitHandler struct {
// Set takes the handler h for this waitHandler.
// NOTE(review): the body is not visible in this excerpt. The comment in
// rpcInit says requests block until the underlying handler is set, so
// presumably Set stores h and releases the waiting requests — confirm.
102 func (wh *waitHandler) Set(h http.Handler) {
// ServeHTTP forwards the request to the underlying handler wh.h.
// NOTE(review): the line preceding the forward is not visible in this excerpt;
// presumably it waits until a handler has been provided through Set — confirm.
107 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
109 wh.h.ServeHTTP(w, req)
// rpcInit builds the node's HTTP server and starts serving on a TCP listener.
//
// It mounts a waitHandler at "/" on a new ServeMux, wraps the mux with
// RedirectHandler, installs the result as secureheader.DefaultConfig.Next and
// uses secureheader.DefaultConfig as the server's handler. The listen address
// comes from env.String("LISTEN", config.ApiAddress). cmn.Exit is called with
// a "Failed to register tcp port" message after net.Listen (the guarding
// condition is not visible in this excerpt), and a non-nil error from
// server.Serve is logged.
//
// NOTE(review): h is not used in the visible lines, and no call to
// coreHandler.Set is visible here; if none exists in the full file, requests
// routed to coreHandler would never reach a real handler — confirm.
112 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
113 // The waitHandler accepts incoming requests, but blocks until its underlying
114 // handler is set, when the second phase is complete.
115 var coreHandler waitHandler
116 coreHandler.wg.Add(1)
117 mux := http.NewServeMux()
118 mux.Handle("/", &coreHandler)
120 var handler http.Handler = mux
121 handler = RedirectHandler(handler)
// These assignments modify the shared secureheader.DefaultConfig value, so
// they affect every user of that config in the process.
123 secureheader.DefaultConfig.PermitClearLoopback = true
124 secureheader.DefaultConfig.HTTPSRedirect = false
125 secureheader.DefaultConfig.Next = handler
127 server := &http.Server{
128 // Note: we should not set TLSConfig here;
129 // we took care of TLS with the listener in maybeUseTLS.
130 Handler: secureheader.DefaultConfig,
131 ReadTimeout: httpReadTimeout,
132 WriteTimeout: httpWriteTimeout,
133 // Disable HTTP/2 for now until the Go implementation is more stable.
134 // https://github.com/golang/go/issues/16450
135 // https://github.com/golang/go/issues/17071
136 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
// env.String yields a pointer here (it is dereferenced in net.Listen below);
// config.ApiAddress is passed as the default value.
138 listenAddr := env.String("LISTEN", config.ApiAddress)
139 listener, err := net.Listen("tcp", *listenAddr)
141 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
144 // The `Serve` call has to happen in its own goroutine because
145 // it's blocking and we need to proceed to the rest of the core setup after
148 if err := server.Serve(listener); err != nil {
149 log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
// NewNode constructs a Node from config. In the lines visible here it:
//   - opens the "txdb" database and wraps it in a txdb.Store;
//   - generates an Ed25519 private key;
//   - creates and starts an event switch, and creates the p2p switch;
//   - decodes the genesis block from consensus.InitBlock(), creates the chain
//     with its hash, and saves and connects the genesis block when
//     chain.Height() is 0;
//   - when config.Wallet.Enable is set, opens the "account", "accountutxos",
//     "asset" and "txfeeds" databases and builds the pin store, account
//     manager, asset registry and tx feed tracker (these stay nil otherwise);
//   - creates the pseudo-HSM from config.KeysDir();
//   - builds the BlockchainReactor, registers it on the switch as "BLOCKCHAIN"
//     and calls rpcInit;
//   - adds a PEX reactor when config.P2P.PexReactor is set;
//   - serves HTTP on config.ProfListenAddress when that address is non-empty.
//
// Setup failures that reach a cmn.Exit call terminate the process; wallet
// setup errors are only logged in the visible lines.
//
// NOTE(review): many lines (error guards, closing braces, the Node literal)
// are not visible in this excerpt, so the exact control flow must be confirmed
// against the full file.
155 func NewNode(config *cfg.Config) *Node {
156 ctx := context.Background()
159 txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
160 store := txdb.NewStore(txDB)
162 privKey := crypto.GenPrivKeyEd25519()
165 eventSwitch := types.NewEventSwitch()
166 _, err := eventSwitch.Start()
168 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
171 sw := p2p.NewSwitch(config.P2P)
173 genesisBlock := &legacy.Block{
174 BlockHeader: legacy.BlockHeader{},
175 Transactions: []*legacy.Tx{},
// NOTE(review): the result of UnmarshalText is discarded; if it reports an
// error, a malformed genesis block would go unnoticed here — confirm.
177 genesisBlock.UnmarshalText(consensus.InitBlock())
179 txPool := protocol.NewTxPool()
180 chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
182 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
// A height of 0 is treated as an empty chain: the genesis block is saved and
// then connected.
185 if chain.Height() == 0 {
186 if err := chain.SaveBlock(genesisBlock); err != nil {
187 cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
189 if err := chain.ConnectBlock(genesisBlock); err != nil {
190 cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
// Wallet-related components; they remain nil unless the wallet is enabled.
194 var accounts *account.Manager = nil
195 var assets *asset.Registry = nil
196 var pinStore *pin.Store = nil
197 var txFeed *txfeed.Tracker = nil
199 if config.Wallet.Enable {
200 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
201 accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
202 pinStore = pin.NewStore(accUTXODB)
203 if err = pinStore.LoadAll(ctx); err != nil {
204 log.WithField("error", err).Error("load pin store")
// NOTE(review): the condition guarding the decrement below is not visible;
// presumably it avoids going below height 0 — confirm.
208 pinHeight := chain.Height()
210 pinHeight = pinHeight - 1
213 pins := []string{account.PinName, account.DeleteSpentsPinName}
214 for _, p := range pins {
215 if err = pinStore.CreatePin(ctx, p, pinHeight); err != nil {
216 log.WithField("error", err).Error("Create pin")
220 accounts = account.NewManager(accountsDB, chain, pinStore)
221 go accounts.ProcessBlocks(ctx)
223 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
224 assets = asset.NewRegistry(assetsDB, chain)
226 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
227 txFeed = txfeed.NewTracker(txFeedDB, chain)
229 if err = txFeed.Prepare(ctx); err != nil {
230 log.WithField("error", err).Error("start txfeed")
// NOTE(review): hsm is assigned in this section before the `hsm, err :=`
// declaration further down; this section may be commented out in the full
// file (comment delimiters are not visible here) — confirm.
237 if config.HsmUrl != ""{
239 cmn.Exit(cmn.Fmt("not implement"))
241 hsm, err = pseudohsm.New(config.KeysDir())
243 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
247 hsm, err := pseudohsm.New(config.KeysDir())
249 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
251 bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore, txFeed)
253 sw.AddReactor("BLOCKCHAIN", bcReactor)
255 rpcInit(bcReactor, config)
256 // Optionally, start the pex reactor
257 var addrBook *p2p.AddrBook
258 if config.P2P.PexReactor {
259 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
260 pexReactor := p2p.NewPEXReactor(addrBook)
261 sw.AddReactor("PEX", pexReactor)
264 // run the profile server
265 profileHost := config.ProfListenAddress
266 if profileHost != "" {
267 // Profiling bytomd programs. See https://blog.golang.org/profiling-go-programs
268 // go tool pprof http://profileHost/debug/pprof/heap
// The nil handler makes ListenAndServe use http.DefaultServeMux; its returned
// error is not inspected in the visible lines.
270 http.ListenAndServe(profileHost, nil)
282 bcReactor: bcReactor,
287 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
// OnStart brings up the node's p2p networking. In the visible lines it creates
// the default p2p listener from config.P2P.ListenAddress, sets the switch's
// node info and private key, starts the switch, and, when config.P2P.Seeds is
// non-empty, splits it on "," and dials the resulting seeds.
// NOTE(review): the use of listener l and the handling of the errors from
// n.sw.Start and n.DialSeeds are not visible in this excerpt — confirm.
291 func (n *Node) OnStart() error {
292 // Create & add listener
293 protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
294 l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
298 n.sw.SetNodeInfo(n.makeNodeInfo())
299 n.sw.SetNodePrivKey(n.privKey)
300 _, err := n.sw.Start()
305 // If seeds exist, add them to the address book and dial out
306 if n.config.P2P.Seeds != "" {
308 seeds := strings.Split(n.config.P2P.Seeds, ",")
309 if err := n.DialSeeds(seeds); err != nil {
// OnStop calls the embedded BaseService's OnStop and logs "Stopping Node".
// NOTE(review): any shutdown steps after the TODO are not visible in this
// excerpt.
316 func (n *Node) OnStop() {
317 n.BaseService.OnStop()
319 log.Info("Stopping Node")
320 // TODO: gracefully disconnect from peers.
// RunForever registers a callback with cmn.TrapSignal.
// NOTE(review): the callback body is not visible in this excerpt; presumably
// it stops the node when the process receives a signal — confirm.
325 func (n *Node) RunForever() {
326 // Sleep forever and then...
327 cmn.TrapSignal(func() {
// SetEventSwitch calls SetEventSwitch(evsw) on every value in eventables, in
// the order given.
332 // Add the event switch to reactors, mempool, etc.
333 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
334 for _, e := range eventables {
335 e.SetEventSwitch(evsw)
// AddListener takes a p2p.Listener for the node.
// NOTE(review): the body is not visible in this excerpt; presumably it
// registers l on n.sw — confirm.
339 // Add a Listener to accept inbound peer connections.
340 // Add listeners before starting the Node.
341 // The first listener is the primary listener (in NodeInfo)
342 func (n *Node) AddListener(l p2p.Listener) {
// Switch returns a *p2p.Switch.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns n.sw — confirm.
346 func (n *Node) Switch() *p2p.Switch {
// EventSwitch returns a types.EventSwitch.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns n.evsw — confirm.
350 func (n *Node) EventSwitch() types.EventSwitch {
// makeNodeInfo assembles the p2p.NodeInfo for this node: the public key
// derived from n.privKey, the configured moniker, version.Version, and
// "wire_version=..." / "p2p_version=..." strings. In the visible lines,
// ListenAddr is set to "host:port" built from the external address of the
// switch's first listener.
// NOTE(review): the branch taken when the switch is not listening, and the
// return statement, are not visible in this excerpt.
354 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
355 nodeInfo := &p2p.NodeInfo{
// The single-value type assertion panics if the unwrapped key is not a
// crypto.PubKeyEd25519.
356 PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
357 Moniker: n.config.Moniker,
359 Version: version.Version,
361 cmn.Fmt("wire_version=%v", wire.Version),
362 cmn.Fmt("p2p_version=%v", p2p.Version),
366 if !n.sw.IsListening() {
// Index 0 is used without a visible length check on n.sw.Listeners().
370 p2pListener := n.sw.Listeners()[0]
371 p2pHost := p2pListener.ExternalAddress().IP.String()
372 p2pPort := p2pListener.ExternalAddress().Port
373 //rpcListenAddr := n.config.RPC.ListenAddress
375 // We assume that the rpcListener has the same ExternalAddress.
376 // This is probably true because both P2P and RPC listeners use UPnP,
377 // except of course if the rpc is only bound to localhost
378 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
379 //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
383 //------------------------------------------------------------------------------
// NodeInfo returns the node info currently held by the p2p switch.
385 func (n *Node) NodeInfo() *p2p.NodeInfo {
386 return n.sw.NodeInfo()
// DialSeeds passes the node's address book and the given seed addresses to the
// switch's DialSeeds and returns its error unchanged.
// NOTE(review): n.addrBook is presumably nil when the PEX reactor is disabled
// in NewNode — confirm that the switch tolerates a nil address book.
389 func (n *Node) DialSeeds(seeds []string) error {
390 return n.sw.DialSeeds(n.addrBook, seeds)
// ProtocolAndAddress splits listenAddr into a protocol and an address.
// The protocol defaults to "tcp" and the address to listenAddr itself. The
// input is split at most once on "://", and the two parts replace the
// defaults.
// NOTE(review): the condition guarding the replacement is not visible in this
// excerpt; presumably it requires the separator to be present
// (len(parts) == 2) — confirm.
394 func ProtocolAndAddress(listenAddr string) (string, string) {
395 protocol, address := "tcp", listenAddr
396 parts := strings.SplitN(address, "://", 2)
398 protocol, address = parts[0], parts[1]
400 return protocol, address
403 //------------------------------------------------------------------------------