OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / abci / client / grpc_client.go
1 package abcicli
2
3 import (
4         "fmt"
5         "net"
6         "sync"
7         "time"
8
9         context "golang.org/x/net/context"
10         grpc "google.golang.org/grpc"
11
12         "github.com/tendermint/abci/types"
13         cmn "github.com/tendermint/tmlibs/common"
14 )
15
16 // A stripped copy of the remoteClient that makes
17 // synchronous calls using grpc
18 type grpcClient struct {
19         cmn.BaseService
20         mustConnect bool
21
22         client types.ABCIApplicationClient
23
24         mtx   sync.Mutex
25         addr  string
26         err   error
27         resCb func(*types.Request, *types.Response) // listens to all callbacks
28 }
29
30 func NewGRPCClient(addr string, mustConnect bool) *grpcClient {
31         cli := &grpcClient{
32                 addr:        addr,
33                 mustConnect: mustConnect,
34         }
35         cli.BaseService = *cmn.NewBaseService(nil, "grpcClient", cli)
36         return cli
37 }
38
39 func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
40         return cmn.Connect(addr)
41 }
42
43 func (cli *grpcClient) OnStart() error {
44         if err := cli.BaseService.OnStart(); err != nil {
45                 return err
46         }
47 RETRY_LOOP:
48
49         for {
50                 conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
51                 if err != nil {
52                         if cli.mustConnect {
53                                 return err
54                         }
55                         cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v.  Retrying...\n", cli.addr))
56                         time.Sleep(time.Second * 3)
57                         continue RETRY_LOOP
58                 }
59
60                 client := types.NewABCIApplicationClient(conn)
61
62         ENSURE_CONNECTED:
63                 for {
64                         _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
65                         if err == nil {
66                                 break ENSURE_CONNECTED
67                         }
68                         time.Sleep(time.Second)
69                 }
70
71                 cli.client = client
72                 return nil
73         }
74 }
75
76 func (cli *grpcClient) OnStop() {
77         cli.BaseService.OnStop()
78         cli.mtx.Lock()
79         defer cli.mtx.Unlock()
80         // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
81         /*if cli.conn != nil {
82                 cli.conn.Close()
83         }*/
84 }
85
86 func (cli *grpcClient) StopForError(err error) {
87         cli.mtx.Lock()
88         if !cli.IsRunning() {
89                 return
90         }
91
92         if cli.err == nil {
93                 cli.err = err
94         }
95         cli.mtx.Unlock()
96
97         cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
98         cli.Stop()
99 }
100
101 func (cli *grpcClient) Error() error {
102         cli.mtx.Lock()
103         defer cli.mtx.Unlock()
104         return cli.err
105 }
106
107 // Set listener for all responses
108 // NOTE: callback may get internally generated flush responses.
109 func (cli *grpcClient) SetResponseCallback(resCb Callback) {
110         cli.mtx.Lock()
111         defer cli.mtx.Unlock()
112         cli.resCb = resCb
113 }
114
115 //----------------------------------------
116 // GRPC calls are synchronous, but some callbacks expect to be called asynchronously
117 // (eg. the mempool expects to be able to lock to remove bad txs from cache).
118 // To accommodate, we finish each call in its own go-routine,
119 // which is expensive, but easy - if you want something better, use the socket protocol!
120 // maybe one day, if people really want it, we use grpc streams,
121 // but hopefully not :D
122
123 func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
124         req := types.ToRequestEcho(msg)
125         res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
126         if err != nil {
127                 cli.StopForError(err)
128         }
129         return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
130 }
131
132 func (cli *grpcClient) FlushAsync() *ReqRes {
133         req := types.ToRequestFlush()
134         res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
135         if err != nil {
136                 cli.StopForError(err)
137         }
138         return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
139 }
140
141 func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes {
142         req := types.ToRequestInfo(params)
143         res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
144         if err != nil {
145                 cli.StopForError(err)
146         }
147         return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
148 }
149
150 func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
151         req := types.ToRequestSetOption(key, value)
152         res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
153         if err != nil {
154                 cli.StopForError(err)
155         }
156         return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
157 }
158
159 func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
160         req := types.ToRequestDeliverTx(tx)
161         res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
162         if err != nil {
163                 cli.StopForError(err)
164         }
165         return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
166 }
167
168 func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
169         req := types.ToRequestCheckTx(tx)
170         res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
171         if err != nil {
172                 cli.StopForError(err)
173         }
174         return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
175 }
176
177 func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
178         req := types.ToRequestQuery(params)
179         res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
180         if err != nil {
181                 cli.StopForError(err)
182         }
183         return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
184 }
185
186 func (cli *grpcClient) CommitAsync() *ReqRes {
187         req := types.ToRequestCommit()
188         res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
189         if err != nil {
190                 cli.StopForError(err)
191         }
192         return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
193 }
194
195 func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
196         req := types.ToRequestInitChain(params)
197         res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
198         if err != nil {
199                 cli.StopForError(err)
200         }
201         return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
202 }
203
204 func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
205         req := types.ToRequestBeginBlock(params)
206         res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
207         if err != nil {
208                 cli.StopForError(err)
209         }
210         return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
211 }
212
213 func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
214         req := types.ToRequestEndBlock(height)
215         res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
216         if err != nil {
217                 cli.StopForError(err)
218         }
219         return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
220 }
221
222 func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
223         reqres := NewReqRes(req)
224         reqres.Response = res // Set response
225         reqres.Done()         // Release waiters
226         reqres.SetDone()      // so reqRes.SetCallback will run the callback
227
228         // go routine for callbacks
229         go func() {
230                 // Notify reqRes listener if set
231                 if cb := reqres.GetCallback(); cb != nil {
232                         cb(res)
233                 }
234
235                 // Notify client listener if set
236                 if cli.resCb != nil {
237                         cli.resCb(reqres.Request, res)
238                 }
239         }()
240         return reqres
241 }
242
243 func (cli *grpcClient) checkErrGetResult() types.Result {
244         if err := cli.Error(); err != nil {
245                 // StopForError should already have been called if error is set
246                 return types.ErrInternalError.SetLog(err.Error())
247         }
248         return types.Result{}
249 }
250
251 //----------------------------------------
252
253 func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
254         reqres := cli.EchoAsync(msg)
255         if res := cli.checkErrGetResult(); res.IsErr() {
256                 return res
257         }
258         resp := reqres.Response.GetEcho()
259         return types.NewResultOK([]byte(resp.Message), "")
260 }
261
262 func (cli *grpcClient) FlushSync() error {
263         return nil
264 }
265
266 func (cli *grpcClient) InfoSync(req types.RequestInfo) (resInfo types.ResponseInfo, err error) {
267         reqres := cli.InfoAsync(req)
268         if err = cli.Error(); err != nil {
269                 return resInfo, err
270         }
271         if info := reqres.Response.GetInfo(); info != nil {
272                 return *info, nil
273         }
274         return resInfo, nil
275 }
276
277 func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
278         reqres := cli.SetOptionAsync(key, value)
279         if res := cli.checkErrGetResult(); res.IsErr() {
280                 return res
281         }
282         resp := reqres.Response.GetSetOption()
283         return types.Result{Code: OK, Data: nil, Log: resp.Log}
284 }
285
286 func (cli *grpcClient) DeliverTxSync(tx []byte) (res types.Result) {
287         reqres := cli.DeliverTxAsync(tx)
288         if res := cli.checkErrGetResult(); res.IsErr() {
289                 return res
290         }
291         resp := reqres.Response.GetDeliverTx()
292         return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
293 }
294
295 func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
296         reqres := cli.CheckTxAsync(tx)
297         if res := cli.checkErrGetResult(); res.IsErr() {
298                 return res
299         }
300         resp := reqres.Response.GetCheckTx()
301         return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
302 }
303
304 func (cli *grpcClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
305         reqres := cli.QueryAsync(reqQuery)
306         if err = cli.Error(); err != nil {
307                 return resQuery, err
308         }
309         if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
310                 return *resQuery_, nil
311         }
312         return resQuery, nil
313 }
314
315 func (cli *grpcClient) CommitSync() (res types.Result) {
316         reqres := cli.CommitAsync()
317         if res := cli.checkErrGetResult(); res.IsErr() {
318                 return res
319         }
320         resp := reqres.Response.GetCommit()
321         return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
322 }
323
324 func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (err error) {
325         cli.InitChainAsync(params)
326         return cli.Error()
327 }
328
329 func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (err error) {
330         cli.BeginBlockAsync(params)
331         return cli.Error()
332 }
333
334 func (cli *grpcClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
335         reqres := cli.EndBlockAsync(height)
336         if err := cli.Error(); err != nil {
337                 return resEndBlock, err
338         }
339         if blk := reqres.Response.GetEndBlock(); blk != nil {
340                 return *blk, nil
341         }
342         return resEndBlock, nil
343 }