13 "github.com/apache/thrift/lib/go/thrift"
14 lightstep "github.com/lightstep/lightstep-tracer-go"
15 "github.com/oklog/oklog/pkg/group"
16 stdopentracing "github.com/opentracing/opentracing-go"
17 zipkin "github.com/openzipkin/zipkin-go-opentracing"
18 stdprometheus "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 "google.golang.org/grpc"
21 "sourcegraph.com/sourcegraph/appdash"
22 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
24 "github.com/go-kit/kit/log"
25 "github.com/go-kit/kit/metrics"
26 "github.com/go-kit/kit/metrics/prometheus"
28 addpb "github.com/go-kit/kit/examples/addsvc/pb"
29 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
30 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
31 "github.com/go-kit/kit/examples/addsvc/pkg/addtransport"
32 addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
36 // Define our flags. Your service probably won't need to bind listeners for
37 // *all* supported transports, or support both Zipkin and LightStep, and so
38 // on, but we do it here for demonstration purposes.
39 fs := flag.NewFlagSet("addsvc", flag.ExitOnError)
41 debugAddr = fs.String("debug.addr", ":8080", "Debug and metrics listen address")
42 httpAddr = fs.String("http-addr", ":8081", "HTTP listen address")
43 grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address")
44 thriftAddr = fs.String("thrift-addr", ":8083", "Thrift listen address")
45 thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
46 thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
47 thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
48 zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
49 lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
50 appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
52 fs.Usage = usageFor(fs, os.Args[0]+" [flags]")
55 // Create a single logger, which we'll use and give to other components.
58 logger = log.NewLogfmtLogger(os.Stderr)
59 logger = log.With(logger, "ts", log.DefaultTimestampUTC)
60 logger = log.With(logger, "caller", log.DefaultCaller)
63 // Determine which tracer to use. We'll pass the tracer to all the
64 // components that use it, as a dependency.
65 var tracer stdopentracing.Tracer
68 logger.Log("tracer", "Zipkin", "URL", *zipkinURL)
69 collector, err := zipkin.NewHTTPCollector(*zipkinURL)
71 logger.Log("err", err)
74 defer collector.Close()
77 hostPort = "localhost:80"
78 serviceName = "addsvc"
80 recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
81 tracer, err = zipkin.NewTracer(recorder)
83 logger.Log("err", err)
86 } else if *lightstepToken != "" {
87 logger.Log("tracer", "LightStep") // probably don't want to print out the token :)
88 tracer = lightstep.NewTracer(lightstep.Options{
89 AccessToken: *lightstepToken,
91 defer lightstep.FlushLightStepTracer(tracer)
92 } else if *appdashAddr != "" {
93 logger.Log("tracer", "Appdash", "addr", *appdashAddr)
94 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
96 logger.Log("tracer", "none")
97 tracer = stdopentracing.GlobalTracer() // no-op
101 // Create the (sparse) metrics we'll use in the service. They, too, are
102 // dependencies that we pass to components that use them.
103 var ints, chars metrics.Counter
105 // Business-level metrics.
106 ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
107 Namespace: "example",
109 Name: "integers_summed",
110 Help: "Total count of integers summed via the Sum method.",
112 chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
113 Namespace: "example",
115 Name: "characters_concatenated",
116 Help: "Total count of characters concatenated via the Concat method.",
119 var duration metrics.Histogram
121 // Endpoint-level metrics.
122 duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
123 Namespace: "example",
125 Name: "request_duration_seconds",
126 Help: "Request duration in seconds.",
127 }, []string{"method", "success"})
129 http.DefaultServeMux.Handle("/metrics", promhttp.Handler())
131 // Build the layers of the service "onion" from the inside out. First, the
132 // business logic service; then, the set of endpoints that wrap the service;
133 // and finally, a series of concrete transport adapters. The adapters, like
134 // the HTTP handler or the gRPC server, are the bridge between Go kit and
135 // the interfaces that the transports expect. Note that we're not binding
136 // them to ports or anything yet; we'll do that next.
138 service = addservice.New(logger, ints, chars)
139 endpoints = addendpoint.New(service, logger, duration, tracer)
140 httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
141 grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
142 thriftServer = addtransport.NewThriftServer(endpoints)
145 // Now we're to the part of the func main where we want to start actually
146 // running things, like servers bound to listeners to receive connections.
148 // The method is the same for each component: add a new actor to the group
149 // struct, which is a combination of 2 anonymous functions: the first
150 // function actually runs the component, and the second function should
151 // interrupt the first function and cause it to return. It's in these
152 // functions that we actually bind the Go kit server/handler structs to the
153 // concrete transports and run them.
155 // Putting each component into its own block is mostly for aesthetics: it
156 // clearly demarcates the scope in which each listener/socket may be used.
159 // The debug listener mounts the http.DefaultServeMux, and serves up
160 // stuff like the Prometheus metrics route, the Go debug and profiling
161 // routes, and so on.
162 debugListener, err := net.Listen("tcp", *debugAddr)
164 logger.Log("transport", "debug/HTTP", "during", "Listen", "err", err)
168 logger.Log("transport", "debug/HTTP", "addr", *debugAddr)
169 return http.Serve(debugListener, http.DefaultServeMux)
171 debugListener.Close()
175 // The HTTP listener mounts the Go kit HTTP handler we created.
176 httpListener, err := net.Listen("tcp", *httpAddr)
178 logger.Log("transport", "HTTP", "during", "Listen", "err", err)
182 logger.Log("transport", "HTTP", "addr", *httpAddr)
183 return http.Serve(httpListener, httpHandler)
189 // The gRPC listener mounts the Go kit gRPC server we created.
190 grpcListener, err := net.Listen("tcp", *grpcAddr)
192 logger.Log("transport", "gRPC", "during", "Listen", "err", err)
196 logger.Log("transport", "gRPC", "addr", *grpcAddr)
197 baseServer := grpc.NewServer()
198 addpb.RegisterAddServer(baseServer, grpcServer)
199 return baseServer.Serve(grpcListener)
205 // The Thrift socket mounts the Go kit Thrift server we created earlier.
206 // There's a lot of boilerplate involved here, related to configuring
207 // the protocol and transport; blame Thrift.
208 thriftSocket, err := thrift.NewTServerSocket(*thriftAddr)
210 logger.Log("transport", "Thrift", "during", "Listen", "err", err)
214 logger.Log("transport", "Thrift", "addr", *thriftAddr)
215 var protocolFactory thrift.TProtocolFactory
216 switch *thriftProtocol {
218 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
220 protocolFactory = thrift.NewTCompactProtocolFactory()
222 protocolFactory = thrift.NewTJSONProtocolFactory()
224 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
226 return fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
228 var transportFactory thrift.TTransportFactory
229 if *thriftBuffer > 0 {
230 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer)
232 transportFactory = thrift.NewTTransportFactory()
235 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
237 return thrift.NewTSimpleServer4(
238 addthrift.NewAddServiceProcessor(thriftServer),
248 // This function just sits and waits for ctrl-C.
249 cancelInterrupt := make(chan struct{})
251 c := make(chan os.Signal, 1)
252 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
255 return fmt.Errorf("received signal %s", sig)
256 case <-cancelInterrupt:
260 close(cancelInterrupt)
263 logger.Log("exit", g.Run())
266 func usageFor(fs *flag.FlagSet, short string) func() {
268 fmt.Fprintf(os.Stderr, "USAGE\n")
269 fmt.Fprintf(os.Stderr, " %s\n", short)
270 fmt.Fprintf(os.Stderr, "\n")
271 fmt.Fprintf(os.Stderr, "FLAGS\n")
272 w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0)
273 fs.VisitAll(func(f *flag.Flag) {
274 fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage)
277 fmt.Fprintf(os.Stderr, "\n")