Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Support delayed sampling decisions (#447)
Browse files Browse the repository at this point in the history
* Support delayed sampling decision

Signed-off-by: Yuri Shkuro <ys@uber.com>

* go fmt

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Add sampling on finishing span; fix metrics

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Remove unused field

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix comment

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix comment

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Remove empty branch

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
yurishkuro authored Oct 15, 2019
1 parent ff64c7c commit c9bbde9
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 82 deletions.
20 changes: 16 additions & 4 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,32 @@ type Metrics struct {
// Number of traces started by this tracer as not sampled
TracesStartedNotSampled metrics.Counter `metric:"traces" tags:"state=started,sampled=n" help:"Number of traces started by this tracer as not sampled"`

// Number of traces started by this tracer with delayed sampling
TracesStartedDelayedSampling metrics.Counter `metric:"traces" tags:"state=started,sampled=n" help:"Number of traces started by this tracer with delayed sampling"`

// Number of externally started sampled traces this tracer joined
TracesJoinedSampled metrics.Counter `metric:"traces" tags:"state=joined,sampled=y" help:"Number of externally started sampled traces this tracer joined"`

// Number of externally started not-sampled traces this tracer joined
TracesJoinedNotSampled metrics.Counter `metric:"traces" tags:"state=joined,sampled=n" help:"Number of externally started not-sampled traces this tracer joined"`

// Number of sampled spans started by this tracer
SpansStartedSampled metrics.Counter `metric:"started_spans" tags:"sampled=y" help:"Number of sampled spans started by this tracer"`
SpansStartedSampled metrics.Counter `metric:"started_spans" tags:"sampled=y" help:"Number of spans started by this tracer as sampled"`

// Number of not sampled spans started by this tracer
SpansStartedNotSampled metrics.Counter `metric:"started_spans" tags:"sampled=n" help:"Number of spans started by this tracer as not sampled"`

// Number of spans with delayed sampling started by this tracer
SpansStartedDelayedSampling metrics.Counter `metric:"started_spans" tags:"sampled=delayed" help:"Number of spans started by this tracer with delayed sampling"`

// Number of unsampled spans started by this tracer
SpansStartedNotSampled metrics.Counter `metric:"started_spans" tags:"sampled=n" help:"Number of unsampled spans started by this tracer"`
// Number of spans finished by this tracer
SpansFinishedSampled metrics.Counter `metric:"finished_spans" tags:"sampled=y" help:"Number of sampled spans finished by this tracer"`

// Number of spans finished by this tracer
SpansFinishedNotSampled metrics.Counter `metric:"finished_spans" tags:"sampled=n" help:"Number of not-sampled spans finished by this tracer"`

// Number of spans finished by this tracer
SpansFinished metrics.Counter `metric:"finished_spans" help:"Number of spans finished by this tracer"`
SpansFinishedDelayedSampling metrics.Counter `metric:"finished_spans" tags:"sampled=delayed" help:"Number of spans with delayed sampling finished by this tracer"`

// Number of errors decoding tracing context
DecodingErrors metrics.Counter `metric:"span_context_decoding_errors" help:"Number of errors decoding tracing context"`
Expand Down
2 changes: 1 addition & 1 deletion propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestSpanPropagator(t *testing.T) {

metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{
{Name: "jaeger.tracer.started_spans", Tags: map[string]string{"sampled": "y"}, Value: 1 + 2*len(tests)},
{Name: "jaeger.tracer.finished_spans", Value: 1 + len(tests)},
{Name: "jaeger.tracer.finished_spans", Tags: map[string]string{"sampled": "y"}, Value: 1 + len(tests)},
{Name: "jaeger.tracer.traces", Tags: map[string]string{"state": "started", "sampled": "y"}, Value: 1},
{Name: "jaeger.tracer.traces", Tags: map[string]string{"state": "joined", "sampled": "y"}, Value: len(tests)},
}...)
Expand Down
2 changes: 2 additions & 0 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
type Reporter interface {
// Report submits a new span to collectors, possibly asynchronously and/or with buffering.
// If the reporter is processing Span asynchronously then it needs to Retain() the span,
// and then Release() it when no longer needed, to avoid span data corruption.
Report(span *Span)

// Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
Expand Down
10 changes: 8 additions & 2 deletions sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,23 @@ type Sampler interface {

// ConstSampler is a sampler that always makes the same decision.
type ConstSampler struct {
legacySamplerV1Base
Decision bool
tags []Tag
}

// NewConstSampler creates a ConstSampler.
func NewConstSampler(sample bool) Sampler {
func NewConstSampler(sample bool) *ConstSampler {
tags := []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeConst},
{key: SamplerParamTagKey, value: sample},
}
return &ConstSampler{Decision: sample, tags: tags}
s := &ConstSampler{
Decision: sample,
tags: tags,
}
s.delegate = s.IsSampled
return s
}

// IsSampled implements IsSampled() of Sampler.
Expand Down
73 changes: 73 additions & 0 deletions sampler_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

type SamplingDecision struct {
sample bool
retryable bool
tags []Tag
}

type SamplerV2 interface {
OnCreateSpan(span *Span) SamplingDecision
OnSetOperationName(span *Span, operationName string) SamplingDecision
OnSetTag(span *Span, key string, value interface{}) SamplingDecision
OnFinishSpan(span *Span) SamplingDecision

// Close does a clean shutdown of the sampler, stopping any background
// go-routines it may have started.
Close()
}

// samplerV1toV2 wraps legacy V1 sampler into an adapter that make it look like V2.
func samplerV1toV2(s Sampler) SamplerV2 {
if s2, ok := s.(SamplerV2); ok {
return s2
}
type legacySamplerV1toV2Adapter struct {
legacySamplerV1Base
}
return &legacySamplerV1toV2Adapter{
legacySamplerV1Base: legacySamplerV1Base{
delegate: s.IsSampled,
},
}
}

// legacySamplerV1Base is used as a base for simple samplers that only implement
// the legacy isSampled() function that is not sensitive to its arguments.
type legacySamplerV1Base struct {
delegate func(id TraceID, operation string) (sampled bool, tags []Tag)
}

func (s *legacySamplerV1Base) OnCreateSpan(span *Span) SamplingDecision {
isSampled, tags := s.delegate(span.context.traceID, span.operationName)
return SamplingDecision{sample: isSampled, retryable: false, tags: tags}
}

func (s *legacySamplerV1Base) OnSetOperationName(span *Span, operationName string) SamplingDecision {
isSampled, tags := s.delegate(span.context.traceID, span.operationName)
return SamplingDecision{sample: isSampled, retryable: false, tags: tags}
}

func (s *legacySamplerV1Base) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *legacySamplerV1Base) OnFinishSpan(span *Span) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *legacySamplerV1Base) Close() {}
17 changes: 17 additions & 0 deletions sampler_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

var _ SamplerV2 = new(ConstSampler)
70 changes: 58 additions & 12 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Span struct {

tracer *Tracer

// TODO: (major) change to use a pointer
context SpanContext

// The name of the "operation" this span is an instance of.
Expand Down Expand Up @@ -74,24 +75,36 @@ type Tag struct {
// SetOperationName sets or changes the operation name.
func (s *Span) SetOperationName(operationName string) opentracing.Span {
s.Lock()
defer s.Unlock()
if s.context.IsSampled() {
s.operationName = operationName
s.operationName = operationName
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetOperationName(s, operationName)
s.applySamplingDecision(decision, false)
}
s.Unlock()
s.observer.OnSetOperationName(operationName)
return s
}

// SetTag implements SetTag() of opentracing.Span
func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
return s.setTagInternal(key, value, true)
}

func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
s.observer.OnSetTag(key, value)
if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
return s
}
s.Lock()
defer s.Unlock()
if s.context.IsSampled() {
s.setTagNoLocking(key, value)
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetTag(s, key, value)
s.applySamplingDecision(decision, lock)
}
if s.isWriteable() {
if lock {
s.Lock()
defer s.Unlock()
}
s.appendTagNoLocking(key, value)
}
return s
}
Expand Down Expand Up @@ -128,7 +141,7 @@ func (s *Span) Tags() opentracing.Tags {
return result
}

