Skip to content

Commit

Permalink
Support pluggable injectors/extrators; implement Zipkin propagator (j…
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Jun 24, 2016
1 parent 9ed65b0 commit fbffee0
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 41 deletions.
68 changes: 27 additions & 41 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ type tracer struct {
poolSpans bool
spanPool sync.Pool

textPropagator *textMapPropagator
binaryPropagator *binaryPropagator
interopPropagator *jaegerTraceContextPropagator
injectors map[interface{}]Injector
extractors map[interface{}]Extractor
}

// NewTracer creates Tracer implementation that reports tracing to Jaeger.
Expand All @@ -63,12 +62,31 @@ func NewTracer(
serviceName: serviceName,
sampler: sampler,
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
metrics: *NewMetrics(NullStatsReporter, nil),
spanPool: sync.Pool{New: func() interface{} {
return &span{}
}},
}

// register default injectors/extractors
textPropagator := newTextMapPropagator(t)
t.injectors[opentracing.TextMap] = textPropagator
t.extractors[opentracing.TextMap] = textPropagator

binaryPropagator := newBinaryPropagator(t)
t.injectors[opentracing.Binary] = binaryPropagator
t.extractors[opentracing.Binary] = binaryPropagator

interopPropagator := &jaegerTraceContextPropagator{tracer: t}
t.injectors[TraceContextFormat] = interopPropagator
t.extractors[TraceContextFormat] = interopPropagator

zipkinPropagator := &zipkinPropagator{tracer: t}
t.injectors[ZipkinSpanFormat] = zipkinPropagator
t.extractors[ZipkinSpanFormat] = zipkinPropagator

for _, option := range options {
option(t)
}
Expand All @@ -88,10 +106,6 @@ func NewTracer(
t.hostIPv4 = localIPInt32
}

// TODO convert to a map of propagators
t.textPropagator = newTextMapPropagator(t)
t.binaryPropagator = newBinaryPropagator(t)
t.interopPropagator = &jaegerTraceContextPropagator{tracer: t}
return t, t
}

Expand Down Expand Up @@ -145,24 +159,10 @@ func (t *tracer) StartSpanWithOptions(options opentracing.StartSpanOptions) open

// Inject implements Inject() method of opentracing.Tracer
func (t *tracer) Inject(sp opentracing.Span, format interface{}, carrier interface{}) error {
injector := t.injector(format)
if injector == nil {
return opentracing.ErrUnsupportedFormat
}
return injector.InjectSpan(sp, carrier)
}

