Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

All spans of a trace share sampling state #443

Merged
merged 8 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 108 additions & 21 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ package jaeger
import (
"errors"
"fmt"
"go.uber.org/atomic"
"strconv"
"strings"
)

const (
flagSampled = byte(1)
flagDebug = byte(2)
flagFirehose = byte(8)
flagUnsampled = 0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flagUnsampled is unused (from varcheck)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flagUnsampled is unused (from deadcode)

flagSampled = 1
flagDebug = 2
flagFirehose = 8
)

var (
Expand Down Expand Up @@ -56,9 +58,6 @@ type SpanContext struct {
// Should be 0 if the current span is a root span.
parentID SpanID

// flags is a bitmap containing such bits as 'sampled' and 'debug'.
flags byte

// Distributed Context baggage. The is a snapshot in time.
baggage map[string]string

Expand All @@ -67,6 +66,92 @@ type SpanContext struct {
//
// See JaegerDebugHeader in constants.go
debugID string

// samplingState is shared across all spans
samplingState *samplingState
}

type samplingState struct {
flags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations
}

func (s *samplingState) setSampled() {
// TODO: add tests for all these transitions
swapped := s.flags.CAS(flagUnsampled, flagSampled)
if swapped {
return
}
swapped = s.flags.CAS(flagDebug, flagDebug|flagSampled)
if swapped {
return
}
swapped = s.flags.CAS(flagFirehose, flagFirehose|flagSampled)
if swapped {
return
}
s.flags.CAS(flagFirehose|flagDebug, flagDebug|flagFirehose|flagSampled)
}

func (s *samplingState) setDebug() {
swapped := s.flags.CAS(flagUnsampled, flagSampled|flagDebug)
if swapped {
return
}

swapped = s.flags.CAS(flagSampled, flagSampled|flagDebug)
if swapped {
return
}

swapped = s.flags.CAS(flagFirehose, flagFirehose|flagDebug)
if swapped {
return
}

s.flags.CAS(flagFirehose|flagSampled, flagFirehose|flagSampled|flagDebug)
}

func (s *samplingState) setFirehose() {
swapped := s.flags.CAS(flagUnsampled, flagUnsampled|flagFirehose)
if swapped {
return
}

swapped = s.flags.CAS(flagSampled, flagSampled|flagFirehose)
if swapped {
return
}

swapped = s.flags.CAS(flagDebug, flagDebug|flagFirehose)
if swapped {
return
}

s.flags.CAS(flagDebug|flagSampled, flagDebug|flagUnsampled|flagFirehose)
}

func (s *samplingState) resetFlags() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (*samplingState).resetFlags is unused (from unused)

s.flags.Store(flagUnsampled)
}

func (s *samplingState) setFromFlags(flags byte) {
s.flags.Store(int32(flags))
}

func (s *samplingState) getFlags() byte {
return byte(s.flags.Load())
}

func (s *samplingState) isSampled() bool {
return s.flags.Load()&flagSampled == flagSampled
}

func (s *samplingState) isDebug() bool {
return s.flags.Load()&flagDebug == flagDebug
}

func (s *samplingState) isFirehose() bool {
return s.flags.Load()&flagFirehose == flagFirehose
}

// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
Expand All @@ -81,17 +166,17 @@ func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
// IsSampled returns whether this trace was chosen for permanent storage
// by the sampling mechanism of the tracer.
func (c SpanContext) IsSampled() bool {
return (c.flags & flagSampled) == flagSampled
return c.samplingState.isSampled()
}

// IsDebug indicates whether sampling was explicitly requested by the service.
func (c SpanContext) IsDebug() bool {
return (c.flags & flagDebug) == flagDebug
return c.samplingState.isDebug()
}

// IsFirehose indicates whether the firehose flag was set
func (c SpanContext) IsFirehose() bool {
return (c.flags & flagFirehose) == flagFirehose
return c.samplingState.isFirehose()
}

// IsValid indicates whether this context actually represents a valid trace.
Expand All @@ -101,9 +186,9 @@ func (c SpanContext) IsValid() bool {

func (c SpanContext) String() string {
if c.traceID.High == 0 {
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.flags.Load())
}
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.flags.Load())
}

// ContextFromString reconstructs the Context encoded in a string
Expand All @@ -130,7 +215,8 @@ func ContextFromString(value string) (SpanContext, error) {
if err != nil {
return emptyContext, err
}
context.flags = byte(flags)
context.samplingState = &samplingState{}
context.samplingState.setFromFlags(byte(flags))
return context, nil
}

Expand All @@ -151,16 +237,17 @@ func (c SpanContext) ParentID() SpanID {

// NewSpanContext creates a new instance of SpanContext
func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext {
flags := byte(0)
samplingState := &samplingState{}
if sampled {
flags = flagSampled
samplingState.setSampled()
}

return SpanContext{
traceID: traceID,
spanID: spanID,
parentID: parentID,
flags: flags,
baggage: baggage}
traceID: traceID,
spanID: spanID,
parentID: parentID,
samplingState: samplingState,
baggage: baggage}
}

