From 60200911139e16016b324518244678d4763ee8c7 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 9 Jul 2016 14:34:48 -0400 Subject: [PATCH] Upgrade to OpenTracing API with SpanContext (#20) --- Makefile | 7 +- README.md | 2 +- config/config.go | 3 +- context.go | 97 +++++++++++++++++++++----- context_test.go | 2 +- crossdock/common/json.go | 6 +- crossdock/server/server.go | 12 ++-- crossdock/server/tchannel.go | 22 +++--- crossdock/server/trace.go | 13 ++-- glide.lock | 6 +- glide.yaml | 2 +- interop.go | 70 ++++--------------- propagation.go | 126 ++++++++++++---------------------- propagation_test.go | 83 +++++++++++----------- reporter_test.go | 12 ++-- span.go | 85 ++++++++--------------- span_test.go | 8 +-- thrift_span.go | 10 +-- thrift_span_test.go | 2 +- tracer.go | 129 ++++++++++++++++++++++++----------- tracer_options.go | 7 ++ tracer_test.go | 61 ++++++++++------- zipkin.go | 47 +++++-------- zipkin_test.go | 22 +++--- 24 files changed, 417 insertions(+), 417 deletions(-) diff --git a/Makefile b/Makefile index 40c3ba70..4403109f 100644 --- a/Makefile +++ b/Makefile @@ -23,10 +23,13 @@ THRIFT=docker run -v "${PWD}:/data" $(THRIFT_IMG) thrift THRIFT_GO_ARGS=thrift_import="github.com/apache/thrift/lib/go/thrift" THRIFT_GEN_DIR=thrift-gen +PASS=$(shell printf "\033[32mPASS\033[0m") +FAIL=$(shell printf "\033[31mFAIL\033[0m") +COLORIZE=sed ''/PASS/s//$(PASS)/'' | sed ''/FAIL/s//$(FAIL)/'' + .PHONY: test test: - $(GOTEST) $(PACKAGES) - + $(GOTEST) $(PACKAGES) | $(COLORIZE) .PHONY: fmt fmt: diff --git a/README.md b/README.md index 20859cdb..99d58f7c 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ this interface can be set on the `Config` object before calling the Since this tracer is fully compliant with OpenTracing API 1.0, all code instrumentation should only use the API itself, as described -in [opentracing-go] +in the [opentracing-go] (https://github.com/opentracing/opentracing-go) documentation. ## Features diff --git a/config/config.go b/config/config.go index 345faa10..33786045 100644 --- a/config/config.go +++ b/config/config.go @@ -123,7 +123,8 @@ func (c Configuration) New( serviceName, sampler, reporter, - jaeger.TracerOptions.Metrics(metrics)) + jaeger.TracerOptions.Metrics(metrics), + jaeger.TracerOptions.Logger(c.Logger)) return tracer, closer, nil } diff --git a/context.go b/context.go index 71ab97e7..7ba0de74 100644 --- a/context.go +++ b/context.go @@ -25,6 +25,9 @@ import ( "fmt" "strconv" "strings" + "sync" + + "github.com/opentracing/opentracing-go" ) const ( @@ -37,8 +40,10 @@ var ( errMalformedTracerStateString = errors.New("String does not match tracer state format") ) -// TraceContext represents propagated span identity and state -type TraceContext struct { +// SpanContext represents propagated span identity and state +type SpanContext struct { + sync.RWMutex + // traceID represents globally unique ID of the trace. // Usually generated as a random number. traceID uint64 @@ -53,70 +58,126 @@ type TraceContext struct { // flags is a bitmap containing such bits as 'sampled' and 'debug'. flags byte + + // Distributed Context baggage + baggage map[string]string +} + +// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext +func (c *SpanContext) SetBaggageItem(key, value string) opentracing.SpanContext { + key = normalizeBaggageKey(key) + c.Lock() + defer c.Unlock() + if c.baggage == nil { + c.baggage = make(map[string]string) + } + c.baggage[key] = value + return c +} + +// BaggageItem implements BaggageItem() of opentracing.SpanContext +func (c *SpanContext) BaggageItem(key string) string { + key = normalizeBaggageKey(key) + c.RLock() + defer c.RUnlock() + return c.baggage[key] +} + +// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext +func (c *SpanContext) ForeachBaggageItem(handler func(k, v string) bool) { + c.RLock() + defer c.RUnlock() + for k, v := range c.baggage { + if !handler(k, v) { + break + } + } } // IsSampled returns whether this trace was chosen for permanent storage // by the sampling mechanism of the tracer. -func (c *TraceContext) IsSampled() bool { +func (c *SpanContext) IsSampled() bool { return (c.flags & flagSampled) == flagSampled } -func (c *TraceContext) String() string { +func (c *SpanContext) String() string { return fmt.Sprintf("%x:%x:%x:%x", c.traceID, c.spanID, c.parentID, c.flags) } // ContextFromString reconstructs the Context encoded in a string -func ContextFromString(value string) (TraceContext, error) { - var context TraceContext +func ContextFromString(value string) (*SpanContext, error) { + var context = new(SpanContext) if value == "" { - return context, errEmptyTracerStateString + return nil, errEmptyTracerStateString } parts := strings.Split(value, ":") if len(parts) != 4 { - return context, errMalformedTracerStateString + return nil, errMalformedTracerStateString } var err error if context.traceID, err = strconv.ParseUint(parts[0], 16, 64); err != nil { - return context, err + return nil, err } if context.spanID, err = strconv.ParseUint(parts[1], 16, 64); err != nil { - return context, err + return nil, err } if context.parentID, err = strconv.ParseUint(parts[2], 16, 64); err != nil { - return context, err + return nil, err } flags, err := strconv.ParseUint(parts[3], 10, 8) if err != nil { - return context, err + return nil, err } context.flags = byte(flags) return context, nil } // TraceID implements TraceID() of SpanID -func (c TraceContext) TraceID() uint64 { +func (c *SpanContext) TraceID() uint64 { return c.traceID } // SpanID implements SpanID() of SpanID -func (c TraceContext) SpanID() uint64 { +func (c *SpanContext) SpanID() uint64 { return c.spanID } // ParentID implements ParentID() of SpanID -func (c TraceContext) ParentID() uint64 { +func (c *SpanContext) ParentID() uint64 { return c.parentID } -// NewTraceContext creates a new instance of TraceContext -func NewTraceContext(traceID, spanID, parentID uint64, sampled bool) *TraceContext { +// NewSpanContext creates a new instance of SpanContext +func NewSpanContext(traceID, spanID, parentID uint64, sampled bool) *SpanContext { flags := byte(0) if sampled { flags = flagSampled } - return &TraceContext{ + return &SpanContext{ traceID: traceID, spanID: spanID, parentID: parentID, flags: flags} } + +// CopyFrom copies data from ctx into this context, including span identity and baggage. +func (c *SpanContext) CopyFrom(ctx *SpanContext) { + c.Lock() + defer c.Unlock() + + ctx.RLock() + defer ctx.RUnlock() + + c.traceID = ctx.traceID + c.spanID = ctx.spanID + c.parentID = ctx.parentID + c.flags = ctx.flags + if l := len(ctx.baggage); l > 0 { + c.baggage = make(map[string]string, l) + for k, v := range ctx.baggage { + c.baggage[k] = v + } + } else { + c.baggage = nil + } +} diff --git a/context_test.go b/context_test.go index 2179b026..6bf0be36 100644 --- a/context_test.go +++ b/context_test.go @@ -27,7 +27,7 @@ func TestContextFromString(t *testing.T) { assert.EqualValues(t, 1, ctx.spanID) assert.EqualValues(t, 1, ctx.parentID) assert.EqualValues(t, 1, ctx.flags) - ctx = *NewTraceContext(1, 1, 1, true) + ctx = NewSpanContext(1, 1, 1, true) assert.EqualValues(t, 1, ctx.traceID) assert.EqualValues(t, 1, ctx.spanID) assert.EqualValues(t, 1, ctx.parentID) diff --git a/crossdock/common/json.go b/crossdock/common/json.go index 8d911080..f10538d1 100644 --- a/crossdock/common/json.go +++ b/crossdock/common/json.go @@ -71,9 +71,9 @@ func injectSpan(ctx context.Context, req *http.Request) (opentracing.Span, error if span == nil { return nil, nil } - span = opentracing.StartChildSpan(span, "post") - ext.SpanKind.Set(span, ext.SpanKindRPCClient) + span = span.Tracer().StartSpan("post", opentracing.ChildOf(span.Context())) + ext.SpanKindRPCClient.Set(span) c := opentracing.HTTPHeaderTextMapCarrier(req.Header) - err := span.Tracer().Inject(span, opentracing.TextMap, c) + err := span.Tracer().Inject(span.Context(), opentracing.TextMap, c) return span, err } diff --git a/crossdock/server/server.go b/crossdock/server/server.go index 7e5499f4..dede31f6 100644 --- a/crossdock/server/server.go +++ b/crossdock/server/server.go @@ -126,18 +126,14 @@ func (s *Server) handleJSON( newReq func() interface{}, handle func(ctx context.Context, req interface{}) (interface{}, error), ) { - span, err := s.Tracer.Join("post", opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(r.Header)) - if err != nil && err != opentracing.ErrTraceNotFound { + spanCtx, err := s.Tracer.Extract(opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(r.Header)) + if err != nil && err != opentracing.ErrSpanContextNotFound { http.Error(w, fmt.Sprintf("Cannot read request body: %+v", err), http.StatusBadRequest) return } - if span != nil { - ext.SpanKind.Set(span, ext.SpanKindRPCServer) - } + span := s.Tracer.StartSpan("post", ext.RPCServerOption(spanCtx)) ctx := opentracing.ContextWithSpan(context.Background(), span) - if span != nil { - defer span.Finish() - } + defer span.Finish() body, err := ioutil.ReadAll(r.Body) if err != nil { diff --git a/crossdock/server/tchannel.go b/crossdock/server/tchannel.go index 8c01d368..5105b331 100644 --- a/crossdock/server/tchannel.go +++ b/crossdock/server/tchannel.go @@ -26,6 +26,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/thrift" "golang.org/x/net/context" @@ -115,15 +116,15 @@ func convertOpenTracingSpan(ctx context.Context, builder *tchannel.ContextBuilde if span == nil { return } - carrier := &jaeger.TraceContextCarrier{} - if err := span.Tracer().Inject(span, jaeger.TraceContextFormat, carrier); err != nil { + sc := new(jaeger.SpanContext) + if err := span.Tracer().Inject(span.Context(), jaeger.TraceContextFormat, sc); err != nil { return } - sc := carrier.TraceContext builder.SetExternalSpan(sc.TraceID(), sc.SpanID(), sc.ParentID(), sc.IsSampled()) - for k, v := range carrier.Baggage { + sc.ForeachBaggageItem(func(k, v string) bool { builder.AddHeader(k, v) - } + return true + }) } // setupOpenTracingContext extracts a TChannel tracing Span from the context, converts @@ -143,15 +144,16 @@ func setupOpenTracingContext(tracer opentracing.Tracer, ctx context.Context, met tSpan := tchannel.CurrentSpan(ctx) if tSpan != nil { // populate a fake carrier and try to create OpenTracing Span - carrier := &jaeger.TraceContextCarrier{Baggage: headers} - carrier.TraceContext = *jaeger.NewTraceContext( + sc := jaeger.NewSpanContext( tSpan.TraceID(), tSpan.SpanID(), tSpan.ParentID(), tSpan.TracingEnabled()) + for k, v := range headers { + sc.SetBaggageItem(k, v) + } if tracer == nil { tracer = opentracing.GlobalTracer() } - if span, err := tracer.Join(method, jaeger.TraceContextFormat, carrier); err == nil { - ctx = opentracing.ContextWithSpan(ctx, span) - } + span := tracer.StartSpan(method, ext.RPCServerOption(sc)) + ctx = opentracing.ContextWithSpan(ctx, span) } return thrift.WithHeaders(ctx, headers) } diff --git a/crossdock/server/trace.go b/crossdock/server/trace.go index 4e3c8572..f855f537 100644 --- a/crossdock/server/trace.go +++ b/crossdock/server/trace.go @@ -38,7 +38,7 @@ func (s *Server) doStartTrace(req *tracetest.StartTraceRequest) (*tracetest.Trac if req.Sampled { ext.SamplingPriority.Set(span, 1) } - span.SetBaggageItem(BaggageKey, req.Baggage) + span.Context().SetBaggageItem(BaggageKey, req.Baggage) defer span.Finish() ctx := opentracing.ContextWithSpan(context.Background(), span) @@ -98,13 +98,10 @@ func observeSpan(ctx context.Context, tracer opentracing.Tracer) (*tracetest.Obs if span == nil { return nil, errNoSpanObserved } - c := jaeger.TraceContextCarrier{} - if err := tracer.Inject(span, jaeger.TraceContextFormat, &c); err != nil { - return nil, err - } + sc := span.Context().(*jaeger.SpanContext) observedSpan := tracetest.NewObservedSpan() - observedSpan.TraceId = fmt.Sprintf("%x", c.TraceContext.TraceID()) - observedSpan.Sampled = c.TraceContext.IsSampled() - observedSpan.Baggage = span.BaggageItem(BaggageKey) + observedSpan.TraceId = fmt.Sprintf("%x", sc.TraceID()) + observedSpan.Sampled = sc.IsSampled() + observedSpan.Baggage = sc.BaggageItem(BaggageKey) return observedSpan, nil } diff --git a/glide.lock b/glide.lock index cb4fb1c2..c4d26097 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: a491069b174ad3476c71f7c2fcb693b1c9357af31910266a821a15851cdfea01 -updated: 2016-07-07T20:04:42.16621302-04:00 +hash: 6357a0a7292615576eda928aceaf21a65270ee320e482ae4860e6faa8a1616ec +updated: 2016-07-09T14:23:14.733187191-04:00 imports: - name: github.com/apache/thrift version: 23d6746079d7b5fdb38214387c63f987e68a6d8f @@ -12,7 +12,7 @@ imports: subpackages: - spew - name: github.com/opentracing/opentracing-go - version: d5b9be1fcf7d467664d3b8f9cb4f3c8c5ac0a753 + version: 4281ca32dbd11726bca3581fc06a9bc94365ec85 subpackages: - ext - name: github.com/pmezard/go-difflib diff --git a/glide.yaml b/glide.yaml index d28ea306..cd0fc65e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -5,7 +5,7 @@ import: subpackages: - lib/go/thrift - package: github.com/opentracing/opentracing-go - version: d5b9be1fcf7d467664d3b8f9cb4f3c8c5ac0a753 + version: 4281ca32dbd11726bca3581fc06a9bc94365ec85 subpackages: - ext - package: golang.org/x/net diff --git a/interop.go b/interop.go index 8fbb7f20..bbe64e04 100644 --- a/interop.go +++ b/interop.go @@ -22,84 +22,40 @@ package jaeger import ( "github.com/opentracing/opentracing-go" - "strings" ) +// TODO this file should not be needed after TChannel PR. + type formatKey int -// TraceContextFormat is a constant used as OpenTracing Format. Requires TraceContextCarrier. +// TraceContextFormat is a constant used as OpenTracing Format. +// Requires *SpanContext as carrier. // This format is intended for interop with TChannel or other Zipkin-like tracers. const TraceContextFormat formatKey = iota -// TraceContextCarrier is a carrier type used with TraceContextFormat. -type TraceContextCarrier struct { - TraceContext TraceContext - Baggage map[string]string -} - type jaegerTraceContextPropagator struct { tracer *tracer } -func (p *jaegerTraceContextPropagator) InjectSpan( - sp opentracing.Span, +func (p *jaegerTraceContextPropagator) Inject( + ctx *SpanContext, abstractCarrier interface{}, ) error { - sc, ok := sp.(*span) - if !ok { - return opentracing.ErrInvalidSpan - } - carrier, ok := abstractCarrier.(*TraceContextCarrier) + carrier, ok := abstractCarrier.(*SpanContext) if !ok { return opentracing.ErrInvalidCarrier } - sc.RLock() - defer sc.RUnlock() - - carrier.TraceContext = sc.TraceContext - - if l := len(sc.baggage); l > 0 && carrier.Baggage == nil { - carrier.Baggage = make(map[string]string, l) - } - for k, v := range sc.baggage { - safeKey := encodeBaggageKeyAsHeader(k) - carrier.Baggage[safeKey] = v - } + carrier.CopyFrom(ctx) return nil } -func (p *jaegerTraceContextPropagator) Join( - operationName string, - abstractCarrier interface{}, -) (opentracing.Span, error) { - carrier, ok := abstractCarrier.(*TraceContextCarrier) +func (p *jaegerTraceContextPropagator) Extract(abstractCarrier interface{}) (*SpanContext, error) { + carrier, ok := abstractCarrier.(*SpanContext) if !ok { return nil, opentracing.ErrInvalidCarrier } - context := carrier.TraceContext - if context.traceID == 0 { - return nil, opentracing.ErrTraceNotFound - } - var baggage map[string]string - if l := len(carrier.Baggage); l > 0 { - baggage = make(map[string]string, l) - for k, v := range carrier.Baggage { - lowerCaseKey := strings.ToLower(k) - if strings.HasPrefix(lowerCaseKey, TraceBaggageHeaderPrefix) { - key := decodeBaggageHeaderKey(lowerCaseKey) - baggage[key] = v - } - } - } - sp := p.tracer.newSpan() - sp.TraceContext = context - sp.baggage = baggage - return p.tracer.startSpanInternal( - sp, - operationName, - p.tracer.timeNow(), - nil, - true, // join with external trace - ), nil + ctx := new(SpanContext) + ctx.CopyFrom(carrier) + return ctx, nil } diff --git a/propagation.go b/propagation.go index baccb76e..7fed3bb5 100644 --- a/propagation.go +++ b/propagation.go @@ -31,34 +31,28 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) -// Injector is responsible for injecting Span instances in a manner suitable +// Injector is responsible for injecting SpanContext instances in a manner suitable // for propagation via a format-specific "carrier" object. Typically the // injection will take place across an RPC boundary, but message queues and // other IPC mechanisms are also reasonable places to use an Injector. type Injector interface { - // InjectSpan takes `span` and injects it into `carrier`. The actual type - // of `carrier` depends on the `format` passed to `Tracer.Injector()`. + // Inject takes `SpanContext` and injects it into `carrier`. The actual type + // of `carrier` depends on the `format` passed to `Tracer.Inject()`. // // Implementations may return opentracing.ErrInvalidCarrier or any other // implementation-specific error if injection fails. - InjectSpan(span opentracing.Span, carrier interface{}) error + Inject(ctx *SpanContext, carrier interface{}) error } -// Extractor is responsible for extracting Span instances from an +// Extractor is responsible for extracting SpanContext instances from a // format-specific "carrier" object. Typically the extraction will take place // on the server side of an RPC boundary, but message queues and other IPC // mechanisms are also reasonable places to use an Extractor. type Extractor interface { - // Join returns a Span instance with operation name `operationName` - // given `carrier`, or (nil, opentracing.ErrTraceNotFound) if no trace could be found to - // join with in the `carrier`. - // - // Implementations may return opentracing.ErrInvalidCarrier, - // opentracing.ErrTraceCorrupted, or implementation-specific errors if there - // are more fundamental problems with `carrier`. - // - // Upon success, the returned Span instance is already started. - Join(operationName string, carrier interface{}) (opentracing.Span, error) + // Extract decodes a SpanContext instance from the given `carrier`, + // or (nil, opentracing.ErrSpanContextNotFound) if no context could + // be found in the `carrier`. + Extract(carrier interface{}) (*SpanContext, error) } type textMapPropagator struct { @@ -81,14 +75,10 @@ func newBinaryPropagator(tracer *tracer) *binaryPropagator { } } -func (p *textMapPropagator) InjectSpan( - sp opentracing.Span, +func (p *textMapPropagator) Inject( + sc *SpanContext, abstractCarrier interface{}, ) error { - sc, ok := sp.(*span) - if !ok { - return opentracing.ErrInvalidSpan - } textMapWriter, ok := abstractCarrier.(opentracing.TextMapWriter) if !ok { return opentracing.ErrInvalidCarrier @@ -97,7 +87,7 @@ func (p *textMapPropagator) InjectSpan( sc.RLock() defer sc.RUnlock() - textMapWriter.Set(TracerStateHeaderName, sc.TraceContext.String()) + textMapWriter.Set(TracerStateHeaderName, sc.String()) for k, v := range sc.baggage { safeKey := encodeBaggageKeyAsHeader(k) textMapWriter.Set(safeKey, v) @@ -105,28 +95,26 @@ func (p *textMapPropagator) InjectSpan( return nil } -func (p *textMapPropagator) Join( - operationName string, - abstractCarrier interface{}, -) (opentracing.Span, error) { +func (p *textMapPropagator) Extract(abstractCarrier interface{}) (*SpanContext, error) { textMapReader, ok := abstractCarrier.(opentracing.TextMapReader) if !ok { return nil, opentracing.ErrInvalidCarrier } - var carrier TraceContextCarrier + var ctx *SpanContext + var baggage map[string]string err := textMapReader.ForeachKey(func(key, value string) error { lowerCaseKey := strings.ToLower(key) if lowerCaseKey == TracerStateHeaderName { var err error - if carrier.TraceContext, err = ContextFromString(value); err != nil { + if ctx, err = ContextFromString(value); err != nil { return err } } else if strings.HasPrefix(lowerCaseKey, TraceBaggageHeaderPrefix) { - if carrier.Baggage == nil { - carrier.Baggage = make(map[string]string) + if baggage == nil { + baggage = make(map[string]string) } dk := decodeBaggageHeaderKey(lowerCaseKey) - carrier.Baggage[dk] = value + baggage[dk] = value } return nil }) @@ -134,29 +122,17 @@ func (p *textMapPropagator) Join( p.tracer.metrics.DecodingErrors.Inc(1) return nil, err } - if carrier.TraceContext.traceID == 0 { - return nil, opentracing.ErrTraceNotFound + if ctx == nil || ctx.traceID == 0 { + return nil, opentracing.ErrSpanContextNotFound } - sp := p.tracer.newSpan() - sp.TraceContext = carrier.TraceContext - sp.baggage = carrier.Baggage - return p.tracer.startSpanInternal( - sp, - operationName, - p.tracer.timeNow(), - nil, // tags - true, // join - ), nil + ctx.baggage = baggage + return ctx, nil } -func (p *binaryPropagator) InjectSpan( - sp opentracing.Span, +func (p *binaryPropagator) Inject( + sc *SpanContext, abstractCarrier interface{}, ) error { - sc, ok := sp.(*span) - if !ok { - return opentracing.ErrInvalidSpan - } carrier, ok := abstractCarrier.(io.Writer) if !ok { return opentracing.ErrInvalidCarrier @@ -198,75 +174,63 @@ func (p *binaryPropagator) InjectSpan( return nil } -func (p *binaryPropagator) Join( - operationName string, - abstractCarrier interface{}, -) (opentracing.Span, error) { +func (p *binaryPropagator) Extract(abstractCarrier interface{}) (*SpanContext, error) { carrier, ok := abstractCarrier.(io.Reader) if !ok { return nil, opentracing.ErrInvalidCarrier } - var context TraceContext + ctx := new(SpanContext) - if err := binary.Read(carrier, binary.BigEndian, &context.traceID); err != nil { - return nil, opentracing.ErrTraceCorrupted + if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil { + return nil, opentracing.ErrSpanContextCorrupted } - if err := binary.Read(carrier, binary.BigEndian, &context.spanID); err != nil { - return nil, opentracing.ErrTraceCorrupted + if err := binary.Read(carrier, binary.BigEndian, &ctx.spanID); err != nil { + return nil, opentracing.ErrSpanContextCorrupted } - if err := binary.Read(carrier, binary.BigEndian, &context.parentID); err != nil { - return nil, opentracing.ErrTraceCorrupted + if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil { + return nil, opentracing.ErrSpanContextCorrupted } - if err := binary.Read(carrier, binary.BigEndian, &context.flags); err != nil { - return nil, opentracing.ErrTraceCorrupted + if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil { + return nil, opentracing.ErrSpanContextCorrupted } // Handle the baggage items var numBaggage int32 if err := binary.Read(carrier, binary.BigEndian, &numBaggage); err != nil { - return nil, opentracing.ErrTraceCorrupted + return nil, opentracing.ErrSpanContextCorrupted } - var baggageMap map[string]string + var baggage map[string]string if iNumBaggage := int(numBaggage); iNumBaggage > 0 { - baggageMap = make(map[string]string, iNumBaggage) + baggage = make(map[string]string, iNumBaggage) buf := p.buffers.Get().(*bytes.Buffer) defer p.buffers.Put(buf) var keyLen, valLen int32 for i := 0; i < iNumBaggage; i++ { if err := binary.Read(carrier, binary.BigEndian, &keyLen); err != nil { - return nil, opentracing.ErrTraceCorrupted + return nil, opentracing.ErrSpanContextCorrupted } buf.Reset() buf.Grow(int(keyLen)) if n, err := io.CopyN(buf, carrier, int64(keyLen)); err != nil || int32(n) != keyLen { - return nil, opentracing.ErrTraceCorrupted + return nil, opentracing.ErrSpanContextCorrupted } key := buf.String() if err := binary.Read(carrier, binary.BigEndian, &valLen); err != nil { - return nil, opentracing.ErrTraceCorrupted + return nil, opentracing.ErrSpanContextCorrupted } buf.Reset() buf.Grow(int(valLen)) if n, err := io.CopyN(buf, carrier, int64(valLen)); err != nil || int32(n) != valLen { - return nil, opentracing.ErrTraceCorrupted + return nil, opentracing.ErrSpanContextCorrupted } - baggageMap[key] = buf.String() + baggage[key] = buf.String() } } - sp := p.tracer.newSpan() - sp.TraceContext = context - sp.baggage = baggageMap - - return p.tracer.startSpanInternal( - sp, - operationName, - p.tracer.timeNow(), - nil, // tags - true, // join - ), nil + ctx.baggage = baggage + return ctx, nil } // Converts a baggage item key into an http header format, diff --git a/propagation_test.go b/propagation_test.go index 8c0a74e8..f7ff2078 100644 --- a/propagation_test.go +++ b/propagation_test.go @@ -3,17 +3,16 @@ package jaeger import ( "bytes" "net/http" - "reflect" "testing" "time" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestSpanPropagator(t *testing.T) { - var err error const op = "test" reporter := NewInMemoryReporter() stats := NewInMemoryStatsCollector() @@ -22,34 +21,33 @@ func TestSpanPropagator(t *testing.T) { tmc := opentracing.HTTPHeaderTextMapCarrier(http.Header{}) tests := []struct { - format, carrier interface{} + format, carrier, formatName interface{} }{ - {TraceContextFormat, &TraceContextCarrier{}}, - {opentracing.Binary, &bytes.Buffer{}}, - {opentracing.TextMap, tmc}, + {TraceContextFormat, new(SpanContext), "TraceContextFormat"}, + {opentracing.Binary, new(bytes.Buffer), "Binary"}, + {opentracing.TextMap, tmc, "TextMap"}, } sp := tracer.StartSpan(op) - sp.SetBaggageItem("foo", "bar") - for i, test := range tests { - child := opentracing.StartChildSpan(sp, op) - if err := tracer.Inject(child, test.format, test.carrier); err != nil { - t.Fatalf("test %d, format %+v: %v", i, test.format, err) - } + sp.SetTag("x", "y") // to avoid later comparing nil vs. [] + sp.Context().SetBaggageItem("foo", "bar") + for _, test := range tests { + // starting normal child to extract its serialized context + child := tracer.StartSpan(op, opentracing.ChildOf(sp.Context())) + err := tracer.Inject(child.Context(), test.format, test.carrier) + assert.NoError(t, err) // Note: we're not finishing the above span - child, err = tracer.Join(op, test.format, test.carrier) - if err != nil { - t.Fatalf("test %d, format %+v: %v", i, test.format, err) - } + childCtx, err := tracer.Extract(test.format, test.carrier) + assert.NoError(t, err) + child = tracer.StartSpan(op, ext.RPCServerOption(childCtx)) + child.SetTag("x", "y") // to avoid later comparing nil vs. [] child.Finish() } sp.Finish() closer.Close() otSpans := reporter.GetSpans() - if a, e := len(otSpans), len(tests)+1; a != e { - t.Fatalf("expected %d spans, got %d", e, a) - } + require.Equal(t, len(tests)+1, len(otSpans), "unexpected number of spans reporter") spans := make([]*span, len(otSpans)) for i, s := range otSpans { @@ -61,32 +59,29 @@ func TestSpanPropagator(t *testing.T) { exp.duration = time.Duration(123) exp.startTime = time.Time{}.Add(1) - if exp.ParentID() != 0 { - t.Fatalf("Root span's ParentID %d is not 0", exp.ParentID()) + if exp.context.ParentID() != 0 { + t.Fatalf("Root span's ParentID %d is not 0", exp.context.ParentID()) } for i, sp := range spans { - if a, e := sp.ParentID(), exp.SpanID(); a != e { + if a, e := sp.context.ParentID(), exp.context.SpanID(); a != e { t.Fatalf("%d: ParentID %d does not match expectation %d", i, a, e) } else { // Prepare for comparison. - sp.TraceContext.spanID, sp.TraceContext.parentID = exp.SpanID(), 0 + sp.context.spanID, sp.context.parentID = exp.context.SpanID(), 0 sp.duration, sp.startTime = exp.duration, exp.startTime } - if a, e := sp.TraceID(), exp.TraceID(); a != e { - t.Fatalf("%d: TraceID changed from %d to %d", i, e, a) - } - if !reflect.DeepEqual(exp.TraceContext, sp.TraceContext) { - t.Fatalf("%d: wanted %+v, got %+v", i, exp.TraceContext, sp.TraceContext) - } - if !reflect.DeepEqual(exp.baggage, sp.baggage) { - t.Fatalf("%d: wanted %+v, got %+v", i, exp.baggage, sp.baggage) - } - // Strictly speaking, the above two checks are not necessary, but they are helpful - // for troubleshooting when something does not compare equal. - if !reflect.DeepEqual(exp, sp) { - t.Fatalf("%d: wanted %+v, got %+v", i, exp, sp) - } + assert.Equal(t, exp.context, sp.context) + assert.Equal(t, exp.tags, sp.tags) + assert.Equal(t, exp.logs, sp.logs) + assert.EqualValues(t, "server", sp.spanKind) + // Override collections to avoid tripping comparison on different pointers + sp.context = exp.context + sp.tags = exp.tags + sp.logs = exp.logs + sp.spanKind = exp.spanKind + // Compare the rest of the fields + assert.Equal(t, exp, sp) } assert.EqualValues(t, map[string]int64{ @@ -121,7 +116,7 @@ func TestDecodingError(t *testing.T) { httpHeader := http.Header{} httpHeader.Add(TracerStateHeaderName, badHeader) tmc := opentracing.HTTPHeaderTextMapCarrier(httpHeader) - _, err := tracer.Join("test", opentracing.TextMap, tmc) + _, err := tracer.Extract(opentracing.TextMap, tmc) assert.Error(t, err) assert.Equal(t, map[string]int64{"jaeger.decoding-errors": 1}, stats.GetCounterValues()) } @@ -131,16 +126,16 @@ func TestBaggagePropagationHTTP(t *testing.T) { defer closer.Close() sp1 := tracer.StartSpan("s1").(*span) - sp1.SetBaggageItem("Some_Key", "12345") - assert.Equal(t, "12345", sp1.BaggageItem("some-KEY")) - sp1.SetBaggageItem("Some_Key", "98765") - assert.Equal(t, "98765", sp1.BaggageItem("some-KEY")) + sp1.Context().SetBaggageItem("Some_Key", "12345") + assert.Equal(t, "12345", sp1.Context().BaggageItem("some-KEY")) + sp1.Context().SetBaggageItem("Some_Key", "98765") + assert.Equal(t, "98765", sp1.Context().BaggageItem("some-KEY")) h := http.Header{} - err := tracer.Inject(sp1, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(h)) + err := tracer.Inject(sp1.context, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(h)) require.NoError(t, err) - sp2, err := tracer.Join("x", opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(h)) + sp2, err := tracer.Extract(opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(h)) require.NoError(t, err) assert.Equal(t, "98765", sp2.BaggageItem("some-KEY")) } diff --git a/reporter_test.go b/reporter_test.go index c6c77005..305a9e47 100644 --- a/reporter_test.go +++ b/reporter_test.go @@ -93,7 +93,7 @@ func (s *reporterSuite) flushReporter() { func (s *reporterSuite) TestRootSpanAnnotations() { sp := s.tracer.StartSpan("get_name") - ext.SpanKind.Set(sp, ext.SpanKindRPCServer) + ext.SpanKindRPCServer.Set(sp) ext.PeerService.Set(sp, s.serviceName) sp.Finish() s.flushReporter() @@ -108,17 +108,17 @@ func (s *reporterSuite) TestRootSpanAnnotations() { func (s *reporterSuite) TestClientSpanAnnotations() { sp := s.tracer.StartSpan("get_name") - ext.SpanKind.Set(sp, ext.SpanKindRPCServer) + ext.SpanKindRPCServer.Set(sp) ext.PeerService.Set(sp, s.serviceName) - sp2 := opentracing.StartChildSpan(sp, "get_last_name") - ext.SpanKind.Set(sp2, ext.SpanKindRPCClient) + sp2 := s.tracer.StartSpan("get_last_name", opentracing.ChildOf(sp.Context())) + ext.SpanKindRPCClient.Set(sp2) ext.PeerService.Set(sp2, s.serviceName) sp2.Finish() sp.Finish() s.flushReporter() s.Equal(2, len(s.collector.Spans())) zSpan := s.collector.Spans()[0] // child span is reported first - s.EqualValues(zSpan.ID, sp2.(*span).spanID) + s.EqualValues(zSpan.ID, sp2.(*span).context.spanID) s.Equal(2, len(zSpan.Annotations), "expecting two annotations, cs and cr") s.Equal(1, len(zSpan.BinaryAnnotations), "expecting one binary annotation sa") s.NotNil(findAnnotation(zSpan, "cs"), "expecting cs annotation") @@ -227,7 +227,7 @@ func testRemoteReporter( TracerOptions.Metrics(metrics)) span := tracer.StartSpan("leela") - ext.SpanKind.Set(span, ext.SpanKindRPCClient) + ext.SpanKindRPCClient.Set(span) ext.PeerService.Set(span, "downstream") span.Finish() closer.Close() // close the tracer, which also closes and flushes the reporter diff --git a/span.go b/span.go index 1c1476ea..5784bf16 100644 --- a/span.go +++ b/span.go @@ -35,7 +35,7 @@ type span struct { sync.RWMutex tracer *tracer - TraceContext + context *SpanContext // The name of the "operation" this span is an instance of. // Known as a "span name" in some implementations. @@ -70,9 +70,6 @@ type span struct { // The span's "micro-log" logs []opentracing.LogData - - // Distributed Context baggage - baggage map[string]string } type tag struct { @@ -82,7 +79,7 @@ type tag struct { // Sets or changes the operation name. func (s *span) SetOperationName(operationName string) opentracing.Span { - if s.IsSampled() { + if s.context.IsSampled() { s.Lock() defer s.Unlock() s.operationName = operationName @@ -95,34 +92,38 @@ func (s *span) SetTag(key string, value interface{}) opentracing.Span { if key == string(ext.SamplingPriority) && setSamplingPriority(s, key, value) { return s } - if s.IsSampled() { - handled := false - if handler, ok := specialTagHandlers[key]; ok { - handled = handler(s, key, value) - } - if !handled { - s.Lock() - defer s.Unlock() - s.tags = append(s.tags, tag{key: key, value: value}) - } + if s.context.IsSampled() { + s.Lock() + defer s.Unlock() + s.setTagNoLocking(key, value) } return s } +func (s *span) setTagNoLocking(key string, value interface{}) { + handled := false + if handler, ok := specialTagHandlers[key]; ok { + handled = handler(s, key, value) + } + if !handled { + s.tags = append(s.tags, tag{key: key, value: value}) + } +} + func (s *span) LogEvent(event string) { - if s.IsSampled() { + if s.context.IsSampled() { s.Log(opentracing.LogData{Event: event}) } } func (s *span) LogEventWithPayload(event string, payload interface{}) { - if s.IsSampled() { + if s.context.IsSampled() { s.Log(opentracing.LogData{Event: event, Payload: payload}) } } func (s *span) Log(ld opentracing.LogData) { - if s.IsSampled() { + if s.context.IsSampled() { s.Lock() defer s.Unlock() @@ -138,7 +139,7 @@ func (s *span) Finish() { } func (s *span) FinishWithOptions(options opentracing.FinishOptions) { - if s.IsSampled() { + if s.context.IsSampled() { finishTime := options.FinishTime if finishTime.IsZero() { finishTime = s.tracer.timeNow() @@ -154,32 +155,8 @@ func (s *span) FinishWithOptions(options opentracing.FinishOptions) { s.tracer.reportSpan(s) } -func (s *span) SetBaggageItem(key, value string) opentracing.Span { - key = normalizeBaggageKey(key) - s.Lock() - defer s.Unlock() - if s.baggage == nil { - s.baggage = make(map[string]string) - } - s.baggage[key] = value - return s -} - -func (s *span) BaggageItem(key string) string { - key = normalizeBaggageKey(key) - s.RLock() - defer s.RUnlock() - return s.baggage[key] -} - -func (s *span) ForeachBaggageItem(handler func(k, v string) bool) { - s.RLock() - defer s.RUnlock() - for k, v := range s.baggage { - if !handler(k, v) { - break - } - } +func (s *span) Context() opentracing.SpanContext { + return s.context } func (s *span) Tracer() opentracing.Tracer { @@ -189,7 +166,7 @@ func (s *span) Tracer() opentracing.Tracer { func (s *span) String() string { s.RLock() defer s.RUnlock() - return s.TraceContext.String() + return s.context.String() } func (s *span) peerDefined() bool { @@ -199,13 +176,13 @@ func (s *span) peerDefined() bool { func (s *span) isRPC() bool { s.RLock() defer s.RUnlock() - return s.spanKind == string(ext.SpanKindRPCClient) || s.spanKind == string(ext.SpanKindRPCServer) + return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum) } func (s *span) isRPCClient() bool { s.RLock() defer s.RUnlock() - return s.spanKind == string(ext.SpanKindRPCClient) + return s.spanKind == string(ext.SpanKindRPCClientEnum) } var specialTagHandlers = map[string]func(*span, string, interface{}) bool{ @@ -216,8 +193,6 @@ var specialTagHandlers = map[string]func(*span, string, interface{}) bool{ } func setSpanKind(s *span, key string, value interface{}) bool { - s.Lock() - defer s.Unlock() if val, ok := value.(string); ok { s.spanKind = val return true @@ -230,8 +205,6 @@ func setSpanKind(s *span, key string, value interface{}) bool { } func setPeerIPv4(s *span, key string, value interface{}) bool { - s.Lock() - defer s.Unlock() if val, ok := value.(string); ok { if ip, err := utils.IPToUint32(val); err == nil { s.peer.Ipv4 = int32(ip) @@ -250,8 +223,6 @@ func setPeerIPv4(s *span, key string, value interface{}) bool { } func setPeerPort(s *span, key string, value interface{}) bool { - s.Lock() - defer s.Unlock() if val, ok := value.(string); ok { if port, err := utils.ParsePort(val); err == nil { s.peer.Port = int16(port) @@ -270,8 +241,6 @@ func setPeerPort(s *span, key string, value interface{}) bool { } func setPeerService(s *span, key string, value interface{}) bool { - s.Lock() - defer s.Unlock() if val, ok := value.(string); ok { s.peer.ServiceName = val return true @@ -284,9 +253,9 @@ func setSamplingPriority(s *span, key string, value interface{}) bool { defer s.Unlock() if val, ok := value.(uint16); ok { if val > 0 { - s.flags = s.flags | flagDebug | flagSampled + s.context.flags = s.context.flags | flagDebug | flagSampled } else { - s.flags = s.flags & (^flagSampled) + s.context.flags = s.context.flags & (^flagSampled) } return true } diff --git a/span_test.go b/span_test.go index efa2a026..afc6aa34 100644 --- a/span_test.go +++ b/span_test.go @@ -10,18 +10,18 @@ func TestBaggageIterator(t *testing.T) { defer closer.Close() sp1 := tracer.StartSpan("s1").(*span) - sp1.SetBaggageItem("Some_Key", "12345") - sp1.SetBaggageItem("Some-other-key", "42") + sp1.Context().SetBaggageItem("Some_Key", "12345") + sp1.Context().SetBaggageItem("Some-other-key", "42") b := make(map[string]string) - sp1.ForeachBaggageItem(func(k, v string) bool { + sp1.Context().ForeachBaggageItem(func(k, v string) bool { b[k] = v return true }) assert.Equal(t, map[string]string{"some-key": "12345", "some-other-key": "42"}, b) b = make(map[string]string) - sp1.ForeachBaggageItem(func(k, v string) bool { + sp1.Context().ForeachBaggageItem(func(k, v string) bool { b[k] = v return false // break out early }) diff --git a/thrift_span.go b/thrift_span.go index edcfb3fa..b4a26998 100644 --- a/thrift_span.go +++ b/thrift_span.go @@ -40,7 +40,7 @@ const ( // buildThriftSpan builds thrift span based on internal span. func buildThriftSpan(span *span) *z.Span { - parentID := int64(span.parentID) + parentID := int64(span.context.parentID) var ptrParentID *int64 if parentID != 0 { ptrParentID = &parentID @@ -51,8 +51,8 @@ func buildThriftSpan(span *span) *z.Span { ServiceName: span.tracer.serviceName, Ipv4: int32(span.tracer.hostIPv4)} thriftSpan := &z.Span{ - TraceID: int64(span.traceID), - ID: int64(span.spanID), + TraceID: int64(span.context.traceID), + ID: int64(span.context.spanID), ParentID: ptrParentID, Name: span.operationName, Timestamp: ×tamp, @@ -66,9 +66,9 @@ func buildAnnotations(span *span, endpoint *z.Endpoint) []*z.Annotation { // automatically adding 2 Zipkin CoreAnnotations annotations := make([]*z.Annotation, 0, 2+len(span.logs)) var startLabel, endLabel string - if span.spanKind == string(ext.SpanKindRPCClient) { + if span.spanKind == string(ext.SpanKindRPCClientEnum) { startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV - } else if span.spanKind == string(ext.SpanKindRPCServer) { + } else if span.spanKind == string(ext.SpanKindRPCServerEnum) { startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND } if !span.startTime.IsZero() && startLabel != "" { diff --git a/thrift_span_test.go b/thrift_span_test.go index 40f4a622..a185c5c8 100644 --- a/thrift_span_test.go +++ b/thrift_span_test.go @@ -14,7 +14,7 @@ func TestFirstInProcessSpan(t *testing.T) { defer closer.Close() sp1 := tracer.StartSpan("s1").(*span) - sp2 := opentracing.StartChildSpan(sp1, "sp2").(*span) + sp2 := tracer.StartSpan("sp2", opentracing.ChildOf(sp1.Context())).(*span) sp2.Finish() sp1.Finish() diff --git a/tracer.go b/tracer.go index e51d4136..01778f12 100644 --- a/tracer.go +++ b/tracer.go @@ -21,11 +21,14 @@ package jaeger import ( + "fmt" "io" + "reflect" "sync" "time" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" "github.com/uber/jaeger-client-go/utils" ) @@ -37,6 +40,7 @@ type tracer struct { sampler Sampler reporter Reporter metrics Metrics + logger Logger timeNow func() time.Time randomNumber func() uint64 @@ -105,44 +109,82 @@ func NewTracer( } t.hostIPv4 = localIPInt32 } + if t.logger == nil { + t.logger = NullLogger + } return t, t } -func (t *tracer) StartSpan(operationName string) opentracing.Span { - return t.StartSpanWithOptions( - opentracing.StartSpanOptions{ - OperationName: operationName, - }) +func (t *tracer) StartSpan( + operationName string, + options ...opentracing.StartSpanOption, +) opentracing.Span { + sso := opentracing.StartSpanOptions{} + for _, o := range options { + o.Apply(&sso) + } + return t.startSpanWithOptions(operationName, sso) } -func (t *tracer) StartSpanWithOptions(options opentracing.StartSpanOptions) opentracing.Span { +func (t *tracer) startSpanWithOptions( + operationName string, + options opentracing.StartSpanOptions, +) opentracing.Span { startTime := options.StartTime if startTime.IsZero() { startTime = t.timeNow() } sp := t.newSpan() - if options.Parent == nil { - sp.traceID = t.randomID() - sp.spanID = sp.traceID - sp.parentID = 0 - sp.flags = byte(0) - if t.sampler.IsSampled(sp.traceID) { - sp.flags |= flagSampled + ctx := new(SpanContext) + sp.context = ctx + + var parent *SpanContext + for _, ref := range options.References { + if ref.Type == opentracing.ChildOfRef { + if p, ok := ref.Referee.(*SpanContext); ok { + parent = p + } else { + t.logger.Error(fmt.Sprintf( + "ChildOf reference contains invalid type of SpanReference: %s", + reflect.ValueOf(ref.Referee))) + } + } else { + // TODO support other types of span references + } + } + + rpcServer := false + if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok { + rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum)) + } + + if parent == nil { + ctx.traceID = t.randomID() + ctx.spanID = ctx.traceID + ctx.parentID = 0 + ctx.flags = byte(0) + if t.sampler.IsSampled(ctx.traceID) { + ctx.flags |= flagSampled } } else { - parent := options.Parent.(*span) parent.RLock() - sp.traceID = parent.traceID - sp.spanID = t.randomID() - sp.parentID = parent.spanID - sp.flags = parent.flags + ctx.traceID = parent.traceID + if rpcServer { + // Support Zipkin's one-span-per-RPC model + ctx.spanID = parent.spanID + ctx.parentID = parent.parentID + } else { + ctx.spanID = t.randomID() + ctx.parentID = parent.spanID + } + ctx.flags = parent.flags // copy baggage items if l := len(parent.baggage); l > 0 { - sp.baggage = make(map[string]string, len(parent.baggage)) + ctx.baggage = make(map[string]string, len(parent.baggage)) for k, v := range parent.baggage { - sp.baggage[k] = v + ctx.baggage[k] = v } } parent.RUnlock() @@ -150,29 +192,32 @@ func (t *tracer) StartSpanWithOptions(options opentracing.StartSpanOptions) open return t.startSpanInternal( sp, - options.OperationName, + operationName, startTime, options.Tags, - false, /* not a join with external trace */ + rpcServer, /* joining with external trace if rpcServer */ ) } // Inject implements Inject() method of opentracing.Tracer -func (t *tracer) Inject(sp opentracing.Span, format interface{}, carrier interface{}) error { +func (t *tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error { + c, ok := ctx.(*SpanContext) + if !ok { + return opentracing.ErrInvalidSpanContext + } if injector, ok := t.injectors[format]; ok { - return injector.InjectSpan(sp, carrier) + return injector.Inject(c, carrier) } return opentracing.ErrUnsupportedFormat } -// Join implements Join() method of opentracing.Tracer -func (t *tracer) Join( - operationName string, +// Extract implements Extract() method of opentracing.Tracer +func (t *tracer) Extract( format interface{}, carrier interface{}, -) (opentracing.Span, error) { +) (opentracing.SpanContext, error) { if extractor, ok := t.extractors[format]; ok { - return extractor.Join(operationName, carrier) + return extractor.Extract(carrier) } return nil, opentracing.ErrUnsupportedFormat } @@ -191,10 +236,10 @@ func (t *tracer) newSpan() *span { return &span{} } sp := t.spanPool.Get().(*span) + sp.context = nil sp.tracer = nil sp.tags = nil sp.logs = nil - sp.baggage = nil return sp } @@ -209,36 +254,40 @@ func (t *tracer) startSpanInternal( sp.operationName = operationName sp.startTime = startTime sp.duration = 0 - sp.firstInProcess = join || sp.parentID == 0 + sp.firstInProcess = join || sp.context.parentID == 0 if tags != nil && len(tags) > 0 { sp.tags = make([]tag, 0, len(tags)) for k, v := range tags { - sp.tags = append(sp.tags, tag{key: k, value: v}) + if k == string(ext.SamplingPriority) && setSamplingPriority(sp, k, v) { + continue + } + sp.setTagNoLocking(k, v) } } // emit metrics t.metrics.SpansStarted.Inc(1) - if sp.IsSampled() { + if sp.context.IsSampled() { t.metrics.SpansSampled.Inc(1) - if sp.parentID == 0 { - t.metrics.TracesStartedSampled.Inc(1) - } else if join { + if join { t.metrics.TracesJoinedSampled.Inc(1) + } else if sp.context.parentID == 0 { + t.metrics.TracesStartedSampled.Inc(1) } } else { t.metrics.SpansNotSampled.Inc(1) - if sp.parentID == 0 { - t.metrics.TracesStartedNotSampled.Inc(1) - } else if join { + if join { t.metrics.TracesJoinedNotSampled.Inc(1) + } else if sp.context.parentID == 0 { + t.metrics.TracesStartedNotSampled.Inc(1) } + } return sp } func (t *tracer) reportSpan(sp *span) { t.metrics.SpansFinished.Inc(1) - if sp.IsSampled() { + if sp.context.IsSampled() { t.reporter.Report(sp) } if t.poolSpans { diff --git a/tracer_options.go b/tracer_options.go index 60052e65..712c3d4b 100644 --- a/tracer_options.go +++ b/tracer_options.go @@ -38,6 +38,13 @@ func (tracerOptions) Metrics(m *Metrics) TracerOption { } } +// Logger creates a TracerOption that gives the tracer a Logger. +func (tracerOptions) Logger(logger Logger) TracerOption { + return func(tracer *tracer) { + tracer.logger = logger + } +} + // TimeNow creates a TracerOption that gives the tracer a function // used to generate timestamps for spans. func (tracerOptions) TimeNow(timeNow func() time.Time) TracerOption { diff --git a/tracer_test.go b/tracer_test.go index 2f1d19b0..4375cda5 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -72,7 +72,7 @@ func (s *tracerSuite) TestBeginRootSpan() { s.tracer.(*tracer).randomNumber = func() uint64 { return someID } sp := s.tracer.StartSpan("get_name") - ext.SpanKind.Set(sp, ext.SpanKindRPCServer) + ext.SpanKindRPCServer.Set(sp) ext.PeerService.Set(sp, "peer-service") s.NotNil(sp) ss := sp.(*span) @@ -81,8 +81,8 @@ func (s *tracerSuite) TestBeginRootSpan() { s.Equal("server", ss.spanKind, "Span must be server-side") s.Equal("peer-service", ss.peer.ServiceName, "Client is 'peer-service'") - s.EqualValues(someID, ss.traceID) - s.EqualValues(0, ss.parentID) + s.EqualValues(someID, ss.context.traceID) + s.EqualValues(0, ss.context.parentID) s.Equal(startTime, ss.startTime) @@ -97,29 +97,42 @@ func (s *tracerSuite) TestBeginRootSpan() { }, s.stats.GetCounterValues()) } -func (s *tracerSuite) TestBeginRootSpanWithOptions() { +func (s *tracerSuite) TestStartRootSpanWithOptions() { ts := time.Now() - options := opentracing.StartSpanOptions{ - OperationName: "get_address", - StartTime: ts, - } - sp := s.tracer.StartSpanWithOptions(options) + sp := s.tracer.StartSpan("get_address", opentracing.StartTime(ts)) ss := sp.(*span) s.Equal("get_address", ss.operationName) s.Equal("", ss.spanKind, "Span must not be RPC") s.Equal(ts, ss.startTime) } -func (s *tracerSuite) TestBeginChildSpan() { +func (s *tracerSuite) TestStartChildSpan() { + sp1 := s.tracer.StartSpan("get_address") + sp2 := s.tracer.StartSpan("get_street", opentracing.ChildOf(sp1.Context())) + s.Equal(sp1.(*span).context.spanID, sp2.(*span).context.parentID) + sp2.Finish() + s.NotNil(sp2.(*span).duration) + sp1.Finish() + s.EqualValues(map[string]int64{ + "jaeger.spans|group=sampling|sampled=y": 2, + "jaeger.traces|sampled=y|state=started": 1, + "jaeger.spans|group=lifecycle|state=started": 2, + "jaeger.spans|group=lifecycle|state=finished": 2, + }, s.stats.GetCounterValues()) +} + +func (s *tracerSuite) TestStartRPCServerSpan() { sp1 := s.tracer.StartSpan("get_address") - sp2 := opentracing.StartChildSpan(sp1, "get_street") - s.Equal(sp1.(*span).spanID, sp2.(*span).parentID) + sp2 := s.tracer.StartSpan("get_street", ext.RPCServerOption(sp1.Context())) + s.Equal(sp1.(*span).context.spanID, sp2.(*span).context.spanID) + s.Equal(sp1.(*span).context.parentID, sp2.(*span).context.parentID) sp2.Finish() s.NotNil(sp2.(*span).duration) sp1.Finish() s.EqualValues(map[string]int64{ "jaeger.spans|group=sampling|sampled=y": 2, "jaeger.traces|sampled=y|state=started": 1, + "jaeger.traces|sampled=y|state=joined": 1, "jaeger.spans|group=lifecycle|state=started": 2, "jaeger.spans|group=lifecycle|state=finished": 2, }, s.stats.GetCounterValues()) @@ -134,12 +147,12 @@ func (s *tracerSuite) TestSetOperationName() { func (s *tracerSuite) TestSamplerEffects() { s.tracer.(*tracer).sampler = NewConstSampler(true) sp := s.tracer.StartSpan("test") - flags := sp.(*span).flags + flags := sp.(*span).context.flags s.EqualValues(flagSampled, flags&flagSampled) s.tracer.(*tracer).sampler = NewConstSampler(false) sp = s.tracer.StartSpan("test") - flags = sp.(*span).flags + flags = sp.(*span).context.flags s.EqualValues(0, flags&flagSampled) } @@ -151,7 +164,7 @@ func (s *tracerSuite) TestRandomIDNotZero() { return } sp := s.tracer.StartSpan("get_name").(*span) - s.EqualValues(int64(1), sp.traceID) + s.EqualValues(int64(1), sp.context.traceID) rng := utils.NewRand(0) rng.Seed(1) // for test coverage @@ -166,19 +179,19 @@ func TestInjectorExtractorOptions(t *testing.T) { sp := tracer.StartSpan("x") c := &dummyCarrier{} - err := tracer.Inject(sp, "dummy", []int{}) + err := tracer.Inject(sp.Context(), "dummy", []int{}) assert.Equal(t, opentracing.ErrInvalidCarrier, err) - err = tracer.Inject(sp, "dummy", c) + err = tracer.Inject(sp.Context(), "dummy", c) assert.NoError(t, err) assert.True(t, c.ok) c.ok = false - _, err = tracer.Join("z", "dummy", []int{}) + _, err = tracer.Extract("dummy", []int{}) assert.Equal(t, opentracing.ErrInvalidCarrier, err) - _, err = tracer.Join("z", "dummy", c) - assert.Equal(t, opentracing.ErrTraceNotFound, err) + _, err = tracer.Extract("dummy", c) + assert.Equal(t, opentracing.ErrSpanContextNotFound, err) c.ok = true - _, err = tracer.Join("z", "dummy", c) + _, err = tracer.Extract("dummy", c) assert.NoError(t, err) } @@ -187,7 +200,7 @@ type dummyCarrier struct { ok bool } -func (p *dummyPropagator) InjectSpan(span opentracing.Span, carrier interface{}) error { +func (p *dummyPropagator) Inject(ctx *SpanContext, carrier interface{}) error { c, ok := carrier.(*dummyCarrier) if !ok { return opentracing.ErrInvalidCarrier @@ -196,7 +209,7 @@ func (p *dummyPropagator) InjectSpan(span opentracing.Span, carrier interface{}) return nil } -func (p *dummyPropagator) Join(operationName string, carrier interface{}) (opentracing.Span, error) { +func (p *dummyPropagator) Extract(carrier interface{}) (*SpanContext, error) { c, ok := carrier.(*dummyCarrier) if !ok { return nil, opentracing.ErrInvalidCarrier @@ -204,5 +217,5 @@ func (p *dummyPropagator) Join(operationName string, carrier interface{}) (opent if c.ok { return nil, nil } - return nil, opentracing.ErrTraceNotFound + return nil, opentracing.ErrSpanContextNotFound } diff --git a/zipkin.go b/zipkin.go index 5845d987..4188bf37 100644 --- a/zipkin.go +++ b/zipkin.go @@ -24,52 +24,37 @@ type zipkinPropagator struct { tracer *tracer } -func (p *zipkinPropagator) InjectSpan( - sp opentracing.Span, +func (p *zipkinPropagator) Inject( + ctx *SpanContext, abstractCarrier interface{}, ) error { - sc, ok := sp.(*span) - if !ok { - return opentracing.ErrInvalidSpan - } carrier, ok := abstractCarrier.(ZipkinSpan) if !ok { return opentracing.ErrInvalidCarrier } - sc.RLock() - defer sc.RUnlock() - - carrier.SetTraceID(sc.TraceContext.TraceID()) - carrier.SetSpanID(sc.TraceContext.SpanID()) - carrier.SetParentID(sc.TraceContext.ParentID()) - carrier.SetFlags(sc.TraceContext.flags) + ctx.RLock() + defer ctx.RUnlock() + carrier.SetTraceID(ctx.TraceID()) + carrier.SetSpanID(ctx.SpanID()) + carrier.SetParentID(ctx.ParentID()) + carrier.SetFlags(ctx.flags) return nil } -func (p *zipkinPropagator) Join( - operationName string, - abstractCarrier interface{}, -) (opentracing.Span, error) { +func (p *zipkinPropagator) Extract(abstractCarrier interface{}) (*SpanContext, error) { carrier, ok := abstractCarrier.(ZipkinSpan) if !ok { return nil, opentracing.ErrInvalidCarrier } if carrier.TraceID() == 0 { - return nil, opentracing.ErrTraceNotFound + return nil, opentracing.ErrSpanContextNotFound } - sp := p.tracer.newSpan() - sp.TraceContext.traceID = carrier.TraceID() - sp.TraceContext.spanID = carrier.SpanID() - sp.TraceContext.parentID = carrier.ParentID() - sp.TraceContext.flags = carrier.Flags() - - return p.tracer.startSpanInternal( - sp, - operationName, - p.tracer.timeNow(), - nil, - true, // join with external trace - ), nil + ctx := new(SpanContext) + ctx.traceID = carrier.TraceID() + ctx.spanID = carrier.SpanID() + ctx.parentID = carrier.ParentID() + ctx.flags = carrier.Flags() + return ctx, nil } diff --git a/zipkin_test.go b/zipkin_test.go index 188a3369..473c06eb 100644 --- a/zipkin_test.go +++ b/zipkin_test.go @@ -3,6 +3,7 @@ package jaeger import ( "testing" + "github.com/opentracing/opentracing-go/ext" "github.com/stretchr/testify/assert" ) @@ -14,24 +15,25 @@ func TestZipkinPropagator(t *testing.T) { sp := tracer.StartSpan("y") // Note: we intentionally use string as format, as that's what TChannel would need to do - if err := tracer.Inject(sp, "zipkin-span-format", carrier); err != nil { + if err := tracer.Inject(sp.Context(), "zipkin-span-format", carrier); err != nil { t.Fatalf("Inject failed: %+v", err) } sp1 := sp.(*span) - assert.Equal(t, sp1.traceID, carrier.traceID) - assert.Equal(t, sp1.spanID, carrier.spanID) - assert.Equal(t, sp1.parentID, carrier.parentID) - assert.Equal(t, sp1.flags, carrier.flags) + assert.Equal(t, sp1.context.traceID, carrier.traceID) + assert.Equal(t, sp1.context.spanID, carrier.spanID) + assert.Equal(t, sp1.context.parentID, carrier.parentID) + assert.Equal(t, sp1.context.flags, carrier.flags) - sp2, err := tracer.Join("z", "zipkin-span-format", carrier) + sp2ctx, err := tracer.Extract("zipkin-span-format", carrier) if err != nil { t.Fatalf("Extract failed: %+v", err) } + sp2 := tracer.StartSpan("x", ext.RPCServerOption(sp2ctx)) sp3 := sp2.(*span) - assert.Equal(t, sp1.traceID, sp3.traceID) - assert.Equal(t, sp1.spanID, sp3.spanID) - assert.Equal(t, sp1.parentID, sp3.parentID) - assert.Equal(t, sp1.flags, sp3.flags) + assert.Equal(t, sp1.context.traceID, sp3.context.traceID) + assert.Equal(t, sp1.context.spanID, sp3.context.spanID) + assert.Equal(t, sp1.context.parentID, sp3.context.parentID) + assert.Equal(t, sp1.context.flags, sp3.context.flags) } // TestZipkinSpan is a mock-up of TChannel's internal Span struct