OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / go-kit / kit / examples / addsvc / pkg / addtransport / grpc.go
1 package addtransport
2
3 import (
4         "context"
5         "errors"
6         "time"
7
8         "google.golang.org/grpc"
9
10         stdopentracing "github.com/opentracing/opentracing-go"
11         "github.com/sony/gobreaker"
12         oldcontext "golang.org/x/net/context"
13         "golang.org/x/time/rate"
14
15         "github.com/go-kit/kit/circuitbreaker"
16         "github.com/go-kit/kit/endpoint"
17         "github.com/go-kit/kit/log"
18         "github.com/go-kit/kit/ratelimit"
19         "github.com/go-kit/kit/tracing/opentracing"
20         grpctransport "github.com/go-kit/kit/transport/grpc"
21
22         "github.com/go-kit/kit/examples/addsvc/pb"
23         "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
24         "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
25 )
26
27 type grpcServer struct {
28         sum    grpctransport.Handler
29         concat grpctransport.Handler
30 }
31
32 // NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
33 func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
34         options := []grpctransport.ServerOption{
35                 grpctransport.ServerErrorLogger(logger),
36         }
37         return &grpcServer{
38                 sum: grpctransport.NewServer(
39                         endpoints.SumEndpoint,
40                         decodeGRPCSumRequest,
41                         encodeGRPCSumResponse,
42                         append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Sum", logger)))...,
43                 ),
44                 concat: grpctransport.NewServer(
45                         endpoints.ConcatEndpoint,
46                         decodeGRPCConcatRequest,
47                         encodeGRPCConcatResponse,
48                         append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Concat", logger)))...,
49                 ),
50         }
51 }
52
53 func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
54         _, rep, err := s.sum.ServeGRPC(ctx, req)
55         if err != nil {
56                 return nil, err
57         }
58         return rep.(*pb.SumReply), nil
59 }
60
61 func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
62         _, rep, err := s.concat.ServeGRPC(ctx, req)
63         if err != nil {
64                 return nil, err
65         }
66         return rep.(*pb.ConcatReply), nil
67 }
68
69 // NewGRPCClient returns an AddService backed by a gRPC server at the other end
70 // of the conn. The caller is responsible for constructing the conn, and
71 // eventually closing the underlying transport. We bake-in certain middlewares,
72 // implementing the client library pattern.
73 func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
74         // We construct a single ratelimiter middleware, to limit the total outgoing
75         // QPS from this client to all methods on the remote instance. We also
76         // construct per-endpoint circuitbreaker middlewares to demonstrate how
77         // that's done, although they could easily be combined into a single breaker
78         // for the entire remote instance, too.
79         limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
80
81         // Each individual endpoint is an http/transport.Client (which implements
82         // endpoint.Endpoint) that gets wrapped with various middlewares. If you
83         // made your own client library, you'd do this work there, so your server
84         // could rely on a consistent set of client behavior.
85         var sumEndpoint endpoint.Endpoint
86         {
87                 sumEndpoint = grpctransport.NewClient(
88                         conn,
89                         "pb.Add",
90                         "Sum",
91                         encodeGRPCSumRequest,
92                         decodeGRPCSumResponse,
93                         pb.SumReply{},
94                         grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
95                 ).Endpoint()
96                 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
97                 sumEndpoint = limiter(sumEndpoint)
98                 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
99                         Name:    "Sum",
100                         Timeout: 30 * time.Second,
101                 }))(sumEndpoint)
102         }
103
104         // The Concat endpoint is the same thing, with slightly different
105         // middlewares to demonstrate how to specialize per-endpoint.
106         var concatEndpoint endpoint.Endpoint
107         {
108                 concatEndpoint = grpctransport.NewClient(
109                         conn,
110                         "pb.Add",
111                         "Concat",
112                         encodeGRPCConcatRequest,
113                         decodeGRPCConcatResponse,
114                         pb.ConcatReply{},
115                         grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
116                 ).Endpoint()
117                 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
118                 concatEndpoint = limiter(concatEndpoint)
119                 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
120                         Name:    "Concat",
121                         Timeout: 10 * time.Second,
122                 }))(concatEndpoint)
123         }
124
125         // Returning the endpoint.Set as a service.Service relies on the
126         // endpoint.Set implementing the Service methods. That's just a simple bit
127         // of glue code.
128         return addendpoint.Set{
129                 SumEndpoint:    sumEndpoint,
130                 ConcatEndpoint: concatEndpoint,
131         }
132 }
133
134 // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a
135 // gRPC sum request to a user-domain sum request. Primarily useful in a server.
136 func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
137         req := grpcReq.(*pb.SumRequest)
138         return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil
139 }
140
141 // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a
142 // gRPC concat request to a user-domain concat request. Primarily useful in a
143 // server.
144 func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
145         req := grpcReq.(*pb.ConcatRequest)
146         return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil
147 }
148
149 // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a
150 // gRPC sum reply to a user-domain sum response. Primarily useful in a client.
151 func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
152         reply := grpcReply.(*pb.SumReply)
153         return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
154 }
155
156 // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts
157 // a gRPC concat reply to a user-domain concat response. Primarily useful in a
158 // client.
159 func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
160         reply := grpcReply.(*pb.ConcatReply)
161         return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
162 }
163
164 // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a
165 // user-domain sum response to a gRPC sum reply. Primarily useful in a server.
166 func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
167         resp := response.(addendpoint.SumResponse)
168         return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil
169 }
170
171 // encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts
172 // a user-domain concat response to a gRPC concat reply. Primarily useful in a
173 // server.
174 func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
175         resp := response.(addendpoint.ConcatResponse)
176         return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil
177 }
178
179 // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a
180 // user-domain sum request to a gRPC sum request. Primarily useful in a client.
181 func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
182         req := request.(addendpoint.SumRequest)
183         return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
184 }
185
186 // encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a
187 // user-domain concat request to a gRPC concat request. Primarily useful in a
188 // client.
189 func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
190         req := request.(addendpoint.ConcatRequest)
191         return &pb.ConcatRequest{A: req.A, B: req.B}, nil
192 }
193
194 // These annoying helper functions are required to translate Go error types to
195 // and from strings, which is the type we use in our IDLs to represent errors.
196 // There is special casing to treat empty strings as nil errors.
197
// str2err converts the string form of an error back into an error value. The
// empty string maps to a nil error; any other string becomes a new error
// carrying that text as its message.
func str2err(s string) error {
	if len(s) > 0 {
		return errors.New(s)
	}
	return nil
}
204
// err2str converts an error into its string form. A nil error maps to the
// empty string; any other error is rendered through its Error method.
func err2str(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}
210 }