Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tracing] add simplified export pipeline setup for Jaeger #459

Merged
merged 6 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions example/jaeger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (

// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create Jaeger Exporter
exporter, err := jaeger.NewExporter(
// Create and install Jaeger export pipeline
_, flush, err := jaeger.NewExportPipeline(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithProcess(jaeger.Process{
ServiceName: "trace-demo",
Expand All @@ -40,23 +40,15 @@ func initTracer() func() {
key.Float64("float", 312.23),
},
}),
jaeger.RegisterAsGlobal(),
jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
log.Fatal(err)
}

// For demoing purposes, always sample. In a production application, you should
// configure this to a trace.ProbabilitySampler set at the desired
// probability.
tp, err := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exporter))
if err != nil {
log.Fatal(err)
}
global.SetTraceProvider(tp)
return func() {
exporter.Flush()
flush()
}
}

Expand Down
51 changes: 49 additions & 2 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"google.golang.org/grpc/codes"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/global"
gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

const defaultServiceName = "OpenTelemetry"
Expand All @@ -43,6 +45,12 @@ type options struct {

//BufferMaxCount defines the total number of traces that can be buffered in memory
BufferMaxCount int

Config *sdktrace.Config

// RegisterGlobal is set to true if the trace provider of the new pipeline should be
// registered as Global Trace Provider
RegisterGlobal bool
}

// WithOnError sets the hook to be called when there is
Expand All @@ -68,9 +76,24 @@ func WithBufferMaxCount(bufferMaxCount int) func(o *options) {
}
}

// NewExporter returns a trace.Exporter implementation that exports
// WithSDK sets the SDK config for the exporter pipeline.
func WithSDK(config *sdktrace.Config) func(o *options) {
return func(o *options) {
o.Config = config
}
}

// RegisterAsGlobal enables the registration of the trace provider of the new pipeline
// as Global Trace Provider.
func RegisterAsGlobal() func(o *options) {
return func(o *options) {
o.RegisterGlobal = true
}
}

// NewRawExporter returns a trace.Exporter implementation that exports
// the collected spans to Jaeger.
func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
uploader, err := endpointOption()
if err != nil {
return nil, err
Expand Down Expand Up @@ -105,6 +128,7 @@ func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, erro
ServiceName: service,
Tags: tags,
},
o: o,
}
bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*gen.Span)); err != nil {
Expand All @@ -123,6 +147,28 @@ func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, erro
return e, nil
}

// NewExportPipeline sets up a complete export pipeline
// with the recommended setup for trace provider
func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.Provider, func(), error) {
exporter, err := NewRawExporter(endpointOption, opts...)
if err != nil {
return nil, nil, err
}
syncer := sdktrace.WithSyncer(exporter)
tp, err := sdktrace.NewProvider(syncer)
if err != nil {
return nil, nil, err
}
if exporter.o.Config != nil {
tp.ApplyConfig(*exporter.o.Config)
}
if exporter.o.RegisterGlobal {
global.SetTraceProvider(tp)
}

return tp, exporter.Flush, nil
}

// Process contains the information exported to jaeger about the source
// of the trace data.
type Process struct {
Expand All @@ -138,6 +184,7 @@ type Exporter struct {
process *gen.Process
bundler *bundler.Bundler
uploader batchUploader
o options
}

var _ export.SpanSyncer = (*Exporter)(nil)
Expand Down
67 changes: 57 additions & 10 deletions exporters/trace/jaeger/jaeger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,62 @@ import (
export "go.opentelemetry.io/otel/sdk/export/trace"
)

func TestNewExporter(t *testing.T) {
func TestNewExporterPipelineWithRegistration(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
RegisterAsGlobal(),
)
defer fn()
assert.NoError(t, err)
assert.Same(t, tp, global.TraceProvider())
}

func TestNewExporterPipelineWithoutRegistration(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
)
defer fn()
assert.NoError(t, err)
assert.NotEqual(t, tp, global.TraceProvider())
}

func TestNewExporterPipelineWithSDK(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}),
)
defer fn()
assert.NoError(t, err)
_, span := tp.Tracer("jaeger test").Start(context.Background(), "always-on")
spanCtx := span.SpanContext()
assert.True(t, spanCtx.IsSampled())
span.End()

tp2, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.NeverSample(),
}),
)
defer fn()
assert.NoError(t, err)
_, span2 := tp2.Tracer("jaeger test").Start(context.Background(), "never")
span2Ctx := span2.SpanContext()
assert.False(t, span2Ctx.IsSampled())
span2.End()
}

func TestNewRawExporter(t *testing.T) {
const (
collectorEndpoint = "http://localhost"
serviceName = "test-service"
tagKey = "key"
tagVal = "val"
)
// Create Jaeger Exporter
exp, err := NewExporter(
exp, err := NewRawExporter(
WithCollectorEndpoint(collectorEndpoint),
WithProcess(Process{
ServiceName: serviceName,
Expand All @@ -60,8 +107,8 @@ func TestNewExporter(t *testing.T) {
assert.Len(t, exp.process.Tags, 1)
}

func TestNewExporterShouldFailIfCollectorEndpointEmpty(t *testing.T) {
_, err := NewExporter(
func TestNewRawExporterShouldFailIfCollectorEndpointEmpty(t *testing.T) {
_, err := NewRawExporter(
WithCollectorEndpoint(""),
)

Expand Down Expand Up @@ -92,7 +139,7 @@ func TestExporter_ExportSpan(t *testing.T) {
tagVal = "val"
)
// Create Jaeger Exporter
exp, err := NewExporter(
exp, err := NewRawExporter(
withTestCollectorEndpoint(),
WithProcess(Process{
ServiceName: serviceName,
Expand Down Expand Up @@ -121,24 +168,24 @@ func TestExporter_ExportSpan(t *testing.T) {
assert.True(t, len(tc.spansUploaded) == 1)
}

func TestNewExporterWithAgentEndpoint(t *testing.T) {
func TestNewRawExporterWithAgentEndpoint(t *testing.T) {
const agentEndpoint = "localhost:6831"
// Create Jaeger Exporter
_, err := NewExporter(
_, err := NewRawExporter(
WithAgentEndpoint(agentEndpoint),
)
assert.NoError(t, err)
}

func TestNewExporterWithAgentShouldFailIfEndpointInvalid(t *testing.T) {
func TestNewRawExporterWithAgentShouldFailIfEndpointInvalid(t *testing.T) {
//empty
_, err := NewExporter(
_, err := NewRawExporter(
WithAgentEndpoint(""),
)
assert.Error(t, err)

//invalid endpoint addr
_, err = NewExporter(
_, err = NewRawExporter(
WithAgentEndpoint("http://localhost"),
)
assert.Error(t, err)
Expand Down