func (t *tracer) injector(format interface{}) Injector {
if format == opentracing.TextMap {
return t.textPropagator
}
if format == opentracing.Binary {
return t.binaryPropagator
if injector, ok := t.injectors[format]; ok {
return injector.InjectSpan(sp, carrier)
}
if format == TraceContextFormat {
return t.interopPropagator
}
return nil
return opentracing.ErrUnsupportedFormat
}

// Join implements Join() method of opentracing.Tracer
Expand All @@ -171,24 +171,10 @@ func (t *tracer) Join(
format interface{},
carrier interface{},
) (opentracing.Span, error) {
extractor := t.extractor(format)
if extractor == nil {
return nil, opentracing.ErrUnsupportedFormat
}
return extractor.Join(operationName, carrier)
}

func (t *tracer) extractor(format interface{}) Extractor {
if format == opentracing.TextMap {
return t.textPropagator
}
if format == opentracing.Binary {
return t.binaryPropagator
if extractor, ok := t.extractors[format]; ok {
return extractor.Join(operationName, carrier)
}
if format == TraceContextFormat {
return t.interopPropagator
}
return nil
return nil, opentracing.ErrUnsupportedFormat
}

// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
Expand Down
12 changes: 12 additions & 0 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,15 @@ func (tracerOptions) HostIPv4(hostIPv4 uint32) TracerOption {
tracer.hostIPv4 = hostIPv4
}
}

func (tracerOptions) Injector(format interface{}, injector Injector) TracerOption {
return func(tracer *tracer) {
tracer.injectors[format] = injector
}
}

func (tracerOptions) Extractor(format interface{}, extractor Extractor) TracerOption {
return func(tracer *tracer) {
tracer.extractors[format] = extractor
}
}
51 changes: 51 additions & 0 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/uber/jaeger-client-go/utils"
Expand Down Expand Up @@ -155,3 +156,53 @@ func (s *tracerSuite) TestRandomIDNotZero() {
rng := utils.NewRand(0)
rng.Seed(1) // for test coverage
}

func TestInjectorExtractorOptions(t *testing.T) {
tracer, tc := NewTracer("x", NewConstSampler(true), NewNullReporter(),
TracerOptions.Injector("dummy", &dummyPropagator{}),
TracerOptions.Extractor("dummy", &dummyPropagator{}),
)
defer tc.Close()

sp := tracer.StartSpan("x")
c := &dummyCarrier{}
err := tracer.Inject(sp, "dummy", []int{})
assert.Equal(t, opentracing.ErrInvalidCarrier, err)
err = tracer.Inject(sp, "dummy", c)
assert.NoError(t, err)
assert.True(t, c.ok)

c.ok = false
_, err = tracer.Join("z", "dummy", []int{})
assert.Equal(t, opentracing.ErrInvalidCarrier, err)
_, err = tracer.Join("z", "dummy", c)
assert.Equal(t, opentracing.ErrTraceNotFound, err)
c.ok = true
_, err = tracer.Join("z", "dummy", c)
assert.NoError(t, err)
}

type dummyPropagator struct{}
type dummyCarrier struct {
ok bool
}

func (p *dummyPropagator) InjectSpan(span opentracing.Span, carrier interface{}) error {
c, ok := carrier.(*dummyCarrier)
if !ok {
return opentracing.ErrInvalidCarrier
}
c.ok = true
return nil
}

func (p *dummyPropagator) Join(operationName string, carrier interface{}) (opentracing.Span, error) {
c, ok := carrier.(*dummyCarrier)
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
if c.ok {
return nil, nil
}
return nil, opentracing.ErrTraceNotFound
}
75 changes: 75 additions & 0 deletions zipkin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package jaeger

import (
"github.com/opentracing/opentracing-go"
)

// ZipkinSpanFormat is an OpenTracing carrier format constant
const ZipkinSpanFormat = "zipkin-span-format"

// ZipkinSpan is a type of Carrier used for integration with Zipkin-aware RPC frameworks
// (like TChannel). It does not support baggage, only trace IDs.
type ZipkinSpan interface {
TraceID() uint64
SpanID() uint64
ParentID() uint64
Flags() byte
SetTraceID(traceID uint64)
SetSpanID(spanID uint64)
SetParentID(parentID uint64)
SetFlags(flags byte)
}

type zipkinPropagator struct {
tracer *tracer
}

func (p *zipkinPropagator) InjectSpan(
sp opentracing.Span,
abstractCarrier interface{},
) error {
sc, ok := sp.(*span)
if !ok {
return opentracing.ErrInvalidSpan
}
carrier, ok := abstractCarrier.(ZipkinSpan)
if !ok {
return opentracing.ErrInvalidCarrier
}

sc.RLock()
defer sc.RUnlock()

carrier.SetTraceID(sc.TraceContext.TraceID())
carrier.SetSpanID(sc.TraceContext.SpanID())
carrier.SetParentID(sc.TraceContext.ParentID())
carrier.SetFlags(sc.TraceContext.flags)

return nil
}

func (p *zipkinPropagator) Join(
operationName string,
abstractCarrier interface{},
) (opentracing.Span, error) {
carrier, ok := abstractCarrier.(ZipkinSpan)
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
if carrier.TraceID() == 0 {
return nil, opentracing.ErrTraceNotFound
}
sp := p.tracer.newSpan()
sp.TraceContext.traceID = carrier.TraceID()
sp.TraceContext.spanID = carrier.SpanID()
sp.TraceContext.parentID = carrier.ParentID()
sp.TraceContext.flags = carrier.Flags()

return p.tracer.startSpanInternal(
sp,
operationName,
p.tracer.timeNow(),
nil,
true, // join with external trace
), nil
}
52 changes: 52 additions & 0 deletions zipkin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package jaeger

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestZipkinPropagator(t *testing.T) {
tracer, tCloser := NewTracer("x", NewConstSampler(true), NewNullReporter())
defer tCloser.Close()

carrier := &TestZipkinSpan{}
sp := tracer.StartSpan("y")

// Note: we intentionally use string as format, as that's what TChannel would need to do
if err := tracer.Inject(sp, "zipkin-span-format", carrier); err != nil {
t.Fatalf("Inject failed: %+v", err)
}
sp1 := sp.(*span)
assert.Equal(t, sp1.traceID, carrier.traceID)
assert.Equal(t, sp1.spanID, carrier.spanID)
assert.Equal(t, sp1.parentID, carrier.parentID)
assert.Equal(t, sp1.flags, carrier.flags)

sp2, err := tracer.Join("z", "zipkin-span-format", carrier)
if err != nil {
t.Fatalf("Extract failed: %+v", err)
}
sp3 := sp2.(*span)
assert.Equal(t, sp1.traceID, sp3.traceID)
assert.Equal(t, sp1.spanID, sp3.spanID)
assert.Equal(t, sp1.parentID, sp3.parentID)
assert.Equal(t, sp1.flags, sp3.flags)
}

// TestZipkinSpan is a mock-up of TChannel's internal Span struct
type TestZipkinSpan struct {
traceID uint64
parentID uint64
spanID uint64
flags byte
}

func (s TestZipkinSpan) TraceID() uint64 { return s.traceID }
func (s TestZipkinSpan) ParentID() uint64 { return s.parentID }
func (s TestZipkinSpan) SpanID() uint64 { return s.spanID }
func (s TestZipkinSpan) Flags() byte { return s.flags }
func (s *TestZipkinSpan) SetTraceID(traceID uint64) { s.traceID = traceID }
func (s *TestZipkinSpan) SetSpanID(spanID uint64) { s.spanID = spanID }
func (s *TestZipkinSpan) SetParentID(parentID uint64) { s.parentID = parentID }
func (s *TestZipkinSpan) SetFlags(flags byte) { s.flags = flags }

0 comments on commit fbffee0

Please sign in to comment.