diff --git a/Gopkg.lock b/Gopkg.lock index 1ed86f4a..7c0ae06b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -233,6 +233,7 @@ "github.com/uber/jaeger-lib/metrics", "github.com/uber/jaeger-lib/metrics/metricstest", "github.com/uber/jaeger-lib/metrics/prometheus", + "go.uber.org/atomic", "go.uber.org/zap", "go.uber.org/zap/zapcore", ] diff --git a/context.go b/context.go index 43553655..a44abad3 100644 --- a/context.go +++ b/context.go @@ -17,14 +17,15 @@ package jaeger import ( "errors" "fmt" + "go.uber.org/atomic" "strconv" "strings" ) const ( - flagSampled = byte(1) - flagDebug = byte(2) - flagFirehose = byte(8) + flagSampled = 1 + flagDebug = 2 + flagFirehose = 8 ) var ( @@ -56,9 +57,6 @@ type SpanContext struct { // Should be 0 if the current span is a root span. parentID SpanID - // flags is a bitmap containing such bits as 'sampled' and 'debug'. - flags byte - // Distributed Context baggage. The is a snapshot in time. baggage map[string]string @@ -67,6 +65,65 @@ type SpanContext struct { // // See JaegerDebugHeader in constants.go debugID string + + // samplingState is shared across all spans + samplingState *samplingState +} + +type samplingState struct { + stateFlags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations +} + +func (s *samplingState) setFlag(newFlag int32) { + swapped := false + for !swapped { + old := s.stateFlags.Load() + swapped = s.stateFlags.CAS(old, old|newFlag) + } +} + +func (s *samplingState) unsetFlag(newFlag int32) { + swapped := false + for !swapped { + old := s.stateFlags.Load() + swapped = s.stateFlags.CAS(old, old&^newFlag) + } +} + +func (s *samplingState) setSampled() { + s.setFlag(flagSampled) +} + +func (s *samplingState) unsetSampled() { + s.unsetFlag(flagSampled) +} + +func (s *samplingState) setDebugAndSampled() { + s.setFlag(flagDebug | flagSampled) +} + +func (s *samplingState) setFirehose() { + s.setFlag(flagFirehose) +} + +func (s *samplingState) setFlags(flags byte) { + s.stateFlags.Store(int32(flags)) +} + +func (s *samplingState) flags() byte { + return byte(s.stateFlags.Load()) +} + +func (s *samplingState) isSampled() bool { + return s.stateFlags.Load()&flagSampled == flagSampled +} + +func (s *samplingState) isDebug() bool { + return s.stateFlags.Load()&flagDebug == flagDebug +} + +func (s *samplingState) isFirehose() bool { + return s.stateFlags.Load()&flagFirehose == flagFirehose } // ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext @@ -81,17 +138,17 @@ func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) { // IsSampled returns whether this trace was chosen for permanent storage // by the sampling mechanism of the tracer. func (c SpanContext) IsSampled() bool { - return (c.flags & flagSampled) == flagSampled + return c.samplingState.isSampled() } // IsDebug indicates whether sampling was explicitly requested by the service. func (c SpanContext) IsDebug() bool { - return (c.flags & flagDebug) == flagDebug + return c.samplingState.isDebug() } // IsFirehose indicates whether the firehose flag was set func (c SpanContext) IsFirehose() bool { - return (c.flags & flagFirehose) == flagFirehose + return c.samplingState.isFirehose() } // IsValid indicates whether this context actually represents a valid trace. @@ -101,9 +158,9 @@ func (c SpanContext) IsValid() bool { func (c SpanContext) String() string { if c.traceID.High == 0 { - return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags) + return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) } - return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags) + return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) } // ContextFromString reconstructs the Context encoded in a string @@ -130,7 +187,8 @@ func ContextFromString(value string) (SpanContext, error) { if err != nil { return emptyContext, err } - context.flags = byte(flags) + context.samplingState = &samplingState{} + context.samplingState.setFlags(byte(flags)) return context, nil } @@ -151,16 +209,17 @@ func (c SpanContext) ParentID() SpanID { // NewSpanContext creates a new instance of SpanContext func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext { - flags := byte(0) + samplingState := &samplingState{} if sampled { - flags = flagSampled + samplingState.setSampled() } + return SpanContext{ - traceID: traceID, - spanID: spanID, - parentID: parentID, - flags: flags, - baggage: baggage} + traceID: traceID, + spanID: spanID, + parentID: parentID, + samplingState: samplingState, + baggage: baggage} } // CopyFrom copies data from ctx into this context, including span identity and baggage. @@ -169,7 +228,7 @@ func (c *SpanContext) CopyFrom(ctx *SpanContext) { c.traceID = ctx.traceID c.spanID = ctx.spanID c.parentID = ctx.parentID - c.flags = ctx.flags + c.samplingState = ctx.samplingState if l := len(ctx.baggage); l > 0 { c.baggage = make(map[string]string, l) for k, v := range ctx.baggage { @@ -193,7 +252,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext { newBaggage[key] = value } // Use positional parameters so the compiler will help catch new fields. - return SpanContext{c.traceID, c.spanID, c.parentID, c.flags, newBaggage, ""} + return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState} } // isDebugIDContainerOnly returns true when the instance of the context is only diff --git a/context_test.go b/context_test.go index 97b7e697..f04e1f38 100644 --- a/context_test.go +++ b/context_test.go @@ -57,12 +57,12 @@ func TestContextFromString(t *testing.T) { assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID) assert.EqualValues(t, 1, ctx.spanID) assert.EqualValues(t, 1, ctx.parentID) - assert.EqualValues(t, 1, ctx.flags) + assert.True(t, ctx.IsSampled()) ctx = NewSpanContext(TraceID{Low: 1}, 1, 1, true, nil) assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID) assert.EqualValues(t, 1, ctx.spanID) assert.EqualValues(t, 1, ctx.parentID) - assert.EqualValues(t, 1, ctx.flags) + assert.True(t, ctx.IsSampled()) assert.Equal(t, "ff", SpanID(255).String()) assert.Equal(t, "ff", TraceID{Low: 255}.String()) assert.Equal(t, "ff00000000000000ff", TraceID{High: 255, Low: 255}.String()) diff --git a/jaeger_thrift_span.go b/jaeger_thrift_span.go index 6ce1caf8..f0f1afe2 100644 --- a/jaeger_thrift_span.go +++ b/jaeger_thrift_span.go @@ -35,7 +35,7 @@ func BuildJaegerThrift(span *Span) *j.Span { SpanId: int64(span.context.spanID), ParentSpanId: int64(span.context.parentID), OperationName: span.operationName, - Flags: int32(span.context.flags), + Flags: int32(span.context.samplingState.flags()), StartTime: startTime, Duration: duration, Tags: buildTags(span.tags, span.tracer.options.maxTagValueLength), diff --git a/propagation.go b/propagation.go index 5b50cfb7..42fd64b5 100644 --- a/propagation.go +++ b/propagation.go @@ -193,7 +193,7 @@ func (p *BinaryPropagator) Inject( if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil { return err } - if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil { + if err := binary.Write(carrier, binary.BigEndian, sc.samplingState.flags()); err != nil { return err } @@ -222,6 +222,7 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er return emptyContext, opentracing.ErrInvalidCarrier } var ctx SpanContext + ctx.samplingState = &samplingState{} if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil { return emptyContext, opentracing.ErrSpanContextCorrupted @@ -232,9 +233,12 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil { return emptyContext, opentracing.ErrSpanContextCorrupted } - if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil { + + var flags byte + if err := binary.Read(carrier, binary.BigEndian, &flags); err != nil { return emptyContext, opentracing.ErrSpanContextCorrupted } + ctx.samplingState.setFlags(flags) // Handle the baggage items var numBaggage int32 diff --git a/span.go b/span.go index 9df8b601..b5066413 100644 --- a/span.go +++ b/span.go @@ -301,22 +301,29 @@ func (s *Span) serviceName() string { } // setSamplingPriority returns true if the flag was updated successfully, false otherwise. +// The behavior of setSamplingPriority is surprising +// If noDebugFlagOnForcedSampling is set +// setSamplingPriority(span, 1) always sets only flagSampled +// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes +// setSamplingPriority(span, 1) sets both flagSampled and flagDebug +// However, +// setSamplingPriority(span, 0) always only resets flagSampled +// +// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can +// leave flagDebug set func setSamplingPriority(s *Span, value interface{}) bool { val, ok := value.(uint16) if !ok { return false } - s.Lock() - defer s.Unlock() if val == 0 { - s.context.flags = s.context.flags & (^flagSampled) + s.context.samplingState.unsetSampled() return true } if s.tracer.options.noDebugFlagOnForcedSampling { - s.context.flags = s.context.flags | flagSampled - return true + s.context.samplingState.setSampled() } else if s.tracer.isDebugAllowed(s.operationName) { - s.context.flags = s.context.flags | flagDebug | flagSampled + s.context.samplingState.setDebugAndSampled() return true } return false @@ -326,5 +333,5 @@ func setSamplingPriority(s *Span, value interface{}) bool { func EnableFirehose(s *Span) { s.Lock() defer s.Unlock() - s.context.flags |= flagFirehose + s.context.samplingState.setFirehose() } diff --git a/span_test.go b/span_test.go index cb57dade..23c03355 100644 --- a/span_test.go +++ b/span_test.go @@ -179,6 +179,22 @@ func TestSetTag_SamplingPriority(t *testing.T) { } } +func TestUnsetSampledFlagOnly(t *testing.T) { + tracer, closer := NewTracer( + "Arwen", + NewConstSampler(true), + NewNullReporter(), + ) + defer closer.Close() + + span := tracer.StartSpan("breakfast").(*Span) + ext.SamplingPriority.Set(span, 1) + assert.Equal(t, byte(flagSampled|flagDebug), span.context.samplingState.flags()) + + ext.SamplingPriority.Set(span, 0) + assert.Equal(t, byte(flagDebug), span.context.samplingState.flags(), "Should reset only sampled flag") +} + func TestSetFirehoseMode(t *testing.T) { tracer, closer := NewTracer("DOOP", NewConstSampler(true), NewNullReporter()) defer closer.Close() diff --git a/tracer.go b/tracer.go index 745a0c38..91c1b817 100644 --- a/tracer.go +++ b/tracer.go @@ -272,12 +272,12 @@ func (t *Tracer) startSpanWithOptions( } ctx.spanID = SpanID(ctx.traceID.Low) ctx.parentID = 0 - ctx.flags = byte(0) + ctx.samplingState = &samplingState{} if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) { - ctx.flags |= (flagSampled | flagDebug) + ctx.samplingState.setDebugAndSampled() samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}} } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled { - ctx.flags |= flagSampled + ctx.samplingState.setSampled() samplerTags = tags } } else { @@ -290,7 +290,7 @@ func (t *Tracer) startSpanWithOptions( ctx.spanID = SpanID(t.randomID()) ctx.parentID = parent.spanID } - ctx.flags = parent.flags + ctx.samplingState = parent.samplingState } if hasParent { // copy baggage items diff --git a/tracer_test.go b/tracer_test.go index e31f4691..f6da8909 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -214,13 +214,11 @@ func (s *tracerSuite) TestSetOperationName() { func (s *tracerSuite) TestSamplerEffects() { s.tracer.(*Tracer).sampler = NewConstSampler(true) sp := s.tracer.StartSpan("test") - flags := sp.(*Span).context.flags - s.EqualValues(flagSampled, flags&flagSampled) + s.True(sp.(*Span).context.IsSampled()) s.tracer.(*Tracer).sampler = NewConstSampler(false) sp = s.tracer.StartSpan("test") - flags = sp.(*Span).context.flags - s.EqualValues(0, flags&flagSampled) + s.False(sp.(*Span).context.IsSampled()) } func (s *tracerSuite) TestRandomIDNotZero() { diff --git a/transport_udp_test.go b/transport_udp_test.go index 7af7b53e..2214038e 100644 --- a/transport_udp_test.go +++ b/transport_udp_test.go @@ -53,12 +53,18 @@ func getThriftProcessByteLength(t *testing.T, process *j.Process) int { return transport.Len() } +func newSpan() *Span { + span := &Span{operationName: "test-span", tracer: jaegerTracer} + span.context.samplingState = &samplingState{} + return span +} + func TestEmitBatchOverhead(t *testing.T) { transport := thrift.NewTMemoryBufferLen(1000) protocolFactory := thrift.NewTCompactProtocolFactory() client := j.NewAgentClientFactory(transport, protocolFactory) - span := &Span{operationName: "test-span", tracer: jaegerTracer} + span := newSpan() spanSize := getThriftSpanByteLength(t, span) tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF} @@ -87,7 +93,7 @@ func TestUDPSenderFlush(t *testing.T) { require.NoError(t, err) defer agent.Close() - span := &Span{operationName: "test-span", tracer: jaegerTracer} + span := newSpan() spanSize := getThriftSpanByteLength(t, span) processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer) @@ -133,7 +139,7 @@ func TestUDPSenderAppend(t *testing.T) { require.NoError(t, err) defer agent.Close() - span := &Span{operationName: "test-span", tracer: jaegerTracer} + span := newSpan() spanSize := getThriftSpanByteLength(t, span) processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer) @@ -208,7 +214,7 @@ func TestUDPSenderHugeSpan(t *testing.T) { require.NoError(t, err) defer agent.Close() - span := &Span{operationName: "test-span", tracer: jaegerTracer} + span := newSpan() spanSize := getThriftSpanByteLength(t, span) sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead) diff --git a/zipkin.go b/zipkin.go index 636952b7..98cab4b6 100644 --- a/zipkin.go +++ b/zipkin.go @@ -55,7 +55,7 @@ func (p *zipkinPropagator) Inject( carrier.SetTraceID(ctx.TraceID().Low) // TODO this cannot work with 128bit IDs carrier.SetSpanID(uint64(ctx.SpanID())) carrier.SetParentID(uint64(ctx.ParentID())) - carrier.SetFlags(ctx.flags) + carrier.SetFlags(ctx.samplingState.flags()) return nil } @@ -71,6 +71,7 @@ func (p *zipkinPropagator) Extract(abstractCarrier interface{}) (SpanContext, er ctx.traceID.Low = carrier.TraceID() ctx.spanID = SpanID(carrier.SpanID()) ctx.parentID = SpanID(carrier.ParentID()) - ctx.flags = carrier.Flags() + ctx.samplingState = &samplingState{} + ctx.samplingState.setFlags(carrier.Flags()) return ctx, nil } diff --git a/zipkin_test.go b/zipkin_test.go index 2d1d464e..e7325085 100644 --- a/zipkin_test.go +++ b/zipkin_test.go @@ -36,7 +36,7 @@ func TestZipkinPropagator(t *testing.T) { assert.Equal(t, sp1.context.traceID, TraceID{Low: carrier.traceID}) assert.Equal(t, sp1.context.spanID, SpanID(carrier.spanID)) assert.Equal(t, sp1.context.parentID, SpanID(carrier.parentID)) - assert.Equal(t, sp1.context.flags, carrier.flags) + assert.Equal(t, sp1.context.samplingState.flags(), carrier.flags) sp2ctx, err := tracer.Extract("zipkin-span-format", carrier) if err != nil { @@ -47,7 +47,7 @@ func TestZipkinPropagator(t *testing.T) { assert.Equal(t, sp1.context.traceID, sp3.context.traceID) assert.Equal(t, sp1.context.spanID, sp3.context.spanID) assert.Equal(t, sp1.context.parentID, sp3.context.parentID) - assert.Equal(t, sp1.context.flags, sp3.context.flags) + assert.Equal(t, sp1.context.samplingState.flags(), sp3.context.samplingState.flags()) } // TestZipkinSpan is a mock-up of TChannel's internal Span struct