// CopyFrom copies data from ctx into this context, including span identity and baggage.
Expand All @@ -169,7 +256,7 @@ func (c *SpanContext) CopyFrom(ctx *SpanContext) {
c.traceID = ctx.traceID
c.spanID = ctx.spanID
c.parentID = ctx.parentID
c.flags = ctx.flags
c.samplingState = ctx.samplingState
if l := len(ctx.baggage); l > 0 {
c.baggage = make(map[string]string, l)
for k, v := range ctx.baggage {
Expand All @@ -193,7 +280,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
return SpanContext{c.traceID, c.spanID, c.parentID, c.flags, newBaggage, ""}
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState}
}

// isDebugIDContainerOnly returns true when the instance of the context is only
Expand Down
4 changes: 2 additions & 2 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestContextFromString(t *testing.T) {
assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID)
assert.EqualValues(t, 1, ctx.spanID)
assert.EqualValues(t, 1, ctx.parentID)
assert.EqualValues(t, 1, ctx.flags)
assert.True(t, ctx.samplingState.isSampled())
ctx = NewSpanContext(TraceID{Low: 1}, 1, 1, true, nil)
assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID)
assert.EqualValues(t, 1, ctx.spanID)
assert.EqualValues(t, 1, ctx.parentID)
assert.EqualValues(t, 1, ctx.flags)
assert.True(t, ctx.samplingState.isSampled())
assert.Equal(t, "ff", SpanID(255).String())
assert.Equal(t, "ff", TraceID{Low: 255}.String())
assert.Equal(t, "ff00000000000000ff", TraceID{High: 255, Low: 255}.String())
Expand Down
2 changes: 1 addition & 1 deletion jaeger_thrift_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
SpanId: int64(span.context.spanID),
ParentSpanId: int64(span.context.parentID),
OperationName: span.operationName,
Flags: int32(span.context.flags),
Flags: int32(span.context.samplingState.getFlags()),
StartTime: startTime,
Duration: duration,
Tags: buildTags(span.tags, span.tracer.options.maxTagValueLength),
Expand Down
8 changes: 6 additions & 2 deletions propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (p *BinaryPropagator) Inject(
if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
return err
}
if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil {
if err := binary.Write(carrier, binary.BigEndian, sc.samplingState.getFlags()); err != nil {
return err
}

Expand Down Expand Up @@ -222,6 +222,7 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
return emptyContext, opentracing.ErrInvalidCarrier
}
var ctx SpanContext
ctx.samplingState = &samplingState{}

if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
Expand All @@ -232,9 +233,12 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil {

var flags byte
if err := binary.Read(carrier, binary.BigEndian, &flags); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
ctx.samplingState.setFromFlags(flags)

// Handle the baggage items
var numBaggage int32
Expand Down
11 changes: 4 additions & 7 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,14 @@ func setSamplingPriority(s *Span, value interface{}) bool {
if !ok {
return false
}
s.Lock()
defer s.Unlock()
if val == 0 {
s.context.flags = s.context.flags & (^flagSampled)
s.context.samplingState.resetFlags()
return true
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.flags = s.context.flags | flagSampled
return true
s.context.samplingState.setSampled()
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.flags = s.context.flags | flagDebug | flagSampled
s.context.samplingState.setDebug()
return true
}
return false
Expand All @@ -326,5 +323,5 @@ func setSamplingPriority(s *Span, value interface{}) bool {
func EnableFirehose(s *Span) {
s.Lock()
defer s.Unlock()
s.context.flags |= flagFirehose
s.context.samplingState.setFirehose()
}
9 changes: 5 additions & 4 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ func (t *Tracer) startSpanWithOptions(
}
ctx.spanID = SpanID(ctx.traceID.Low)
ctx.parentID = 0
ctx.flags = byte(0)
ctx.samplingState = &samplingState{}
if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
ctx.flags |= (flagSampled | flagDebug)
ctx.samplingState.setDebug()
samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
} else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
ctx.flags |= flagSampled
ctx.samplingState.setSampled()
samplerTags = tags
}
} else {
Expand All @@ -290,7 +290,7 @@ func (t *Tracer) startSpanWithOptions(
ctx.spanID = SpanID(t.randomID())
ctx.parentID = parent.spanID
}
ctx.flags = parent.flags
ctx.samplingState = parent.samplingState
}
if hasParent {
// copy baggage items
Expand All @@ -305,6 +305,7 @@ func (t *Tracer) startSpanWithOptions(

sp := t.newSpan()
sp.context = ctx
sp.context.samplingState = ctx.samplingState
sp.observer = t.observer.OnStartSpan(sp, operationName, options)
return t.startSpanInternal(
sp,
Expand Down
8 changes: 4 additions & 4 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ func (s *tracerSuite) TestSetOperationName() {
func (s *tracerSuite) TestSamplerEffects() {
s.tracer.(*Tracer).sampler = NewConstSampler(true)
sp := s.tracer.StartSpan("test")
flags := sp.(*Span).context.flags
s.EqualValues(flagSampled, flags&flagSampled)
samplingState := sp.(*Span).context.samplingState
s.True(samplingState.isSampled())

s.tracer.(*Tracer).sampler = NewConstSampler(false)
sp = s.tracer.StartSpan("test")
flags = sp.(*Span).context.flags
s.EqualValues(0, flags&flagSampled)
samplingState = sp.(*Span).context.samplingState
s.False(samplingState.isSampled())
}

func (s *tracerSuite) TestRandomIDNotZero() {
Expand Down
4 changes: 4 additions & 0 deletions transport_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestEmitBatchOverhead(t *testing.T) {
client := j.NewAgentClientFactory(transport, protocolFactory)

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)

tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF}
Expand Down Expand Up @@ -88,6 +89,7 @@ func TestUDPSenderFlush(t *testing.T) {
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)
processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)

Expand Down Expand Up @@ -134,6 +136,7 @@ func TestUDPSenderAppend(t *testing.T) {
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)
processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)

Expand Down Expand Up @@ -209,6 +212,7 @@ func TestUDPSenderHugeSpan(t *testing.T) {
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)

sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead)
Expand Down
Loading