From f97707db4cdb9eea577fd6355848f861547cb038 Mon Sep 17 00:00:00 2001 From: gguoss <1536310027@qq.com> Date: Tue, 22 Aug 2017 14:15:07 +0800 Subject: [PATCH] Added blockchain/rpc. --- blockchain/rpc.go | 72 ++++++++++++++++++++ blockchain/rpc/rpc.go | 167 +++++++++++++++++++++++++++++++++++++++++++++++ blockchain/txdb/store.go | 9 ++- 3 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 blockchain/rpc.go create mode 100644 blockchain/rpc/rpc.go diff --git a/blockchain/rpc.go b/blockchain/rpc.go new file mode 100644 index 00000000..321a7690 --- /dev/null +++ b/blockchain/rpc.go @@ -0,0 +1,72 @@ +package blockchain + +import ( + "context" +// "encoding/json" +// "net/http" + + chainjson "github.com/bytom/encoding/json" + "github.com/bytom/errors" +// "github.com/bytom/net/http/httpjson" + "github.com/bytom/protocol/bc" +) + +// getBlockRPC returns the block at the requested height. +// If successful, it always returns at least one block, +// waiting if necessary until one is created. +// It is an error to request blocks very far in the future. +func (a *BlockchainReactor) getBlockRPC(ctx context.Context, height uint64) (chainjson.HexBytes, error) { + err := <-a.chain.BlockSoonWaiter(ctx, height) + if err != nil { + return nil, errors.Wrapf(err, "waiting for block at height %d", height) + } + + rawBlock, err := a.store.GetRawBlock(height) + if err != nil { + return nil, err + } + + return rawBlock, nil +} + +type snapshotInfoResp struct { + Height uint64 `json:"height"` + Size uint64 `json:"size"` + BlockchainID bc.Hash `json:"blockchain_id"` +} + +/* +func (a *BlockchainReactor) getSnapshotInfoRPC(ctx context.Context) (resp snapshotInfoResp, err error) { + // TODO(jackson): cache latest snapshot and its height & size in-memory. + resp.Height, resp.Size, err = a.store.LatestSnapshotInfo(ctx) + resp.BlockchainID = *a.config.BlockchainId + return resp, err +} + +// getSnapshotRPC returns the raw protobuf snapshot at the provided height. +// Non-generators can call this endpoint to get raw data +// that they can use to populate their own snapshot table. +// +// This handler doesn't use the httpjson.Handler format so that it can return +// raw protobuf bytes on the wire. +func (a *BlockchainReactor) getSnapshotRPC(rw http.ResponseWriter, req *http.Request) { + if a.config == nil { + alwaysError(errUnconfigured).ServeHTTP(rw, req) + return + } + + var height uint64 + err := json.NewDecoder(req.Body).Decode(&height) + if err != nil { + errorFormatter.Write(req.Context(), rw, httpjson.ErrBadRequest) + return + } + + data, err := a.store.GetSnapshot(req.Context(), height) + if err != nil { + errorFormatter.Write(req.Context(), rw, err) + return + } + rw.Header().Set("Content-Type", "application/x-protobuf") + rw.Write(data) +}*/ diff --git a/blockchain/rpc/rpc.go b/blockchain/rpc/rpc.go new file mode 100644 index 00000000..76a10169 --- /dev/null +++ b/blockchain/rpc/rpc.go @@ -0,0 +1,167 @@ +// Package rpc implements Chain Core's RPC client. +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/bytom/errors" + "github.com/bytom/net/http/httperror" + "github.com/bytom/net/http/reqid" +) + +// Chain-specific header fields +const ( + HeaderBlockchainID = "Blockchain-ID" + HeaderCoreID = "Chain-Core-ID" + HeaderTimeout = "RPC-Timeout" +) + +// ErrWrongNetwork is returned when a peer's blockchain ID differs from +// the RPC client's blockchain ID. +var ErrWrongNetwork = errors.New("connected to a peer on a different network") + +// A Client is a Chain RPC client. It performs RPCs over HTTP using JSON +// request and responses. A Client must be configured with a secret token +// to authenticate with other Cores on the network. +type Client struct { + BaseURL string + AccessToken string + Username string + BuildTag string + BlockchainID string + CoreID string + + // If set, Client is used for outgoing requests. + // TODO(kr): make this required (crash on nil) + Client *http.Client +} + +func (c Client) userAgent() string { + return fmt.Sprintf("Chain; process=%s; buildtag=%s; blockchainID=%s", + c.Username, c.BuildTag, c.BlockchainID) +} + +// ErrStatusCode is an error returned when an rpc fails with a non-200 +// response code. +type ErrStatusCode struct { + URL string + StatusCode int + ErrorData *httperror.Response +} + +func (e ErrStatusCode) Error() string { + return fmt.Sprintf("Request to `%s` responded with %d %s", + e.URL, e.StatusCode, http.StatusText(e.StatusCode)) +} + +// Call calls a remote procedure on another node, specified by the path. +func (c *Client) Call(ctx context.Context, path string, request, response interface{}) error { + r, err := c.CallRaw(ctx, path, request) + if err != nil { + return err + } + defer r.Close() + if response != nil { + err = errors.Wrap(json.NewDecoder(r).Decode(response)) + } + return err +} + +// CallRaw calls a remote procedure on another node, specified by the path. It +// returns a io.ReadCloser of the raw response body. +func (c *Client) CallRaw(ctx context.Context, path string, request interface{}) (io.ReadCloser, error) { + u, err := url.Parse(c.BaseURL) + if err != nil { + return nil, errors.Wrap(err) + } + u.Path = path + + var bodyReader io.Reader + if request != nil { + var jsonBody bytes.Buffer + if err := json.NewEncoder(&jsonBody).Encode(request); err != nil { + return nil, errors.Wrap(err) + } + bodyReader = &jsonBody + } + + req, err := http.NewRequest("POST", u.String(), bodyReader) + if err != nil { + return nil, errors.Wrap(err) + } + + if c.AccessToken != "" { + var username, password string + toks := strings.SplitN(c.AccessToken, ":", 2) + if len(toks) > 0 { + username = toks[0] + } + if len(toks) > 1 { + password = toks[1] + } + req.SetBasicAuth(username, password) + } + + // Propagate our request ID so that we can trace a request across nodes. + req.Header.Add("Request-ID", reqid.FromContext(ctx)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", c.userAgent()) + req.Header.Set(HeaderBlockchainID, c.BlockchainID) + req.Header.Set(HeaderCoreID, c.CoreID) + + // Propagate our deadline if we have one. + deadline, ok := ctx.Deadline() + if ok { + req.Header.Set(HeaderTimeout, deadline.Sub(time.Now()).String()) + } + + client := c.Client + if client == nil { + client = http.DefaultClient + } + resp, err := client.Do(req.WithContext(ctx)) + if err != nil && ctx.Err() != nil { // check if it timed out + return nil, errors.Wrap(ctx.Err()) + } else if err != nil { + return nil, errors.Wrap(err) + } + + if id := resp.Header.Get(HeaderBlockchainID); c.BlockchainID != "" && id != "" && c.BlockchainID != id { + resp.Body.Close() + return nil, errors.Wrap(ErrWrongNetwork) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + defer resp.Body.Close() + + resErr := ErrStatusCode{ + URL: cleanedURLString(u), + StatusCode: resp.StatusCode, + } + + // Attach formatted error message, if available + var errData httperror.Response + err := json.NewDecoder(resp.Body).Decode(&errData) + if err == nil && errData.ChainCode != "" { + resErr.ErrorData = &errData + } + + return nil, resErr + } + + return resp.Body, nil +} + +func cleanedURLString(u *url.URL) string { + var dup url.URL = *u + dup.User = nil + return dup.String() +} diff --git a/blockchain/txdb/store.go b/blockchain/txdb/store.go index fa0ee8d8..e6d199aa 100644 --- a/blockchain/txdb/store.go +++ b/blockchain/txdb/store.go @@ -11,7 +11,6 @@ import ( "github.com/bytom/protocol/state" dbm "github.com/tendermint/tmlibs/db" . "github.com/tendermint/tmlibs/common" - ) // A Store encapsulates storage for blockchain validation. @@ -102,6 +101,14 @@ func (s *Store) GetBlock(height uint64) (*legacy.Block, error) { return s.cache.lookup(height) } +func (s *Store) GetRawBlock(height uint64) ([]byte, error) { + bytez := s.db.Get(calcBlockKey(height)) + if bytez == nil { + return nil , errors.New("querying blocks from the db null") + } + return bytez, nil +} + // LatestSnapshot returns the most recent state snapshot stored in // the database and its corresponding block height. func (s *Store) LatestSnapshot(ctx context.Context) (*state.Snapshot, uint64, error) { -- 2.11.0