13 "github.com/kr/secureheader"
14 log "github.com/sirupsen/logrus"
15 crypto "github.com/tendermint/go-crypto"
16 wire "github.com/tendermint/go-wire"
17 cmn "github.com/tendermint/tmlibs/common"
18 dbm "github.com/tendermint/tmlibs/db"
21 bc "github.com/bytom/blockchain"
22 "github.com/bytom/blockchain/account"
23 "github.com/bytom/blockchain/asset"
24 "github.com/bytom/blockchain/pin"
25 "github.com/bytom/blockchain/pseudohsm"
26 "github.com/bytom/blockchain/txdb"
27 cfg "github.com/bytom/config"
28 "github.com/bytom/consensus"
29 "github.com/bytom/env"
30 "github.com/bytom/errors"
31 p2p "github.com/bytom/p2p"
32 "github.com/bytom/protocol"
33 "github.com/bytom/protocol/bc/legacy"
34 "github.com/bytom/types"
35 "github.com/bytom/version"
39 httpReadTimeout = 2 * time.Minute
40 httpWriteTimeout = time.Hour
50 privKey crypto.PrivKeyEd25519 // local node's p2p key
51 sw *p2p.Switch // p2p connections
52 addrBook *p2p.AddrBook // known peers
55 evsw types.EventSwitch // pub/sub for services
56 // blockStore *bc.MemStore
57 blockStore *txdb.Store
58 bcReactor *bc.BlockchainReactor
59 accounts *account.Manager
60 assets *asset.Registry
65 rootCAs = env.String("ROOT_CA_CERTS", "") // file path
66 splunkAddr = os.Getenv("SPLUNKADDR")
67 logFile = os.Getenv("LOGFILE")
68 logSize = env.Int("LOGSIZE", 5e6) // 5MB
69 logCount = env.Int("LOGCOUNT", 9)
70 logQueries = env.Bool("LOG_QUERIES", false)
71 maxDBConns = env.Int("MAXDBCONNS", 10) // set to 100 in prod
72 rpsToken = env.Int("RATELIMIT_TOKEN", 0) // reqs/sec
73 rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
74 indexTxs = env.Bool("INDEX_TRANSACTIONS", true)
75 home = bc.HomeDirFromEnvironment()
76 bootURL = env.String("BOOTURL", "")
77 // build vars; initialized by the linker
81 race []interface{} // initialized in race.go
84 func NewNodeDefault(config *cfg.Config) *Node {
85 return NewNode(config)
88 func RedirectHandler(next http.Handler) http.Handler {
89 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
90 if req.URL.Path == "/" {
91 http.Redirect(w, req, "/dashboard/", http.StatusFound)
94 next.ServeHTTP(w, req)
98 type waitHandler struct {
103 func (wh *waitHandler) Set(h http.Handler) {
108 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
110 wh.h.ServeHTTP(w, req)
113 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
114 // The waitHandler accepts incoming requests, but blocks until its underlying
115 // handler is set, when the second phase is complete.
116 var coreHandler waitHandler
117 coreHandler.wg.Add(1)
118 mux := http.NewServeMux()
119 mux.Handle("/", &coreHandler)
121 var handler http.Handler = mux
122 handler = RedirectHandler(handler)
124 secureheader.DefaultConfig.PermitClearLoopback = true
125 secureheader.DefaultConfig.HTTPSRedirect = false
126 secureheader.DefaultConfig.Next = handler
128 server := &http.Server{
129 // Note: we should not set TLSConfig here;
130 // we took care of TLS with the listener in maybeUseTLS.
131 Handler: secureheader.DefaultConfig,
132 ReadTimeout: httpReadTimeout,
133 WriteTimeout: httpWriteTimeout,
134 // Disable HTTP/2 for now until the Go implementation is more stable.
135 // https://github.com/golang/go/issues/16450
136 // https://github.com/golang/go/issues/17071
137 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
139 listenAddr := env.String("LISTEN", config.ApiAddress)
140 listener, _ := net.Listen("tcp", *listenAddr)
142 // The `Serve` call has to happen in its own goroutine because
143 // it's blocking and we need to proceed to the rest of the core setup after
146 err := server.Serve(listener)
147 log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
152 func NewNode(config *cfg.Config) *Node {
153 ctx := context.Background()
156 txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
157 store := txdb.NewStore(txDB)
159 privKey := crypto.GenPrivKeyEd25519()
162 eventSwitch := types.NewEventSwitch()
163 _, err := eventSwitch.Start()
165 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
168 sw := p2p.NewSwitch(config.P2P)
170 fastSync := config.FastSync
172 genesisBlock := &legacy.Block{
173 BlockHeader: legacy.BlockHeader{},
174 Transactions: []*legacy.Tx{},
176 genesisBlock.UnmarshalText(consensus.InitBlock())
178 txPool := protocol.NewTxPool()
179 chain, err := protocol.NewChain(ctx, genesisBlock.Hash(), store, txPool, nil)
181 if store.Height() < 1 {
182 if err := chain.AddBlock(nil, genesisBlock); err != nil {
183 cmn.Exit(cmn.Fmt("Failed to add genesisBlock to Chain: %v", err))
187 var accounts *account.Manager = nil
188 var assets *asset.Registry = nil
189 var pinStore *pin.Store = nil
191 if config.Wallet.Enable {
192 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
193 accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
194 pinStore = pin.NewStore(accUTXODB)
195 err = pinStore.LoadAll(ctx)
197 log.WithField("error", err).Error("load pin store")
201 pinHeight := store.Height()
203 pinHeight = pinHeight - 1
206 pins := []string{account.PinName, account.DeleteSpentsPinName}
207 for _, p := range pins {
208 err = pinStore.CreatePin(ctx, p, pinHeight)
210 log.WithField("error", err).Error("Create pin")
214 accounts = account.NewManager(accountsDB, chain, pinStore)
215 go accounts.ProcessBlocks(ctx)
217 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
218 assets = asset.NewRegistry(assetsDB, chain)
222 if config.HsmUrl != ""{
224 cmn.Exit(cmn.Fmt("not implement"))
226 hsm, err = pseudohsm.New(config.KeysDir())
228 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
232 hsm, err := pseudohsm.New(config.KeysDir())
234 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
236 bcReactor := bc.NewBlockchainReactor(
247 sw.AddReactor("BLOCKCHAIN", bcReactor)
249 rpcInit(bcReactor, config)
250 // Optionally, start the pex reactor
251 var addrBook *p2p.AddrBook
252 if config.P2P.PexReactor {
253 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
254 pexReactor := p2p.NewPEXReactor(addrBook)
255 sw.AddReactor("PEX", pexReactor)
258 // add the event switch to all services
259 // they should all satisfy events.Eventable
260 //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
262 // run the profile server
263 profileHost := config.ProfListenAddress
264 if profileHost != "" {
267 log.WithField("error", http.ListenAndServe(profileHost, nil)).Error("Profile server")
279 bcReactor: bcReactor,
284 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
288 func (n *Node) OnStart() error {
289 // Create & add listener
290 protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
291 l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
295 n.sw.SetNodeInfo(n.makeNodeInfo())
296 n.sw.SetNodePrivKey(n.privKey)
297 _, err := n.sw.Start()
302 // If seeds exist, add them to the address book and dial out
303 if n.config.P2P.Seeds != "" {
305 seeds := strings.Split(n.config.P2P.Seeds, ",")
306 if err := n.DialSeeds(seeds); err != nil {
313 func (n *Node) OnStop() {
314 n.BaseService.OnStop()
316 log.Info("Stopping Node")
317 // TODO: gracefully disconnect from peers.
322 func (n *Node) RunForever() {
323 // Sleep forever and then...
324 cmn.TrapSignal(func() {
329 // Add the event switch to reactors, mempool, etc.
330 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
331 for _, e := range eventables {
332 e.SetEventSwitch(evsw)
336 // Add a Listener to accept inbound peer connections.
337 // Add listeners before starting the Node.
338 // The first listener is the primary listener (in NodeInfo)
339 func (n *Node) AddListener(l p2p.Listener) {
343 func (n *Node) Switch() *p2p.Switch {
347 func (n *Node) EventSwitch() types.EventSwitch {
351 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
352 nodeInfo := &p2p.NodeInfo{
353 PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
354 Moniker: n.config.Moniker,
356 Version: version.Version,
358 cmn.Fmt("wire_version=%v", wire.Version),
359 cmn.Fmt("p2p_version=%v", p2p.Version),
363 if !n.sw.IsListening() {
367 p2pListener := n.sw.Listeners()[0]
368 p2pHost := p2pListener.ExternalAddress().IP.String()
369 p2pPort := p2pListener.ExternalAddress().Port
370 //rpcListenAddr := n.config.RPC.ListenAddress
372 // We assume that the rpcListener has the same ExternalAddress.
373 // This is probably true because both P2P and RPC listeners use UPnP,
374 // except of course if the rpc is only bound to localhost
375 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
376 //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
380 //------------------------------------------------------------------------------
382 func (n *Node) NodeInfo() *p2p.NodeInfo {
383 return n.sw.NodeInfo()
386 func (n *Node) DialSeeds(seeds []string) error {
387 return n.sw.DialSeeds(n.addrBook, seeds)
391 func ProtocolAndAddress(listenAddr string) (string, string) {
392 protocol, address := "tcp", listenAddr
393 parts := strings.SplitN(address, "://", 2)
395 protocol, address = parts[0], parts[1]
397 return protocol, address
400 //------------------------------------------------------------------------------