7 "golang.org/x/time/rate"
9 "github.com/sony/gobreaker"
11 "github.com/go-kit/kit/circuitbreaker"
12 "github.com/go-kit/kit/endpoint"
13 "github.com/go-kit/kit/ratelimit"
15 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
16 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
17 addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
// thriftServer is the receiver type for the Thrift-facing Sum and Concat
// methods in this file. Each method forwards its call to the corresponding
// endpoint held in the endpoints field.
20 type thriftServer struct {
// endpoints supplies SumEndpoint and ConcatEndpoint, which the methods invoke.
22 endpoints addendpoint.Set
25 // NewThriftServer makes a set of endpoints available as a Thrift service.
// It accepts an addendpoint.Set and returns a value typed as the generated
// addthrift.AddService interface.
// NOTE(review): presumably the returned value is a *thriftServer wrapping the
// given set, since that type carries the Sum and Concat methods — confirm.
26 func NewThriftServer(endpoints addendpoint.Set) addthrift.AddService {
32 func (s *thriftServer) Sum(ctx context.Context, a int64, b int64) (*addthrift.SumReply, error) {
33 request := addendpoint.SumRequest{A: int(a), B: int(b)}
34 response, err := s.endpoints.SumEndpoint(ctx, request)
38 resp := response.(addendpoint.SumResponse)
39 return &addthrift.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil
42 func (s *thriftServer) Concat(ctx context.Context, a string, b string) (*addthrift.ConcatReply, error) {
43 request := addendpoint.ConcatRequest{A: a, B: b}
44 response, err := s.endpoints.ConcatEndpoint(ctx, request)
48 resp := response.(addendpoint.ConcatResponse)
49 return &addthrift.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil
52 // NewThriftClient returns an AddService backed by a Thrift server described by
53 // the provided client. The caller is responsible for constructing the client,
54 // and eventually closing the underlying transport. We bake-in certain middlewares,
55 // implementing the client library pattern.
56 func NewThriftClient(client *addthrift.AddServiceClient) addservice.Service {
57 // We construct a single ratelimiter middleware, to limit the total outgoing
58 // QPS from this client to all methods on the remote instance. We also
59 // construct per-endpoint circuitbreaker middlewares to demonstrate how
60 // that's done, although they could easily be combined into a single breaker
61 // for the entire remote instance, too.
// rate.Every(time.Second) is a refill rate of one event per second, and 100 is
// the burst size. NOTE(review): confirm that 1/s sustained with a burst of 100
// is the intended outgoing budget.
62 limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
64 // Each individual endpoint is built from the Thrift client (see
65 // MakeThriftSumEndpoint and MakeThriftConcatEndpoint) and is then wrapped with
66 // various middlewares, so that callers get a consistent set of client behavior.
67 var sumEndpoint endpoint.Endpoint
69 sumEndpoint = MakeThriftSumEndpoint(client)
// The limiter is the shared one created above, so Sum and Concat draw from
// the same budget.
70 sumEndpoint = limiter(sumEndpoint)
// Sum gets its own breaker. In gobreaker, Timeout is how long the breaker
// stays open before it moves to half-open.
71 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
73 Timeout: 30 * time.Second,
77 // The Concat endpoint is the same thing, with slightly different
78 // middlewares to demonstrate how to specialize per-endpoint.
79 var concatEndpoint endpoint.Endpoint
81 concatEndpoint = MakeThriftConcatEndpoint(client)
82 concatEndpoint = limiter(concatEndpoint)
// Concat's breaker uses a 10s Timeout, versus 30s for Sum.
83 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
85 Timeout: 10 * time.Second,
89 // Returning the endpoint.Set as a service.Service relies on the
90 // endpoint.Set implementing the Service methods. That's just a simple bit
92 return addendpoint.Set{
93 SumEndpoint: sumEndpoint,
94 ConcatEndpoint: concatEndpoint,
98 // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client.
99 // Useful only in clients, and only until a proper transport/thrift.Client exists.
100 func MakeThriftSumEndpoint(client *addthrift.AddServiceClient) endpoint.Endpoint {
101 return func(ctx context.Context, request interface{}) (interface{}, error) {
102 req := request.(addendpoint.SumRequest)
103 reply, err := client.Sum(ctx, int64(req.A), int64(req.B))
104 if err == addservice.ErrIntOverflow {
105 return nil, err // special case; see comment on ErrIntOverflow
107 return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil
111 // MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift
112 // client. Useful only in clients, and only until a proper
113 // transport/thrift.Client exists.
114 func MakeThriftConcatEndpoint(client *addthrift.AddServiceClient) endpoint.Endpoint {
115 return func(ctx context.Context, request interface{}) (interface{}, error) {
116 req := request.(addendpoint.ConcatRequest)
117 reply, err := client.Concat(ctx, req.A, req.B)
118 return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil