package blockchain import ( "context" "fmt" "net/http" "reflect" "time" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" "github.com/bytom/blockchain/accesstoken" "github.com/bytom/blockchain/account" "github.com/bytom/blockchain/asset" "github.com/bytom/blockchain/pin" "github.com/bytom/blockchain/pseudohsm" "github.com/bytom/blockchain/rpc" ctypes "github.com/bytom/blockchain/rpc/types" "github.com/bytom/blockchain/txfeed" "github.com/bytom/encoding/json" "github.com/bytom/errors" "github.com/bytom/mining/cpuminer" "github.com/bytom/net/http/httpjson" "github.com/bytom/p2p" "github.com/bytom/protocol" "github.com/bytom/protocol/bc/legacy" "github.com/bytom/types" ) const ( // BlockchainChannel is a channel for blocks and status updates BlockchainChannel = byte(0x40) defaultChannelCapacity = 100 trySyncIntervalMS = 100 statusUpdateIntervalSeconds = 10 maxBlockchainResponseSize = 22020096 + 2 crosscoreRPCPrefix = "/rpc/" ) // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor chain *protocol.Chain pinStore *pin.Store accounts *account.Manager assets *asset.Registry accesstoken *accesstoken.Token txFeeds *txfeed.TxFeed blockKeeper *blockKeeper txPool *protocol.TxPool hsm *pseudohsm.HSM mining *cpuminer.CPUMiner mux *http.ServeMux sw *p2p.Switch handler http.Handler evsw types.EventSwitch } func batchRecover(ctx context.Context, v *interface{}) { if r := recover(); r != nil { var err error if recoveredErr, ok := r.(error); ok { err = recoveredErr } else { err = fmt.Errorf("panic with %T", r) } err = errors.Wrap(err) *v = err } if *v == nil { return } // Convert errors into error responses (including errors // from recovered panics above). if err, ok := (*v).(error); ok { *v = errorFormatter.Format(err) } } func jsonHandler(f interface{}) http.Handler { h, err := httpjson.Handler(f, errorFormatter.Write) if err != nil { panic(err) } return h } func alwaysError(err error) http.Handler { return jsonHandler(func() error { return err }) } func (bcr *BlockchainReactor) ServeHTTP(rw http.ResponseWriter, req *http.Request) { bcr.handler.ServeHTTP(rw, req) } func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) { return map[string]interface{}{ "is_configured": false, "version": "0.001", "build_commit": "----", "build_date": "------", "build_config": "---------", }, nil } func maxBytes(h http.Handler) http.Handler { const maxReqSize = 1e7 // 10MB return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { // A block can easily be bigger than maxReqSize, but everything // else should be pretty small. if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" { req.Body = http.MaxBytesReader(w, req.Body, maxReqSize) } h.ServeHTTP(w, req) }) } func (bcr *BlockchainReactor) BuildHander() { m := bcr.mux if bcr.accounts != nil { m.Handle("/create-account", jsonHandler(bcr.createAccount)) m.Handle("/update-account-tags", jsonHandler(bcr.updateAccountTags)) m.Handle("/create-account-receiver", jsonHandler(bcr.createAccountReceiver)) m.Handle("/list-accounts", jsonHandler(bcr.listAccounts)) } else { log.Warn("Please enable wallet") } if bcr.assets != nil { m.Handle("/create-asset", jsonHandler(bcr.createAsset)) m.Handle("/update-asset-tags", jsonHandler(bcr.updateAssetTags)) m.Handle("/list-assets", jsonHandler(bcr.listAssets)) } else { log.Warn("Please enable wallet") } m.Handle("/build-transaction", jsonHandler(bcr.build)) m.Handle("/create-control-program", jsonHandler(bcr.createControlProgram)) m.Handle("/create-transaction-feed", jsonHandler(bcr.createTxFeed)) m.Handle("/get-transaction-feed", jsonHandler(bcr.getTxFeed)) m.Handle("/update-transaction-feed", jsonHandler(bcr.updateTxFeed)) m.Handle("/delete-transaction-feed", jsonHandler(bcr.deleteTxFeed)) m.Handle("/list-transaction-feeds", jsonHandler(bcr.listTxFeeds)) m.Handle("/list-transactions", jsonHandler(bcr.listTransactions)) m.Handle("/list-balances", jsonHandler(bcr.listBalances)) m.Handle("/list-unspent-outputs", jsonHandler(bcr.listUnspentOutputs)) m.Handle("/", alwaysError(errors.New("not Found"))) m.Handle("/info", jsonHandler(bcr.info)) m.Handle("/submit-transaction", jsonHandler(bcr.submit)) m.Handle("/create-access-token", jsonHandler(bcr.createAccessToken)) m.Handle("/list-access-tokens", jsonHandler(bcr.listAccessTokens)) m.Handle("/delete-access-token", jsonHandler(bcr.deleteAccessToken)) //hsm api m.Handle("/create-key", jsonHandler(bcr.pseudohsmCreateKey)) m.Handle("/list-keys", jsonHandler(bcr.pseudohsmListKeys)) m.Handle("/delete-key", jsonHandler(bcr.pseudohsmDeleteKey)) m.Handle("/sign-transactions", jsonHandler(bcr.pseudohsmSignTemplates)) m.Handle("/reset-password", jsonHandler(bcr.pseudohsmResetPassword)) m.Handle("/update-alias", jsonHandler(bcr.pseudohsmUpdateAlias)) m.Handle("/net-info", jsonHandler(bcr.getNetInfo)) latencyHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if l := latency(m, req); l != nil { defer l.RecordSince(time.Now()) } m.ServeHTTP(w, req) }) handler := maxBytes(latencyHandler) // TODO(tessr): consider moving this to non-core specific mux bcr.handler = handler } // Used as a request object for api queries type requestQuery struct { Filter string `json:"filter,omitempty"` FilterParams []interface{} `json:"filter_params,omitempty"` SumBy []string `json:"sum_by,omitempty"` PageSize int `json:"page_size"` // AscLongPoll and Timeout are used by /list-transactions // to facilitate notifications. AscLongPoll bool `json:"ascending_with_long_poll,omitempty"` Timeout json.Duration `json:"timeout"` // After is a completely opaque cursor, indicating that only // items in the result set after the one identified by `After` // should be included. It has no relationship to time. After string `json:"after"` // These two are used for time-range queries like /list-transactions StartTimeMS uint64 `json:"start_time,omitempty"` EndTimeMS uint64 `json:"end_time,omitempty"` // This is used for point-in-time queries like /list-balances // TODO(bobg): Different request structs for endpoints with different needs TimestampMS uint64 `json:"timestamp,omitempty"` // This is used for filtering results from /list-access-tokens // Value must be "client" or "network" Type string `json:"type"` // Aliases is used to filter results from /mockshm/list-keys Aliases []string `json:"aliases,omitempty"` } // Used as a response object for api queries type page struct { Items interface{} `json:"items"` Next requestQuery `json:"next"` LastPage bool `json:"last_page"` } func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, pinStore *pin.Store) *BlockchainReactor { mining := cpuminer.NewCPUMiner(chain, txPool) bcR := &BlockchainReactor{ chain: chain, pinStore: pinStore, accounts: accounts, assets: assets, blockKeeper: newBlockKeeper(chain, sw), txPool: txPool, mining: mining, mux: http.NewServeMux(), sw: sw, hsm: hsm, } bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) return bcR } // OnStart implements BaseService func (bcR *BlockchainReactor) OnStart() error { bcR.BaseReactor.OnStart() bcR.BuildHander() bcR.mining.Start() go bcR.syncRoutine() return nil } // OnStop implements BaseService func (bcR *BlockchainReactor) OnStop() { bcR.BaseReactor.OnStop() } // GetChannels implements Reactor func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ ID: BlockchainChannel, Priority: 5, SendQueueCapacity: 100, }, } } // AddPeer implements Reactor by sending our state to peer. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}) } // RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { bcR.blockKeeper.RemovePeer(peer.Key) } // Receive implements Reactor by handling 4 types of messages (look below). func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Errorf("Error decoding messagek %v", err) return } log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request") switch msg := msg.(type) { case *BlockRequestMessage: var block *legacy.Block var err error if msg.Height != 0 { block, err = bcR.chain.GetBlockByHeight(msg.Height) } else { block, err = bcR.chain.GetBlockByHash(msg.GetHash()) } if err != nil { log.Errorf("Fail on BlockRequestMessage get block: %v", err) return } response, err := NewBlockResponseMessage(block) if err != nil { log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err) return } src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response}) case *BlockResponseMessage: bcR.blockKeeper.AddBlock(msg.GetBlock(), src.Key) case *StatusRequestMessage: block, _ := bcR.chain.State() src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)}) case *StatusResponseMessage: bcR.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash()) case *TransactionNotifyMessage: tx := msg.GetTransaction() if err := bcR.chain.ValidateTx(tx); err != nil { log.Errorf("TransactionNotifyMessage: %v", err) } default: log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } } // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.) func (bcR *BlockchainReactor) syncRoutine() { statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) newTxCh := bcR.txPool.GetNewTxCh() for { select { case newTx := <-newTxCh: go bcR.BroadcastTransaction(newTx) case _ = <-statusUpdateTicker.C: go bcR.BroadcastStatusResponse() case <-bcR.Quit: return } } } func (bcR *BlockchainReactor) getNetInfo() (*ctypes.ResultNetInfo, error) { return rpc.NetInfo(bcR.sw) } // BroadcastStatusRequest broadcasts `BlockStore` height. func (bcR *BlockchainReactor) BroadcastStatusResponse() { block, _ := bcR.chain.State() bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)}) } func (bcR *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error { msg, err := NewTransactionNotifyMessage(tx) if err != nil { return err } bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg}) return nil }