From 9b686b66e503a1cd0e54cb7ba2a64578e9cfbdc2 Mon Sep 17 00:00:00 2001 From: HAOYUatHZ Date: Wed, 5 Jun 2019 15:54:40 +0800 Subject: [PATCH] feat: init service --- federation/service/node.go | 98 +++++++++++++++++++++++++++ federation/service/ws_client.go | 142 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 240 insertions(+) create mode 100644 federation/service/node.go create mode 100644 federation/service/ws_client.go diff --git a/federation/service/node.go b/federation/service/node.go new file mode 100644 index 00000000..b1b55f6b --- /dev/null +++ b/federation/service/node.go @@ -0,0 +1,98 @@ +package service + +import ( + "encoding/json" + + // "github.com/bytom/errors" + // "github.com/bytom/protocol/bc" + // "github.com/bytom/protocol/bc/types" + + "github.com/blockcenter/util" +) + +// Node can invoke the api which provide by the full node server +type Node struct { + ip string +} + +// Node create a api client with target server +func NewNode(ip string) *Node { + return &Node{ip: ip} +} + +func (n *Node) GetBlockByHash(hash string) (*types.Block, *bc.TransactionStatus, error) { + return n.getRawBlock(&getRawBlockReq{BlockHash: hash}) +} + +func (n *Node) GetBlockByHeight(height uint64) (*types.Block, *bc.TransactionStatus, error) { + return n.getRawBlock(&getRawBlockReq{BlockHeight: height}) +} + +type getBlockCountResp struct { + BlockCount uint64 `json:"block_count"` +} + +func (n *Node) GetBlockCount() (uint64, error) { + url := "/get-block-count" + res := &getBlockCountResp{} + return res.BlockCount, n.request(url, nil, res) +} + +type getRawBlockReq struct { + BlockHeight uint64 `json:"block_height"` + BlockHash string `json:"block_hash"` +} + +type getRawBlockResp struct { + RawBlock *types.Block `json:"raw_block"` + TransactionStatus *bc.TransactionStatus `json:"transaction_status"` +} + +func (n *Node) getRawBlock(req *getRawBlockReq) (*types.Block, *bc.TransactionStatus, error) { + url := "/get-raw-block" + payload, err := json.Marshal(req) + if err != nil { + return nil, nil, errors.Wrap(err, "json marshal") + } + + res := &getRawBlockResp{} + return res.RawBlock, res.TransactionStatus, n.request(url, payload, res) +} + +type submitTxReq struct { + Tx *types.Tx `json:"raw_transaction"` +} + +type submitTxResp struct { + TxID string `json:"tx_id"` +} + +func (n *Node) SubmitTx(tx *types.Tx) (string, error) { + url := "/submit-transaction" + payload, err := json.Marshal(submitTxReq{Tx: tx}) + if err != nil { + return "", errors.Wrap(err, "json marshal") + } + + res := &submitTxResp{} + return res.TxID, n.request(url, payload, res) +} + +type response struct { + Status string `json:"status"` + Data json.RawMessage `json:"data"` + ErrDetail string `json:"error_detail"` +} + +func (n *Node) request(url string, payload []byte, respData interface{}) error { + resp := &response{} + if err := util.Post(n.ip+url, payload, resp); err != nil { + return err + } + + if resp.Status != "success" { + return errors.New(resp.ErrDetail) + } + + return json.Unmarshal(resp.Data, respData) +} diff --git a/federation/service/ws_client.go b/federation/service/ws_client.go new file mode 100644 index 00000000..c955b38d --- /dev/null +++ b/federation/service/ws_client.go @@ -0,0 +1,142 @@ +package service + +import ( + "encoding/json" + "errors" + "net/url" + "time" + + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" +) + +const ( + apiPath = "/websocket-subscribe" + + // TopicNotifyNewTransactions is a topic can be subscribed, when a new valid transaction is incoming, the client will be notified. + TopicNotifyNewTransactions = "notify_new_transactions" + + // ResponseNewTransaction is a notification type indicate a new transaction is incomming. + ResponseNewTransaction = "new_transaction" +) + +// WSClient establish a websocket connection with websocket server, +// which can subscribe topics, and receive the corresponding message. +type WSClient struct { + conn *websocket.Conn + processCh chan *WSResponse + remoteAddr string + closed bool + subscribeTopics []string +} + +// NewWSClient create a new websocket client +func NewWSClient(addr string, processCh chan *WSResponse) *WSClient { + return &WSClient{remoteAddr: addr, processCh: processCh} +} + +// Connect establish a websocket connection with websocket server, +// and listen to the arrive message +func (w *WSClient) Connect() error { + if err := w.tryConnect(); err != nil { + return err + } + + go w.listen() + return nil +} + +func (w *WSClient) tryConnect() error { + u := url.URL{Scheme: "ws", Host: w.remoteAddr, Path: apiPath} + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + w.conn = conn + return err +} + +func (w *WSClient) reconnect() bool { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for range ticker.C { + if w.closed { + return false + } + if err := w.tryConnect(); err != nil { + log.WithField("err", err).Error("reconnect websocket server fail") + continue + } + for _, topic := range w.subscribeTopics { + if err := w.Subscribe(topic); err != nil { + log.WithField("err", err).WithField("topic", topic).Error("subscribe topic fail") + return false + } + } + return true + } + return false +} + +// Close remove the websocket connection +func (w *WSClient) Close() error { + w.closed = true + return w.conn.Close() +} + +// wsRequest means the data structure of the request +type wsRequest struct { + Topic string `json:"topic"` +} + +// WSResponse means the returned data structure +type WSResponse struct { + NotificationType string `json:"notification_type"` + Data json.RawMessage `json:"data"` + ErrorDetail string `json:"error_detail,omitempty"` +} + +func (w *WSClient) Subscribe(topic string) error { + if w.conn == nil { + return errors.New("must connect the ws server before subscribe") + } + req := &wsRequest{Topic: topic} + msg, err := json.Marshal(req) + if err != nil { + return err + } + + if err := w.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + return err + } + + w.subscribeTopics = append(w.subscribeTopics, topic) + return nil +} + +func (w *WSClient) listen() { + for { + _, msg, err := w.conn.ReadMessage() + if err != nil { + log.WithField("err", err).Error("read message error") + if w.closed { + break + } + switch err.(type) { + case *websocket.CloseError: + if w.reconnect() { + continue + } + break + default: + continue + } + } + + resp := &WSResponse{} + if err = json.Unmarshal(msg, resp); err != nil { + log.WithField("err", err).Error("Unmarshal error") + continue + } + + w.processCh <- resp + } +} -- 2.11.0