OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / abci / client / socket_client.go
1 package abcicli
2
3 import (
4         "bufio"
5         "container/list"
6         "errors"
7         "fmt"
8         "net"
9         "reflect"
10         "sync"
11         "time"
12
13         "github.com/tendermint/abci/types"
14         cmn "github.com/tendermint/tmlibs/common"
15 )
16
17 const (
18         OK  = types.CodeType_OK
19         LOG = ""
20 )
21
22 const reqQueueSize = 256 // TODO make configurable
23 // const maxResponseSize = 1048576 // 1MB TODO make configurable
24 const flushThrottleMS = 20 // Don't wait longer than...
25
26 // This is goroutine-safe, but users should beware that
27 // the application in general is not meant to be interfaced
28 // with concurrent callers.
29 type socketClient struct {
30         cmn.BaseService
31
32         reqQueue    chan *ReqRes
33         flushTimer  *cmn.ThrottleTimer
34         mustConnect bool
35
36         mtx     sync.Mutex
37         addr    string
38         conn    net.Conn
39         err     error
40         reqSent *list.List
41         resCb   func(*types.Request, *types.Response) // listens to all callbacks
42
43 }
44
45 func NewSocketClient(addr string, mustConnect bool) *socketClient {
46         cli := &socketClient{
47                 reqQueue:    make(chan *ReqRes, reqQueueSize),
48                 flushTimer:  cmn.NewThrottleTimer("socketClient", flushThrottleMS),
49                 mustConnect: mustConnect,
50
51                 addr:    addr,
52                 reqSent: list.New(),
53                 resCb:   nil,
54         }
55         cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
56         return cli
57 }
58
59 func (cli *socketClient) OnStart() error {
60         if err := cli.BaseService.OnStart(); err != nil {
61                 return err
62         }
63
64         var err error
65         var conn net.Conn
66 RETRY_LOOP:
67         for {
68                 conn, err = cmn.Connect(cli.addr)
69                 if err != nil {
70                         if cli.mustConnect {
71                                 return err
72                         }
73                         cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v.  Retrying...", cli.addr))
74                         time.Sleep(time.Second * 3)
75                         continue RETRY_LOOP
76                 }
77                 cli.conn = conn
78
79                 go cli.sendRequestsRoutine(conn)
80                 go cli.recvResponseRoutine(conn)
81
82                 return nil
83         }
84 }
85
86 func (cli *socketClient) OnStop() {
87         cli.BaseService.OnStop()
88
89         cli.mtx.Lock()
90         defer cli.mtx.Unlock()
91         if cli.conn != nil {
92                 cli.conn.Close()
93         }
94
95         cli.flushQueue()
96 }
97
98 // Stop the client and set the error
99 func (cli *socketClient) StopForError(err error) {
100         if !cli.IsRunning() {
101                 return
102         }
103
104         cli.mtx.Lock()
105         if cli.err == nil {
106                 cli.err = err
107         }
108         cli.mtx.Unlock()
109
110         cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
111         cli.Stop()
112 }
113
114 func (cli *socketClient) Error() error {
115         cli.mtx.Lock()
116         defer cli.mtx.Unlock()
117         return cli.err
118 }
119
120 // Set listener for all responses
121 // NOTE: callback may get internally generated flush responses.
122 func (cli *socketClient) SetResponseCallback(resCb Callback) {
123         cli.mtx.Lock()
124         defer cli.mtx.Unlock()
125         cli.resCb = resCb
126 }
127
128 //----------------------------------------
129
130 func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
131
132         w := bufio.NewWriter(conn)
133         for {
134                 select {
135                 case <-cli.flushTimer.Ch:
136                         select {
137                         case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
138                         default:
139                                 // Probably will fill the buffer, or retry later.
140                         }
141                 case <-cli.BaseService.Quit:
142                         return
143                 case reqres := <-cli.reqQueue:
144                         cli.willSendReq(reqres)
145                         err := types.WriteMessage(reqres.Request, w)
146                         if err != nil {
147                                 cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
148                                 return
149                         }
150                         // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
151                         if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
152                                 err = w.Flush()
153                                 if err != nil {
154                                         cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
155                                         return
156                                 }
157                         }
158                 }
159         }
160 }
161
162 func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
163
164         r := bufio.NewReader(conn) // Buffer reads
165         for {
166                 var res = &types.Response{}
167                 err := types.ReadMessage(r, res)
168                 if err != nil {
169                         cli.StopForError(err)
170                         return
171                 }
172                 switch r := res.Value.(type) {
173                 case *types.Response_Exception:
174                         // XXX After setting cli.err, release waiters (e.g. reqres.Done())
175                         cli.StopForError(errors.New(r.Exception.Error))
176                         return
177                 default:
178                         // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
179                         err := cli.didRecvResponse(res)
180                         if err != nil {
181                                 cli.StopForError(err)
182                                 return
183                         }
184                 }
185         }
186 }
187
188 func (cli *socketClient) willSendReq(reqres *ReqRes) {
189         cli.mtx.Lock()
190         defer cli.mtx.Unlock()
191         cli.reqSent.PushBack(reqres)
192 }
193
194 func (cli *socketClient) didRecvResponse(res *types.Response) error {
195         cli.mtx.Lock()
196         defer cli.mtx.Unlock()
197
198         // Get the first ReqRes
199         next := cli.reqSent.Front()
200         if next == nil {
201                 return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
202         }
203         reqres := next.Value.(*ReqRes)
204         if !resMatchesReq(reqres.Request, res) {
205                 return fmt.Errorf("Unexpected result type %v when response to %v expected",
206                         reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
207         }
208
209         reqres.Response = res    // Set response
210         reqres.Done()            // Release waiters
211         cli.reqSent.Remove(next) // Pop first item from linked list
212
213         // Notify reqRes listener if set
214         if cb := reqres.GetCallback(); cb != nil {
215                 cb(res)
216         }
217
218         // Notify client listener if set
219         if cli.resCb != nil {
220                 cli.resCb(reqres.Request, res)
221         }
222
223         return nil
224 }
225
226 //----------------------------------------
227
228 func (cli *socketClient) EchoAsync(msg string) *ReqRes {
229         return cli.queueRequest(types.ToRequestEcho(msg))
230 }
231
232 func (cli *socketClient) FlushAsync() *ReqRes {
233         return cli.queueRequest(types.ToRequestFlush())
234 }
235
236 func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes {
237         return cli.queueRequest(types.ToRequestInfo(req))
238 }
239
240 func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes {
241         return cli.queueRequest(types.ToRequestSetOption(key, value))
242 }
243
244 func (cli *socketClient) DeliverTxAsync(tx []byte) *ReqRes {
245         return cli.queueRequest(types.ToRequestDeliverTx(tx))
246 }
247
248 func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
249         return cli.queueRequest(types.ToRequestCheckTx(tx))
250 }
251
252 func (cli *socketClient) QueryAsync(reqQuery types.RequestQuery) *ReqRes {
253         return cli.queueRequest(types.ToRequestQuery(reqQuery))
254 }
255
256 func (cli *socketClient) CommitAsync() *ReqRes {
257         return cli.queueRequest(types.ToRequestCommit())
258 }
259
260 func (cli *socketClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
261         return cli.queueRequest(types.ToRequestInitChain(params))
262 }
263
264 func (cli *socketClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
265         return cli.queueRequest(types.ToRequestBeginBlock(params))
266 }
267
268 func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes {
269         return cli.queueRequest(types.ToRequestEndBlock(height))
270 }
271
272 //----------------------------------------
273
274 func (cli *socketClient) EchoSync(msg string) (res types.Result) {
275         reqres := cli.queueRequest(types.ToRequestEcho(msg))
276         cli.FlushSync()
277         if err := cli.Error(); err != nil {
278                 return types.ErrInternalError.SetLog(err.Error())
279         }
280         resp := reqres.Response.GetEcho()
281         return types.Result{Code: OK, Data: []byte(resp.Message)}
282 }
283
284 func (cli *socketClient) FlushSync() error {
285         reqRes := cli.queueRequest(types.ToRequestFlush())
286         if err := cli.Error(); err != nil {
287                 return types.ErrInternalError.SetLog(err.Error())
288         }
289         reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
290         return cli.Error()
291 }
292
293 func (cli *socketClient) InfoSync(req types.RequestInfo) (resInfo types.ResponseInfo, err error) {
294         reqres := cli.queueRequest(types.ToRequestInfo(req))
295         cli.FlushSync()
296         if err := cli.Error(); err != nil {
297                 return resInfo, err
298         }
299         if resInfo_ := reqres.Response.GetInfo(); resInfo_ != nil {
300                 return *resInfo_, nil
301         }
302         return resInfo, nil
303 }
304
305 func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) {
306         reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
307         cli.FlushSync()
308         if err := cli.Error(); err != nil {
309                 return types.ErrInternalError.SetLog(err.Error())
310         }
311         resp := reqres.Response.GetSetOption()
312         return types.Result{Code: OK, Data: nil, Log: resp.Log}
313 }
314
315 func (cli *socketClient) DeliverTxSync(tx []byte) (res types.Result) {
316         reqres := cli.queueRequest(types.ToRequestDeliverTx(tx))
317         cli.FlushSync()
318         if err := cli.Error(); err != nil {
319                 return types.ErrInternalError.SetLog(err.Error())
320         }
321         resp := reqres.Response.GetDeliverTx()
322         return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
323 }
324
325 func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) {
326         reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
327         cli.FlushSync()
328         if err := cli.Error(); err != nil {
329                 return types.ErrInternalError.SetLog(err.Error())
330         }
331         resp := reqres.Response.GetCheckTx()
332         return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
333 }
334
335 func (cli *socketClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
336         reqres := cli.queueRequest(types.ToRequestQuery(reqQuery))
337         cli.FlushSync()
338         if err := cli.Error(); err != nil {
339                 return resQuery, err
340         }
341         if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
342                 return *resQuery_, nil
343         }
344         return resQuery, nil
345 }
346
347 func (cli *socketClient) CommitSync() (res types.Result) {
348         reqres := cli.queueRequest(types.ToRequestCommit())
349         cli.FlushSync()
350         if err := cli.Error(); err != nil {
351                 return types.ErrInternalError.SetLog(err.Error())
352         }
353         resp := reqres.Response.GetCommit()
354         return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
355 }
356
357 func (cli *socketClient) InitChainSync(params types.RequestInitChain) (err error) {
358         cli.queueRequest(types.ToRequestInitChain(params))
359         cli.FlushSync()
360         return cli.Error()
361 }
362
363 func (cli *socketClient) BeginBlockSync(params types.RequestBeginBlock) (err error) {
364         cli.queueRequest(types.ToRequestBeginBlock(params))
365         cli.FlushSync()
366         return cli.Error()
367 }
368
369 func (cli *socketClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
370         reqres := cli.queueRequest(types.ToRequestEndBlock(height))
371         cli.FlushSync()
372         if err := cli.Error(); err != nil {
373                 return resEndBlock, err
374         }
375         if blk := reqres.Response.GetEndBlock(); blk != nil {
376                 return *blk, nil
377         }
378         return resEndBlock, nil
379 }
380
381 //----------------------------------------
382
383 func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
384         reqres := NewReqRes(req)
385
386         // TODO: set cli.err if reqQueue times out
387         cli.reqQueue <- reqres
388
389         // Maybe auto-flush, or unset auto-flush
390         switch req.Value.(type) {
391         case *types.Request_Flush:
392                 cli.flushTimer.Unset()
393         default:
394                 cli.flushTimer.Set()
395         }
396
397         return reqres
398 }
399
400 func (cli *socketClient) flushQueue() {
401 LOOP:
402         for {
403                 select {
404                 case reqres := <-cli.reqQueue:
405                         reqres.Done()
406                 default:
407                         break LOOP
408                 }
409         }
410 }
411
412 //----------------------------------------
413
414 func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
415         switch req.Value.(type) {
416         case *types.Request_Echo:
417                 _, ok = res.Value.(*types.Response_Echo)
418         case *types.Request_Flush:
419                 _, ok = res.Value.(*types.Response_Flush)
420         case *types.Request_Info:
421                 _, ok = res.Value.(*types.Response_Info)
422         case *types.Request_SetOption:
423                 _, ok = res.Value.(*types.Response_SetOption)
424         case *types.Request_DeliverTx:
425                 _, ok = res.Value.(*types.Response_DeliverTx)
426         case *types.Request_CheckTx:
427                 _, ok = res.Value.(*types.Response_CheckTx)
428         case *types.Request_Commit:
429                 _, ok = res.Value.(*types.Response_Commit)
430         case *types.Request_Query:
431                 _, ok = res.Value.(*types.Response_Query)
432         case *types.Request_InitChain:
433                 _, ok = res.Value.(*types.Response_InitChain)
434         case *types.Request_BeginBlock:
435                 _, ok = res.Value.(*types.Response_BeginBlock)
436         case *types.Request_EndBlock:
437                 _, ok = res.Value.(*types.Response_EndBlock)
438         }
439         return ok
440 }