9 context "golang.org/x/net/context"
10 grpc "google.golang.org/grpc"
12 "github.com/tendermint/abci/types"
13 cmn "github.com/tendermint/tmlibs/common"
16 // grpcClient is a stripped-down copy of the remoteClient that makes
17 // synchronous calls using grpc.
18 type grpcClient struct {
// client is the ABCI application client on which the *Async methods issue their grpc calls.
22 client types.ABCIApplicationClient
27 resCb func(*types.Request, *types.Response) // listens to all callbacks
// NewGRPCClient returns a *grpcClient carrying the given mustConnect flag and
// a BaseService created under the name "grpcClient".
// NOTE(review): addr is presumably stored on the client as well (OnStart dials
// cli.addr); that assignment is not visible in this excerpt — confirm.
30 func NewGRPCClient(addr string, mustConnect bool) *grpcClient {
33 mustConnect: mustConnect,
// The client passes itself to NewBaseService along with the service name.
35 cli.BaseService = *cmn.NewBaseService(nil, "grpcClient", cli)
// dialerFunc is the dialer handed to grpc.WithDialer in OnStart. It opens the
// connection with cmn.Connect(addr); the timeout argument is not used.
39 func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
40 return cmn.Connect(addr)
// OnStart starts the embedded BaseService, dials cli.addr over an insecure
// grpc connection using dialerFunc, and probes the application with an Echo
// request. A failed connection attempt is logged and followed by a
// three-second pause.
// NOTE(review): the loop headers and error checks are not visible in this
// excerpt; confirm how cli.mustConnect influences the retry behavior.
43 func (cli *grpcClient) OnStart() error {
44 if err := cli.BaseService.OnStart(); err != nil {
// Dial without transport security; dialerFunc supplies the net.Conn.
50 conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
55 cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
56 time.Sleep(time.Second * 3)
60 client := types.NewABCIApplicationClient(conn)
// Send a fail-fast Echo("hello") to the application over the new connection.
64 _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
66 break ENSURE_CONNECTED
// One-second pause; presumably taken before the next Echo attempt — confirm.
68 time.Sleep(time.Second)
// OnStop stops the embedded BaseService. cli.mtx is released on return via
// defer. The grpc connection itself is not closed here (see the TODO below).
76 func (cli *grpcClient) OnStop() {
77 cli.BaseService.OnStop()
79 defer cli.mtx.Unlock()
80 // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
81 /*if cli.conn != nil {
// StopForError receives the err value from the grpc calls made by the *Async
// methods and logs it as the reason for stopping the client.
// NOTE(review): presumably it also records err and stops the service; those
// lines are not visible in this excerpt — confirm.
86 func (cli *grpcClient) StopForError(err error) {
97 cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
// Error reports the client's error state; checkErrGetResult and the *Sync
// methods treat a non-nil result as a failed client. cli.mtx is released on
// return via defer.
101 func (cli *grpcClient) Error() error {
103 defer cli.mtx.Unlock()
107 // SetResponseCallback sets the listener for all responses.
108 // NOTE: callback may get internally generated flush responses.
// cli.mtx is released on return via defer.
109 func (cli *grpcClient) SetResponseCallback(resCb Callback) {
111 defer cli.mtx.Unlock()
115 //----------------------------------------
116 // GRPC calls are synchronous, but some callbacks expect to be called asynchronously
117 // (e.g. the mempool expects to be able to lock to remove bad txs from cache).
118 // To accommodate, we finish each call in its own goroutine,
119 // which is expensive, but easy - if you want something better, use the socket protocol!
120 // Maybe one day, if people really want it, we will use grpc streams,
121 // but hopefully not :D
// EchoAsync builds an Echo request from msg, issues the grpc Echo call with a
// background context and fail-fast enabled, hands the call's err to
// StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped Echo response.
123 func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
124 req := types.ToRequestEcho(msg)
125 res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
127 cli.StopForError(err)
129 return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
// FlushAsync builds a Flush request, issues the grpc Flush call with a
// background context and fail-fast enabled, hands the call's err to
// StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped Flush response.
132 func (cli *grpcClient) FlushAsync() *ReqRes {
133 req := types.ToRequestFlush()
134 res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
136 cli.StopForError(err)
138 return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
// InfoAsync builds an Info request from params, issues the grpc Info call with
// a background context and fail-fast enabled, hands the call's err to
// StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped Info response.
141 func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes {
142 req := types.ToRequestInfo(params)
143 res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
145 cli.StopForError(err)
147 return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
// SetOptionAsync builds a SetOption request from key and value, issues the
// grpc SetOption call with a background context and fail-fast enabled, hands
// the call's err to StopForError, and returns the ReqRes produced by
// finishAsyncCall for the wrapped SetOption response.
150 func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
151 req := types.ToRequestSetOption(key, value)
152 res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
154 cli.StopForError(err)
156 return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
// DeliverTxAsync builds a DeliverTx request from tx, issues the grpc DeliverTx
// call with a background context and fail-fast enabled, hands the call's err
// to StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped DeliverTx response.
159 func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
160 req := types.ToRequestDeliverTx(tx)
161 res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
163 cli.StopForError(err)
165 return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
// CheckTxAsync builds a CheckTx request from tx, issues the grpc CheckTx call
// with a background context and fail-fast enabled, hands the call's err to
// StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped CheckTx response.
168 func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
169 req := types.ToRequestCheckTx(tx)
170 res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
172 cli.StopForError(err)
174 return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
// QueryAsync builds a Query request from params, issues the grpc Query call
// with a background context and fail-fast enabled, hands the call's err to
// StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped Query response.
177 func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
178 req := types.ToRequestQuery(params)
179 res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
181 cli.StopForError(err)
183 return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
// CommitAsync builds a Commit request, issues the grpc Commit call with a
// background context and fail-fast enabled, hands the call's err to
// StopForError, and returns the ReqRes produced by finishAsyncCall for the
// wrapped Commit response.
186 func (cli *grpcClient) CommitAsync() *ReqRes {
187 req := types.ToRequestCommit()
188 res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
190 cli.StopForError(err)
192 return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
// InitChainAsync builds an InitChain request from params, issues the grpc
// InitChain call with a background context and fail-fast enabled, hands the
// call's err to StopForError, and returns the ReqRes produced by
// finishAsyncCall for the wrapped InitChain response.
195 func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
196 req := types.ToRequestInitChain(params)
197 res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
199 cli.StopForError(err)
201 return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
// BeginBlockAsync builds a BeginBlock request from params, issues the grpc
// BeginBlock call with a background context and fail-fast enabled, hands the
// call's err to StopForError, and returns the ReqRes produced by
// finishAsyncCall for the wrapped BeginBlock response.
204 func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
205 req := types.ToRequestBeginBlock(params)
206 res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
208 cli.StopForError(err)
210 return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
// EndBlockAsync builds an EndBlock request for height, issues the grpc
// EndBlock call with a background context and fail-fast enabled, hands the
// call's err to StopForError, and returns the ReqRes produced by
// finishAsyncCall for the wrapped EndBlock response.
213 func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
214 req := types.ToRequestEndBlock(height)
215 res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
217 cli.StopForError(err)
219 return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
// finishAsyncCall wraps req in a new ReqRes, attaches res as its response and
// marks it done. It then notifies the ReqRes callback, if one is set, and the
// client-wide resCb listener, if one is set.
// NOTE(review): per the inline comment the notifications run in a goroutine;
// the statement that launches it is not visible in this excerpt — confirm.
222 func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
223 reqres := NewReqRes(req)
224 reqres.Response = res // Set response
225 reqres.Done() // Release waiters
226 reqres.SetDone() // so reqRes.SetCallback will run the callback
228 // goroutine for callbacks
230 // Notify reqRes listener if set
231 if cb := reqres.GetCallback(); cb != nil {
235 // Notify client listener if set
236 if cli.resCb != nil {
237 cli.resCb(reqres.Request, res)
// checkErrGetResult converts the client's error state into a types.Result:
// when cli.Error() is non-nil it returns types.ErrInternalError with the error
// text as its log, otherwise it returns an empty types.Result.
243 func (cli *grpcClient) checkErrGetResult() types.Result {
244 if err := cli.Error(); err != nil {
245 // StopForError should already have been called if error is set
246 return types.ErrInternalError.SetLog(err.Error())
248 return types.Result{}
251 //----------------------------------------
// EchoSync runs EchoAsync for msg, consults checkErrGetResult for a client
// error, and returns an OK result whose data is the echoed message.
// NOTE(review): the body of the error branch is not visible in this excerpt.
253 func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
254 reqres := cli.EchoAsync(msg)
// This res is scoped to the if statement and shadows the named result.
255 if res := cli.checkErrGetResult(); res.IsErr() {
258 resp := reqres.Response.GetEcho()
259 return types.NewResultOK([]byte(resp.Message), "")
// FlushSync is the synchronous flush entry point and returns an error.
// NOTE(review): its body is not visible in this excerpt.
262 func (cli *grpcClient) FlushSync() error {
// InfoSync runs InfoAsync for req, stores cli.Error() in the named result err
// and checks it, then inspects the Info payload of the response when it is
// non-nil.
// NOTE(review): the branch bodies and return statements are not visible in
// this excerpt.
266 func (cli *grpcClient) InfoSync(req types.RequestInfo) (resInfo types.ResponseInfo, err error) {
267 reqres := cli.InfoAsync(req)
268 if err = cli.Error(); err != nil {
271 if info := reqres.Response.GetInfo(); info != nil {
// SetOptionSync runs SetOptionAsync for key and value, consults
// checkErrGetResult for a client error, and returns a Result with code OK, nil
// data and the log taken from the SetOption response.
// NOTE(review): the body of the error branch is not visible in this excerpt.
277 func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
278 reqres := cli.SetOptionAsync(key, value)
// This res is scoped to the if statement and shadows the named result.
279 if res := cli.checkErrGetResult(); res.IsErr() {
282 resp := reqres.Response.GetSetOption()
283 return types.Result{Code: OK, Data: nil, Log: resp.Log}
// DeliverTxSync runs DeliverTxAsync for tx, consults checkErrGetResult for a
// client error, and returns a Result built from the Code, Data and Log of the
// DeliverTx response.
// NOTE(review): the body of the error branch is not visible in this excerpt.
286 func (cli *grpcClient) DeliverTxSync(tx []byte) (res types.Result) {
287 reqres := cli.DeliverTxAsync(tx)
// This res is scoped to the if statement and shadows the named result.
288 if res := cli.checkErrGetResult(); res.IsErr() {
291 resp := reqres.Response.GetDeliverTx()
292 return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
// CheckTxSync runs CheckTxAsync for tx, consults checkErrGetResult for a
// client error, and returns a Result built from the Code, Data and Log of the
// CheckTx response.
// NOTE(review): the body of the error branch is not visible in this excerpt.
295 func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
296 reqres := cli.CheckTxAsync(tx)
// This res is scoped to the if statement and shadows the named result.
297 if res := cli.checkErrGetResult(); res.IsErr() {
300 resp := reqres.Response.GetCheckTx()
301 return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
// QuerySync runs QueryAsync for reqQuery, stores cli.Error() in the named
// result err and checks it, and returns a copy of the Query response payload
// with a nil error when that payload is non-nil.
// NOTE(review): the error-branch body and the fall-through return are not
// visible in this excerpt.
304 func (cli *grpcClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
305 reqres := cli.QueryAsync(reqQuery)
306 if err = cli.Error(); err != nil {
309 if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
310 return *resQuery_, nil
// CommitSync runs CommitAsync, consults checkErrGetResult for a client error,
// and returns a Result built from the Code, Data and Log of the Commit
// response.
// NOTE(review): the body of the error branch is not visible in this excerpt.
315 func (cli *grpcClient) CommitSync() (res types.Result) {
316 reqres := cli.CommitAsync()
// This res is scoped to the if statement and shadows the named result.
317 if res := cli.checkErrGetResult(); res.IsErr() {
320 resp := reqres.Response.GetCommit()
321 return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
// InitChainSync runs InitChainAsync for params and discards the returned
// ReqRes.
// NOTE(review): the statement producing the err result is not visible in this
// excerpt.
324 func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (err error) {
325 cli.InitChainAsync(params)
// BeginBlockSync runs BeginBlockAsync for params and discards the returned
// ReqRes.
// NOTE(review): the statement producing the err result is not visible in this
// excerpt.
329 func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (err error) {
330 cli.BeginBlockAsync(params)
// EndBlockSync runs EndBlockAsync for height. When cli.Error() is non-nil it
// returns that error together with the unmodified resEndBlock; otherwise it
// inspects the EndBlock payload of the response and returns resEndBlock with
// a nil error.
// NOTE(review): the body of the payload branch is not visible in this excerpt.
334 func (cli *grpcClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
335 reqres := cli.EndBlockAsync(height)
// This err is scoped to the if statement and shadows the named result.
336 if err := cli.Error(); err != nil {
337 return resEndBlock, err
339 if blk := reqres.Response.GetEndBlock(); blk != nil {
342 return resEndBlock, nil