func (s *Span) setTagNoLocking(key string, value interface{}) {
func (s *Span) appendTagNoLocking(key string, value interface{}) {
s.tags = append(s.tags, Tag{key: key, value: value})
}

Expand All @@ -148,7 +161,7 @@ func (s *Span) logFieldsNoLocking(fields ...log.Field) {
Fields: fields,
Timestamp: time.Now(),
}
s.appendLog(lr)
s.appendLogNoLocking(lr)
}

// LogKV implements opentracing.Span API
Expand Down Expand Up @@ -185,12 +198,12 @@ func (s *Span) Log(ld opentracing.LogData) {
if ld.Timestamp.IsZero() {
ld.Timestamp = s.tracer.timeNow()
}
s.appendLog(ld.ToLogRecord())
s.appendLogNoLocking(ld.ToLogRecord())
}
}

// this function should only be called while holding a Write lock
func (s *Span) appendLog(lr opentracing.LogRecord) {
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
// TODO add logic to limit number of logs per span (issue #46)
s.logs = append(s.logs, lr)
}
Expand Down Expand Up @@ -224,8 +237,12 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
}
s.observer.OnFinish(options)
s.Lock()
s.duration = options.FinishTime.Sub(s.startTime)
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnFinishSpan(s)
s.applySamplingDecision(decision, false)
}
if s.context.IsSampled() {
s.duration = options.FinishTime.Sub(s.startTime)
// Note: bulk logs are not subject to maxLogsPerSpan limit
if options.LogRecords != nil {
s.logs = append(s.logs, options.LogRecords...)
Expand Down Expand Up @@ -300,6 +317,34 @@ func (s *Span) serviceName() string {
return s.tracer.serviceName
}

func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
if !decision.retryable {
s.context.samplingState.setFinal()
}
if decision.sample {
s.context.samplingState.setSampled()
if len(decision.tags) > 0 {
if lock {
s.Lock()
defer s.Unlock()
}
for _, tag := range decision.tags {
s.appendTagNoLocking(tag.key, tag.value)
}
}
}
}

// Span can be written to if it is sampled or the sampling decision has not been finalized.
func (s *Span) isWriteable() bool {
state := s.context.samplingState
return !state.isFinal() || state.isSampled()
}

func (s *Span) isSamplingFinalized() bool {
return s.context.samplingState.isFinal()
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
// The behavior of setSamplingPriority is surprising
// If noDebugFlagOnForcedSampling is set
Expand All @@ -322,6 +367,7 @@ func setSamplingPriority(s *Span, value interface{}) bool {
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.samplingState.setSampled()
return true
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.samplingState.setDebugAndSampled()
return true
Expand Down
21 changes: 19 additions & 2 deletions context.go → span_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,19 @@ type SpanContext struct {

// samplingState is shared across all spans
samplingState *samplingState

// remote indicates that span context represents a remote parent
remote bool
}

type samplingState struct {
stateFlags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations
// Span context's state flags that are propagated across processes. Only lower 8 bits are used.
// We use an int32 instead of byte to be able to use CAS operations.
stateFlags atomic.Int32

// When state is not final, sampling will be retried on other span write operations,
// like SetOperationName / SetTag, and the spans will remain writable.
final atomic.Bool
}

func (s *samplingState) setFlag(newFlag int32) {
Expand Down Expand Up @@ -110,6 +119,10 @@ func (s *samplingState) setFlags(flags byte) {
s.stateFlags.Store(int32(flags))
}

func (s *samplingState) setFinal() {
s.final.Store(true)
}

func (s *samplingState) flags() byte {
return byte(s.stateFlags.Load())
}
Expand All @@ -126,6 +139,10 @@ func (s *samplingState) isFirehose() bool {
return s.stateFlags.Load()&flagFirehose == flagFirehose
}

func (s *samplingState) isFinal() bool {
return s.final.Load()
}

// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
for k, v := range c.baggage {
Expand Down Expand Up @@ -252,7 +269,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState}
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState, c.remote}
}

// isDebugIDContainerOnly returns true when the instance of the context is only
Expand Down
File renamed without changes.
Loading

0 comments on commit c9bbde9

Please sign in to comment.