14 "github.com/kr/secureheader"
15 log "github.com/sirupsen/logrus"
16 crypto "github.com/tendermint/go-crypto"
17 wire "github.com/tendermint/go-wire"
18 cmn "github.com/tendermint/tmlibs/common"
19 dbm "github.com/tendermint/tmlibs/db"
21 bc "github.com/bytom/blockchain"
22 "github.com/bytom/blockchain/account"
23 "github.com/bytom/blockchain/asset"
24 "github.com/bytom/blockchain/pin"
25 "github.com/bytom/blockchain/pseudohsm"
26 "github.com/bytom/blockchain/txdb"
27 cfg "github.com/bytom/config"
28 "github.com/bytom/consensus"
29 "github.com/bytom/env"
30 "github.com/bytom/errors"
31 "github.com/bytom/p2p"
32 "github.com/bytom/protocol"
33 "github.com/bytom/protocol/bc/legacy"
34 "github.com/bytom/types"
35 "github.com/bytom/version"
39 httpReadTimeout = 2 * time.Minute
40 httpWriteTimeout = time.Hour
50 privKey crypto.PrivKeyEd25519 // local node's p2p key
51 sw *p2p.Switch // p2p connections
52 addrBook *p2p.AddrBook // known peers
54 evsw types.EventSwitch // pub/sub for services
55 blockStore *txdb.Store
56 bcReactor *bc.BlockchainReactor
57 accounts *account.Manager
58 assets *asset.Registry
63 rootCAs = env.String("ROOT_CA_CERTS", "") // file path
64 splunkAddr = os.Getenv("SPLUNKADDR")
65 logFile = os.Getenv("LOGFILE")
66 logSize = env.Int("LOGSIZE", 5e6) // 5MB
67 logCount = env.Int("LOGCOUNT", 9)
68 logQueries = env.Bool("LOG_QUERIES", false)
69 maxDBConns = env.Int("MAXDBCONNS", 10) // set to 100 in prod
70 rpsToken = env.Int("RATELIMIT_TOKEN", 0) // reqs/sec
71 rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
72 indexTxs = env.Bool("INDEX_TRANSACTIONS", true)
73 home = bc.HomeDirFromEnvironment()
74 bootURL = env.String("BOOTURL", "")
75 // build vars; initialized by the linker
79 race []interface{} // initialized in race.go
82 func NewNodeDefault(config *cfg.Config) *Node {
83 return NewNode(config)
86 func RedirectHandler(next http.Handler) http.Handler {
87 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
88 if req.URL.Path == "/" {
89 http.Redirect(w, req, "/dashboard/", http.StatusFound)
92 next.ServeHTTP(w, req)
96 type waitHandler struct {
101 func (wh *waitHandler) Set(h http.Handler) {
106 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
108 wh.h.ServeHTTP(w, req)
111 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
112 // The waitHandler accepts incoming requests, but blocks until its underlying
113 // handler is set, when the second phase is complete.
114 var coreHandler waitHandler
115 coreHandler.wg.Add(1)
116 mux := http.NewServeMux()
117 mux.Handle("/", &coreHandler)
119 var handler http.Handler = mux
120 handler = RedirectHandler(handler)
122 secureheader.DefaultConfig.PermitClearLoopback = true
123 secureheader.DefaultConfig.HTTPSRedirect = false
124 secureheader.DefaultConfig.Next = handler
126 server := &http.Server{
127 // Note: we should not set TLSConfig here;
128 // we took care of TLS with the listener in maybeUseTLS.
129 Handler: secureheader.DefaultConfig,
130 ReadTimeout: httpReadTimeout,
131 WriteTimeout: httpWriteTimeout,
132 // Disable HTTP/2 for now until the Go implementation is more stable.
133 // https://github.com/golang/go/issues/16450
134 // https://github.com/golang/go/issues/17071
135 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
137 listenAddr := env.String("LISTEN", config.ApiAddress)
138 listener, err := net.Listen("tcp", *listenAddr)
140 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
143 // The `Serve` call has to happen in its own goroutine because
144 // it's blocking and we need to proceed to the rest of the core setup after
147 if err := server.Serve(listener); err != nil {
148 log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
154 func NewNode(config *cfg.Config) *Node {
155 ctx := context.Background()
158 txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
159 store := txdb.NewStore(txDB)
161 privKey := crypto.GenPrivKeyEd25519()
164 eventSwitch := types.NewEventSwitch()
165 _, err := eventSwitch.Start()
167 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
170 sw := p2p.NewSwitch(config.P2P)
172 genesisBlock := &legacy.Block{
173 BlockHeader: legacy.BlockHeader{},
174 Transactions: []*legacy.Tx{},
176 genesisBlock.UnmarshalText(consensus.InitBlock())
178 txPool := protocol.NewTxPool()
179 chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
181 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
184 if chain.Height() == 0 {
185 if err := chain.SaveBlock(genesisBlock); err != nil {
186 cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
188 if err := chain.ConnectBlock(genesisBlock); err != nil {
189 cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
193 var accounts *account.Manager = nil
194 var assets *asset.Registry = nil
195 var pinStore *pin.Store = nil
197 if config.Wallet.Enable {
198 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
199 accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
200 pinStore = pin.NewStore(accUTXODB)
201 if err = pinStore.LoadAll(ctx); err != nil {
202 log.WithField("error", err).Error("load pin store")
206 pinHeight := chain.Height()
208 pinHeight = pinHeight - 1
211 pins := []string{account.PinName, account.DeleteSpentsPinName}
212 for _, p := range pins {
213 if err = pinStore.CreatePin(ctx, p, pinHeight); err != nil {
214 log.WithField("error", err).Error("Create pin")
218 accounts = account.NewManager(accountsDB, chain, pinStore)
219 go accounts.ProcessBlocks(ctx)
221 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
222 assets = asset.NewRegistry(assetsDB, chain)
226 if config.HsmUrl != ""{
228 cmn.Exit(cmn.Fmt("not implement"))
230 hsm, err = pseudohsm.New(config.KeysDir())
232 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
236 hsm, err := pseudohsm.New(config.KeysDir())
238 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
240 bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore)
242 sw.AddReactor("BLOCKCHAIN", bcReactor)
244 rpcInit(bcReactor, config)
245 // Optionally, start the pex reactor
246 var addrBook *p2p.AddrBook
247 if config.P2P.PexReactor {
248 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
249 pexReactor := p2p.NewPEXReactor(addrBook)
250 sw.AddReactor("PEX", pexReactor)
253 // add the event switch to all services
254 // they should all satisfy events.Eventable
255 //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
257 // run the profile server
258 profileHost := config.ProfListenAddress
259 if profileHost != "" {
262 log.WithField("error", http.ListenAndServe(profileHost, nil)).Error("Profile server")
274 bcReactor: bcReactor,
279 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
283 func (n *Node) OnStart() error {
284 // Create & add listener
285 protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
286 l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
290 n.sw.SetNodeInfo(n.makeNodeInfo())
291 n.sw.SetNodePrivKey(n.privKey)
292 _, err := n.sw.Start()
297 // If seeds exist, add them to the address book and dial out
298 if n.config.P2P.Seeds != "" {
300 seeds := strings.Split(n.config.P2P.Seeds, ",")
301 if err := n.DialSeeds(seeds); err != nil {
308 func (n *Node) OnStop() {
309 n.BaseService.OnStop()
311 log.Info("Stopping Node")
312 // TODO: gracefully disconnect from peers.
317 func (n *Node) RunForever() {
318 // Sleep forever and then...
319 cmn.TrapSignal(func() {
324 // Add the event switch to reactors, mempool, etc.
325 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
326 for _, e := range eventables {
327 e.SetEventSwitch(evsw)
331 // Add a Listener to accept inbound peer connections.
332 // Add listeners before starting the Node.
333 // The first listener is the primary listener (in NodeInfo)
334 func (n *Node) AddListener(l p2p.Listener) {
338 func (n *Node) Switch() *p2p.Switch {
342 func (n *Node) EventSwitch() types.EventSwitch {
346 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
347 nodeInfo := &p2p.NodeInfo{
348 PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
349 Moniker: n.config.Moniker,
351 Version: version.Version,
353 cmn.Fmt("wire_version=%v", wire.Version),
354 cmn.Fmt("p2p_version=%v", p2p.Version),
358 if !n.sw.IsListening() {
362 p2pListener := n.sw.Listeners()[0]
363 p2pHost := p2pListener.ExternalAddress().IP.String()
364 p2pPort := p2pListener.ExternalAddress().Port
365 //rpcListenAddr := n.config.RPC.ListenAddress
367 // We assume that the rpcListener has the same ExternalAddress.
368 // This is probably true because both P2P and RPC listeners use UPnP,
369 // except of course if the rpc is only bound to localhost
370 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
371 //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
375 //------------------------------------------------------------------------------
377 func (n *Node) NodeInfo() *p2p.NodeInfo {
378 return n.sw.NodeInfo()
381 func (n *Node) DialSeeds(seeds []string) error {
382 return n.sw.DialSeeds(n.addrBook, seeds)
386 func ProtocolAndAddress(listenAddr string) (string, string) {
387 protocol, address := "tcp", listenAddr
388 parts := strings.SplitN(address, "://", 2)
390 protocol, address = parts[0], parts[1]
392 return protocol, address
395 //------------------------------------------------------------------------------