Skip to content

Commit

Permalink
Allow all in-process spans of a trace to share sampling state (jaeger…
Browse files Browse the repository at this point in the history
…tracing#443)

* [WIP] Share span state

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Address Feedback

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Address Feedback

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Remove unused

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Cleanup

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Remove unused var

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Address feedback

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Address feedback

Signed-off-by: Prithvi Raj <p.r@uber.com>
  • Loading branch information
vprithvi authored and yurishkuro committed Oct 11, 2019
1 parent e5ad840 commit ff64c7c
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 49 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 80 additions & 21 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package jaeger
import (
"errors"
"fmt"
"go.uber.org/atomic"
"strconv"
"strings"
)

const (
flagSampled = byte(1)
flagDebug = byte(2)
flagFirehose = byte(8)
flagSampled = 1
flagDebug = 2
flagFirehose = 8
)

var (
Expand Down Expand Up @@ -56,9 +57,6 @@ type SpanContext struct {
// Should be 0 if the current span is a root span.
parentID SpanID

// flags is a bitmap containing such bits as 'sampled' and 'debug'.
flags byte

// Distributed Context baggage. The is a snapshot in time.
baggage map[string]string

Expand All @@ -67,6 +65,65 @@ type SpanContext struct {
//
// See JaegerDebugHeader in constants.go
debugID string

// samplingState is shared across all spans
samplingState *samplingState
}

type samplingState struct {
stateFlags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations
}

func (s *samplingState) setFlag(newFlag int32) {
swapped := false
for !swapped {
old := s.stateFlags.Load()
swapped = s.stateFlags.CAS(old, old|newFlag)
}
}

func (s *samplingState) unsetFlag(newFlag int32) {
swapped := false
for !swapped {
old := s.stateFlags.Load()
swapped = s.stateFlags.CAS(old, old&^newFlag)
}
}

func (s *samplingState) setSampled() {
s.setFlag(flagSampled)
}

func (s *samplingState) unsetSampled() {
s.unsetFlag(flagSampled)
}

func (s *samplingState) setDebugAndSampled() {
s.setFlag(flagDebug | flagSampled)
}

func (s *samplingState) setFirehose() {
s.setFlag(flagFirehose)
}

func (s *samplingState) setFlags(flags byte) {
s.stateFlags.Store(int32(flags))
}

func (s *samplingState) flags() byte {
return byte(s.stateFlags.Load())
}

func (s *samplingState) isSampled() bool {
return s.stateFlags.Load()&flagSampled == flagSampled
}

func (s *samplingState) isDebug() bool {
return s.stateFlags.Load()&flagDebug == flagDebug
}

func (s *samplingState) isFirehose() bool {
return s.stateFlags.Load()&flagFirehose == flagFirehose
}

// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
Expand All @@ -81,17 +138,17 @@ func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
// IsSampled returns whether this trace was chosen for permanent storage
// by the sampling mechanism of the tracer.
func (c SpanContext) IsSampled() bool {
return (c.flags & flagSampled) == flagSampled
return c.samplingState.isSampled()
}

// IsDebug indicates whether sampling was explicitly requested by the service.
func (c SpanContext) IsDebug() bool {
return (c.flags & flagDebug) == flagDebug
return c.samplingState.isDebug()
}

// IsFirehose indicates whether the firehose flag was set
func (c SpanContext) IsFirehose() bool {
return (c.flags & flagFirehose) == flagFirehose
return c.samplingState.isFirehose()
}

// IsValid indicates whether this context actually represents a valid trace.
Expand All @@ -101,9 +158,9 @@ func (c SpanContext) IsValid() bool {

func (c SpanContext) String() string {
if c.traceID.High == 0 {
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}

// ContextFromString reconstructs the Context encoded in a string
Expand All @@ -130,7 +187,8 @@ func ContextFromString(value string) (SpanContext, error) {
if err != nil {
return emptyContext, err
}
context.flags = byte(flags)
context.samplingState = &samplingState{}
context.samplingState.setFlags(byte(flags))
return context, nil
}

Expand All @@ -151,16 +209,17 @@ func (c SpanContext) ParentID() SpanID {

// NewSpanContext creates a new instance of SpanContext
func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext {
flags := byte(0)
samplingState := &samplingState{}
if sampled {
flags = flagSampled
samplingState.setSampled()
}

return SpanContext{
traceID: traceID,
spanID: spanID,
parentID: parentID,
flags: flags,
baggage: baggage}
traceID: traceID,
spanID: spanID,
parentID: parentID,
samplingState: samplingState,
baggage: baggage}
}

// CopyFrom copies data from ctx into this context, including span identity and baggage.
Expand All @@ -169,7 +228,7 @@ func (c *SpanContext) CopyFrom(ctx *SpanContext) {
c.traceID = ctx.traceID
c.spanID = ctx.spanID
c.parentID = ctx.parentID
c.flags = ctx.flags
c.samplingState = ctx.samplingState
if l := len(ctx.baggage); l > 0 {
c.baggage = make(map[string]string, l)
for k, v := range ctx.baggage {
Expand All @@ -193,7 +252,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
return SpanContext{c.traceID, c.spanID, c.parentID, c.flags, newBaggage, ""}
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState}
}

// isDebugIDContainerOnly returns true when the instance of the context is only
Expand Down
4 changes: 2 additions & 2 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestContextFromString(t *testing.T) {
assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID)
assert.EqualValues(t, 1, ctx.spanID)
assert.EqualValues(t, 1, ctx.parentID)
assert.EqualValues(t, 1, ctx.flags)
assert.True(t, ctx.IsSampled())
ctx = NewSpanContext(TraceID{Low: 1}, 1, 1, true, nil)
assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID)
assert.EqualValues(t, 1, ctx.spanID)
assert.EqualValues(t, 1, ctx.parentID)
assert.EqualValues(t, 1, ctx.flags)
assert.True(t, ctx.IsSampled())
assert.Equal(t, "ff", SpanID(255).String())
assert.Equal(t, "ff", TraceID{Low: 255}.String())
assert.Equal(t, "ff00000000000000ff", TraceID{High: 255, Low: 255}.String())
Expand Down
2 changes: 1 addition & 1 deletion jaeger_thrift_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
SpanId: int64(span.context.spanID),
ParentSpanId: int64(span.context.parentID),
OperationName: span.operationName,
Flags: int32(span.context.flags),
Flags: int32(span.context.samplingState.flags()),
StartTime: startTime,
Duration: duration,
Tags: buildTags(span.tags, span.tracer.options.maxTagValueLength),
Expand Down
8 changes: 6 additions & 2 deletions propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (p *BinaryPropagator) Inject(
if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
return err
}
if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil {
if err := binary.Write(carrier, binary.BigEndian, sc.samplingState.flags()); err != nil {
return err
}

Expand Down Expand Up @@ -222,6 +222,7 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
return emptyContext, opentracing.ErrInvalidCarrier
}
var ctx SpanContext
ctx.samplingState = &samplingState{}

if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
Expand All @@ -232,9 +233,12 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil {

var flags byte
if err := binary.Read(carrier, binary.BigEndian, &flags); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
ctx.samplingState.setFlags(flags)

// Handle the baggage items
var numBaggage int32
Expand Down
21 changes: 14 additions & 7 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,22 +301,29 @@ func (s *Span) serviceName() string {
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
// The behavior of setSamplingPriority is surprising
// If noDebugFlagOnForcedSampling is set
// setSamplingPriority(span, 1) always sets only flagSampled
// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
// setSamplingPriority(span, 1) sets both flagSampled and flagDebug
// However,
// setSamplingPriority(span, 0) always only resets flagSampled
//
// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can
// leave flagDebug set
func setSamplingPriority(s *Span, value interface{}) bool {
val, ok := value.(uint16)
if !ok {
return false
}
s.Lock()
defer s.Unlock()
if val == 0 {
s.context.flags = s.context.flags & (^flagSampled)
s.context.samplingState.unsetSampled()
return true
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.flags = s.context.flags | flagSampled
return true
s.context.samplingState.setSampled()
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.flags = s.context.flags | flagDebug | flagSampled
s.context.samplingState.setDebugAndSampled()
return true
}
return false
Expand All @@ -326,5 +333,5 @@ func setSamplingPriority(s *Span, value interface{}) bool {
func EnableFirehose(s *Span) {
s.Lock()
defer s.Unlock()
s.context.flags |= flagFirehose
s.context.samplingState.setFirehose()
}
16 changes: 16 additions & 0 deletions span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ func TestSetTag_SamplingPriority(t *testing.T) {
}
}

func TestUnsetSampledFlagOnly(t *testing.T) {
tracer, closer := NewTracer(
"Arwen",
NewConstSampler(true),
NewNullReporter(),
)
defer closer.Close()

span := tracer.StartSpan("breakfast").(*Span)
ext.SamplingPriority.Set(span, 1)
assert.Equal(t, byte(flagSampled|flagDebug), span.context.samplingState.flags())

ext.SamplingPriority.Set(span, 0)
assert.Equal(t, byte(flagDebug), span.context.samplingState.flags(), "Should reset only sampled flag")
}

func TestSetFirehoseMode(t *testing.T) {
tracer, closer := NewTracer("DOOP", NewConstSampler(true), NewNullReporter())
defer closer.Close()
Expand Down
8 changes: 4 additions & 4 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ func (t *Tracer) startSpanWithOptions(
}
ctx.spanID = SpanID(ctx.traceID.Low)
ctx.parentID = 0
ctx.flags = byte(0)
ctx.samplingState = &samplingState{}
if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
ctx.flags |= (flagSampled | flagDebug)
ctx.samplingState.setDebugAndSampled()
samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
} else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
ctx.flags |= flagSampled
ctx.samplingState.setSampled()
samplerTags = tags
}
} else {
Expand All @@ -290,7 +290,7 @@ func (t *Tracer) startSpanWithOptions(
ctx.spanID = SpanID(t.randomID())
ctx.parentID = parent.spanID
}
ctx.flags = parent.flags
ctx.samplingState = parent.samplingState
}
if hasParent {
// copy baggage items
Expand Down
6 changes: 2 additions & 4 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,11 @@ func (s *tracerSuite) TestSetOperationName() {
func (s *tracerSuite) TestSamplerEffects() {
s.tracer.(*Tracer).sampler = NewConstSampler(true)
sp := s.tracer.StartSpan("test")
flags := sp.(*Span).context.flags
s.EqualValues(flagSampled, flags&flagSampled)
s.True(sp.(*Span).context.IsSampled())

s.tracer.(*Tracer).sampler = NewConstSampler(false)
sp = s.tracer.StartSpan("test")
flags = sp.(*Span).context.flags
s.EqualValues(0, flags&flagSampled)
s.False(sp.(*Span).context.IsSampled())
}

func (s *tracerSuite) TestRandomIDNotZero() {
Expand Down
14 changes: 10 additions & 4 deletions transport_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ func getThriftProcessByteLength(t *testing.T, process *j.Process) int {
return transport.Len()
}

func newSpan() *Span {
span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
return span
}

func TestEmitBatchOverhead(t *testing.T) {
transport := thrift.NewTMemoryBufferLen(1000)
protocolFactory := thrift.NewTCompactProtocolFactory()
client := j.NewAgentClientFactory(transport, protocolFactory)

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span := newSpan()
spanSize := getThriftSpanByteLength(t, span)

tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF}
Expand Down Expand Up @@ -87,7 +93,7 @@ func TestUDPSenderFlush(t *testing.T) {
require.NoError(t, err)
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span := newSpan()
spanSize := getThriftSpanByteLength(t, span)
processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)

Expand Down Expand Up @@ -133,7 +139,7 @@ func TestUDPSenderAppend(t *testing.T) {
require.NoError(t, err)
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span := newSpan()
spanSize := getThriftSpanByteLength(t, span)
processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)

Expand Down Expand Up @@ -208,7 +214,7 @@ func TestUDPSenderHugeSpan(t *testing.T) {
require.NoError(t, err)
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span := newSpan()
spanSize := getThriftSpanByteLength(t, span)

sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead)
Expand Down
Loading

0 comments on commit ff64c7c

Please sign in to comment.