From cffe60771e5a82aff447776a76a15bd2841e6846 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Sat, 1 Feb 2020 19:42:49 +0100 Subject: [PATCH 1/5] add simplified export pipeline setup for Jaeger --- example/jaeger/main.go | 23 +++++++--------- exporters/trace/jaeger/jaeger.go | 39 +++++++++++++++++++++++++-- exporters/trace/jaeger/jaeger_test.go | 20 +++++++------- 3 files changed, 57 insertions(+), 25 deletions(-) diff --git a/example/jaeger/main.go b/example/jaeger/main.go index 35bfc17c3c8..09f01933e03 100644 --- a/example/jaeger/main.go +++ b/example/jaeger/main.go @@ -30,8 +30,8 @@ import ( // initTracer creates a new trace provider instance and registers it as global trace provider. func initTracer() func() { - // Create Jaeger Exporter - exporter, err := jaeger.NewExporter( + // Create and install Jaeger export pipeline + tp, flush, err := jaeger.InstallNewPipeline( jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"), jaeger.WithProcess(jaeger.Process{ ServiceName: "trace-demo", @@ -45,18 +45,15 @@ func initTracer() func() { log.Fatal(err) } - // For demoing purposes, always sample. In a production application, you should - // configure this to a trace.ProbabilitySampler set at the desired - // probability. - tp, err := sdktrace.NewProvider( - sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exporter)) - if err != nil { - log.Fatal(err) - } - global.SetTraceProvider(tp) + // For demoing purposes, use trace.AlwaysSample to sample every trace. + // In a production application, you should configure this to a trace.ProbabilitySampler + // set at the desired probability. + tp.ApplyConfig(sdktrace.Config{ + DefaultSampler: sdktrace.AlwaysSample(), + }) + return func() { - exporter.Flush() + flush() } } diff --git a/exporters/trace/jaeger/jaeger.go b/exporters/trace/jaeger/jaeger.go index 597659bebec..634e4e406cf 100644 --- a/exporters/trace/jaeger/jaeger.go +++ b/exporters/trace/jaeger/jaeger.go @@ -23,8 +23,10 @@ import ( "google.golang.org/grpc/codes" "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/global" gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger" export "go.opentelemetry.io/otel/sdk/export/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) const defaultServiceName = "OpenTelemetry" @@ -68,9 +70,9 @@ func WithBufferMaxCount(bufferMaxCount int) func(o *options) { } } -// NewExporter returns a trace.Exporter implementation that exports +// NewRawExporter returns a trace.Exporter implementation that exports // the collected spans to Jaeger. -func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) { +func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) { uploader, err := endpointOption() if err != nil { return nil, err @@ -123,6 +125,39 @@ func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, erro return e, nil } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, flushFn, err := stdout.InstallNewPipeline(stdout.Config{...}) +// if err != nil { +// ... +// } +// defer flushFn() +// ... Done +func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.Provider, func(), error) { + provider, flushFn, err := NewExportPipeline(endpointOption, opts...) + if err != nil { + return provider, flushFn, err + } + global.SetTraceProvider(provider) + return provider, flushFn, nil +} + +// NewExportPipeline sets up a complete export pipeline +// with the recommended setup for trace provider +func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.Provider, func(), error) { + exporter, err := NewRawExporter(endpointOption, opts...) + if err != nil { + return nil, nil, err + } + syncer := sdktrace.WithSyncer(exporter) + tp, err := sdktrace.NewProvider(syncer) + if err != nil { + return nil, nil, err + } + + return tp, exporter.Flush, nil +} + // Process contains the information exported to jaeger about the source // of the trace data. type Process struct { diff --git a/exporters/trace/jaeger/jaeger_test.go b/exporters/trace/jaeger/jaeger_test.go index 8ac90538457..b6e58f89abc 100644 --- a/exporters/trace/jaeger/jaeger_test.go +++ b/exporters/trace/jaeger/jaeger_test.go @@ -37,7 +37,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/trace" ) -func TestNewExporter(t *testing.T) { +func TestNewRawExporter(t *testing.T) { const ( collectorEndpoint = "http://localhost" serviceName = "test-service" @@ -45,7 +45,7 @@ func TestNewExporter(t *testing.T) { tagVal = "val" ) // Create Jaeger Exporter - exp, err := NewExporter( + exp, err := NewRawExporter( WithCollectorEndpoint(collectorEndpoint), WithProcess(Process{ ServiceName: serviceName, @@ -60,8 +60,8 @@ func TestNewExporter(t *testing.T) { assert.Len(t, exp.process.Tags, 1) } -func TestNewExporterShouldFailIfCollectorEndpointEmpty(t *testing.T) { - _, err := NewExporter( +func TestNewRawExporterShouldFailIfCollectorEndpointEmpty(t *testing.T) { + _, err := NewRawExporter( WithCollectorEndpoint(""), ) @@ -92,7 +92,7 @@ func TestExporter_ExportSpan(t *testing.T) { tagVal = "val" ) // Create Jaeger Exporter - exp, err := NewExporter( + exp, err := NewRawExporter( withTestCollectorEndpoint(), WithProcess(Process{ ServiceName: serviceName, @@ -121,24 +121,24 @@ func TestExporter_ExportSpan(t *testing.T) { assert.True(t, len(tc.spansUploaded) == 1) } -func TestNewExporterWithAgentEndpoint(t *testing.T) { +func TestNewRawExporterWithAgentEndpoint(t *testing.T) { const agentEndpoint = "localhost:6831" // Create Jaeger Exporter - _, err := NewExporter( + _, err := NewRawExporter( WithAgentEndpoint(agentEndpoint), ) assert.NoError(t, err) } -func TestNewExporterWithAgentShouldFailIfEndpointInvalid(t *testing.T) { +func TestNewRawExporterWithAgentShouldFailIfEndpointInvalid(t *testing.T) { //empty - _, err := NewExporter( + _, err := NewRawExporter( WithAgentEndpoint(""), ) assert.Error(t, err) //invalid endpoint addr - _, err = NewExporter( + _, err = NewRawExporter( WithAgentEndpoint("http://localhost"), ) assert.Error(t, err) From 4e662bb9be32d069eeb40990363e6b0aeb64b49f Mon Sep 17 00:00:00 2001 From: rahulpa Date: Thu, 5 Mar 2020 21:53:43 -0800 Subject: [PATCH 2/5] add With* options to configure SDK options. --- example/jaeger/main.go | 11 +++----- exporters/trace/jaeger/jaeger.go | 46 ++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/example/jaeger/main.go b/example/jaeger/main.go index 09f01933e03..39d00993dda 100644 --- a/example/jaeger/main.go +++ b/example/jaeger/main.go @@ -31,7 +31,7 @@ import ( // initTracer creates a new trace provider instance and registers it as global trace provider. func initTracer() func() { // Create and install Jaeger export pipeline - tp, flush, err := jaeger.InstallNewPipeline( + _, flush, err := jaeger.NewExportPipeline( jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"), jaeger.WithProcess(jaeger.Process{ ServiceName: "trace-demo", @@ -40,18 +40,13 @@ func initTracer() func() { key.Float64("float", 312.23), }, }), + jaeger.WithRegistration(), + jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), ) if err != nil { log.Fatal(err) } - // For demoing purposes, use trace.AlwaysSample to sample every trace. - // In a production application, you should configure this to a trace.ProbabilitySampler - // set at the desired probability. - tp.ApplyConfig(sdktrace.Config{ - DefaultSampler: sdktrace.AlwaysSample(), - }) - return func() { flush() } diff --git a/exporters/trace/jaeger/jaeger.go b/exporters/trace/jaeger/jaeger.go index 634e4e406cf..e1b345a6c63 100644 --- a/exporters/trace/jaeger/jaeger.go +++ b/exporters/trace/jaeger/jaeger.go @@ -45,6 +45,12 @@ type options struct { //BufferMaxCount defines the total number of traces that can be buffered in memory BufferMaxCount int + + Config *sdktrace.Config + + // Registration is set to true if the trace provider of the new pipeline should be + // registered as Global Trace Provider + Registeration bool } // WithOnError sets the hook to be called when there is @@ -70,6 +76,21 @@ func WithBufferMaxCount(bufferMaxCount int) func(o *options) { } } +// WithSDK sets the SDK config for the exporter pipeline. +func WithSDK(config *sdktrace.Config) func(o *options) { + return func(o *options) { + o.Config = config + } +} + +// WithRegistration enables the registration of the trace provider of the new pipeline +// as Global Trace Provider. +func WithRegistration() func(o *options) { + return func(o *options) { + o.Registeration = true + } +} + // NewRawExporter returns a trace.Exporter implementation that exports // the collected spans to Jaeger. func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) { @@ -107,6 +128,7 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e ServiceName: service, Tags: tags, }, + o: o, } bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) { if err := e.upload(bundle.([]*gen.Span)); err != nil { @@ -125,23 +147,6 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e return e, nil } -// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. -// Typically called as: -// pipeline, flushFn, err := stdout.InstallNewPipeline(stdout.Config{...}) -// if err != nil { -// ... -// } -// defer flushFn() -// ... Done -func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.Provider, func(), error) { - provider, flushFn, err := NewExportPipeline(endpointOption, opts...) - if err != nil { - return provider, flushFn, err - } - global.SetTraceProvider(provider) - return provider, flushFn, nil -} - // NewExportPipeline sets up a complete export pipeline // with the recommended setup for trace provider func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.Provider, func(), error) { @@ -154,6 +159,12 @@ func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace if err != nil { return nil, nil, err } + if exporter.o.Config != nil { + tp.ApplyConfig(*exporter.o.Config) + } + if exporter.o.Registeration { + global.SetTraceProvider(tp) + } return tp, exporter.Flush, nil } @@ -173,6 +184,7 @@ type Exporter struct { process *gen.Process bundler *bundler.Bundler uploader batchUploader + o options } var _ export.SpanSyncer = (*Exporter)(nil) From 5f3c38135100ada0393e6c0cef2ce0ba62da589e Mon Sep 17 00:00:00 2001 From: rahulpa Date: Thu, 5 Mar 2020 22:48:14 -0800 Subject: [PATCH 3/5] add test for WithRegistration and WithSDK --- exporters/trace/jaeger/jaeger_test.go | 47 +++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/exporters/trace/jaeger/jaeger_test.go b/exporters/trace/jaeger/jaeger_test.go index b6e58f89abc..c928cca9d51 100644 --- a/exporters/trace/jaeger/jaeger_test.go +++ b/exporters/trace/jaeger/jaeger_test.go @@ -37,6 +37,53 @@ import ( export "go.opentelemetry.io/otel/sdk/export/trace" ) +func TestNewExporterPipelineWithRegistration(t *testing.T) { + tp, fn, err := NewExportPipeline( + WithCollectorEndpoint("http://localhost:14268/api/traces"), + WithRegistration(), + ) + defer fn() + assert.NoError(t, err) + assert.Same(t, tp, global.TraceProvider()) +} + +func TestNewExporterPipelineWithoutRegistration(t *testing.T) { + tp, fn, err := NewExportPipeline( + WithCollectorEndpoint("http://localhost:14268/api/traces"), + ) + defer fn() + assert.NoError(t, err) + assert.NotEqual(t, tp, global.TraceProvider()) +} + +func TestNewExporterPipelineWithSDK(t *testing.T) { + tp, fn, err := NewExportPipeline( + WithCollectorEndpoint("http://localhost:14268/api/traces"), + WithSDK(&sdktrace.Config{ + DefaultSampler: sdktrace.AlwaysSample(), + }), + ) + defer fn() + assert.NoError(t, err) + _, span := tp.Tracer("jaeger test").Start(context.Background(), "always-on") + spanCtx := span.SpanContext() + assert.True(t, spanCtx.IsSampled()) + span.End() + + tp2, fn, err := NewExportPipeline( + WithCollectorEndpoint("http://localhost:14268/api/traces"), + WithSDK(&sdktrace.Config{ + DefaultSampler: sdktrace.NeverSample(), + }), + ) + defer fn() + assert.NoError(t, err) + _, span2 := tp2.Tracer("jaeger test").Start(context.Background(), "never") + span2Ctx := span2.SpanContext() + assert.False(t, span2Ctx.IsSampled()) + span2.End() +} + func TestNewRawExporter(t *testing.T) { const ( collectorEndpoint = "http://localhost" From bd7840a7d7dde0e5ed6db61e9ffda3c0b201e68a Mon Sep 17 00:00:00 2001 From: rahulpa Date: Fri, 6 Mar 2020 22:27:14 -0800 Subject: [PATCH 4/5] rename Registeration with RegisterGlobal --- exporters/trace/jaeger/jaeger.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporters/trace/jaeger/jaeger.go b/exporters/trace/jaeger/jaeger.go index e1b345a6c63..fd9a23a858d 100644 --- a/exporters/trace/jaeger/jaeger.go +++ b/exporters/trace/jaeger/jaeger.go @@ -48,9 +48,9 @@ type options struct { Config *sdktrace.Config - // Registration is set to true if the trace provider of the new pipeline should be + // RegisterGlobal is set to true if the trace provider of the new pipeline should be // registered as Global Trace Provider - Registeration bool + RegisterGlobal bool } // WithOnError sets the hook to be called when there is @@ -87,7 +87,7 @@ func WithSDK(config *sdktrace.Config) func(o *options) { // as Global Trace Provider. func WithRegistration() func(o *options) { return func(o *options) { - o.Registeration = true + o.RegisterGlobal = true } } @@ -162,7 +162,7 @@ func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace if exporter.o.Config != nil { tp.ApplyConfig(*exporter.o.Config) } - if exporter.o.Registeration { + if exporter.o.RegisterGlobal { global.SetTraceProvider(tp) } From 7b765d192a31a2322dc1cf69db8ecf48be079043 Mon Sep 17 00:00:00 2001 From: rahulpa Date: Tue, 10 Mar 2020 11:59:43 -0700 Subject: [PATCH 5/5] rename WithRegistration to RegisterAsGlobal --- example/jaeger/main.go | 2 +- exporters/trace/jaeger/jaeger.go | 4 ++-- exporters/trace/jaeger/jaeger_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/example/jaeger/main.go b/example/jaeger/main.go index 39d00993dda..75848e33fbc 100644 --- a/example/jaeger/main.go +++ b/example/jaeger/main.go @@ -40,7 +40,7 @@ func initTracer() func() { key.Float64("float", 312.23), }, }), - jaeger.WithRegistration(), + jaeger.RegisterAsGlobal(), jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), ) if err != nil { diff --git a/exporters/trace/jaeger/jaeger.go b/exporters/trace/jaeger/jaeger.go index fd9a23a858d..e9e1bdb8bd0 100644 --- a/exporters/trace/jaeger/jaeger.go +++ b/exporters/trace/jaeger/jaeger.go @@ -83,9 +83,9 @@ func WithSDK(config *sdktrace.Config) func(o *options) { } } -// WithRegistration enables the registration of the trace provider of the new pipeline +// RegisterAsGlobal enables the registration of the trace provider of the new pipeline // as Global Trace Provider. -func WithRegistration() func(o *options) { +func RegisterAsGlobal() func(o *options) { return func(o *options) { o.RegisterGlobal = true } diff --git a/exporters/trace/jaeger/jaeger_test.go b/exporters/trace/jaeger/jaeger_test.go index c928cca9d51..39568db1e4e 100644 --- a/exporters/trace/jaeger/jaeger_test.go +++ b/exporters/trace/jaeger/jaeger_test.go @@ -40,7 +40,7 @@ import ( func TestNewExporterPipelineWithRegistration(t *testing.T) { tp, fn, err := NewExportPipeline( WithCollectorEndpoint("http://localhost:14268/api/traces"), - WithRegistration(), + RegisterAsGlobal(), ) defer fn() assert.NoError(t, err)