Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Support delayed sampling decisions #447

Merged
merged 7 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,32 @@ type Metrics struct {
// Number of traces started by this tracer as not sampled
TracesStartedNotSampled metrics.Counter `metric:"traces" tags:"state=started,sampled=n" help:"Number of traces started by this tracer as not sampled"`

// Number of traces started by this tracer with delayed sampling
TracesStartedDelayedSampling metrics.Counter `metric:"traces" tags:"state=started,sampled=n" help:"Number of traces started by this tracer with delayed sampling"`

// Number of externally started sampled traces this tracer joined
TracesJoinedSampled metrics.Counter `metric:"traces" tags:"state=joined,sampled=y" help:"Number of externally started sampled traces this tracer joined"`

// Number of externally started not-sampled traces this tracer joined
TracesJoinedNotSampled metrics.Counter `metric:"traces" tags:"state=joined,sampled=n" help:"Number of externally started not-sampled traces this tracer joined"`

// Number of sampled spans started by this tracer
SpansStartedSampled metrics.Counter `metric:"started_spans" tags:"sampled=y" help:"Number of sampled spans started by this tracer"`
SpansStartedSampled metrics.Counter `metric:"started_spans" tags:"sampled=y" help:"Number of spans started by this tracer as sampled"`

// Number of not sampled spans started by this tracer
SpansStartedNotSampled metrics.Counter `metric:"started_spans" tags:"sampled=n" help:"Number of spans started by this tracer as not sampled"`

// Number of spans with delayed sampling started by this tracer
SpansStartedDelayedSampling metrics.Counter `metric:"started_spans" tags:"sampled=delayed" help:"Number of spans started by this tracer with delayed sampling"`

// Number of unsampled spans started by this tracer
SpansStartedNotSampled metrics.Counter `metric:"started_spans" tags:"sampled=n" help:"Number of unsampled spans started by this tracer"`
// Number of spans finished by this tracer
SpansFinishedSampled metrics.Counter `metric:"finished_spans" tags:"sampled=y" help:"Number of sampled spans finished by this tracer"`

// Number of spans finished by this tracer
SpansFinishedNotSampled metrics.Counter `metric:"finished_spans" tags:"sampled=n" help:"Number of not-sampled spans finished by this tracer"`

// Number of spans finished by this tracer
SpansFinished metrics.Counter `metric:"finished_spans" help:"Number of spans finished by this tracer"`
SpansFinishedDelayedSampling metrics.Counter `metric:"finished_spans" tags:"sampled=delayed" help:"Number of spans with delayed sampling finished by this tracer"`

// Number of errors decoding tracing context
DecodingErrors metrics.Counter `metric:"span_context_decoding_errors" help:"Number of errors decoding tracing context"`
Expand Down
2 changes: 1 addition & 1 deletion propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestSpanPropagator(t *testing.T) {

metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{
{Name: "jaeger.tracer.started_spans", Tags: map[string]string{"sampled": "y"}, Value: 1 + 2*len(tests)},
{Name: "jaeger.tracer.finished_spans", Value: 1 + len(tests)},
{Name: "jaeger.tracer.finished_spans", Tags: map[string]string{"sampled": "y"}, Value: 1 + len(tests)},
{Name: "jaeger.tracer.traces", Tags: map[string]string{"state": "started", "sampled": "y"}, Value: 1},
{Name: "jaeger.tracer.traces", Tags: map[string]string{"state": "joined", "sampled": "y"}, Value: len(tests)},
}...)
Expand Down
2 changes: 2 additions & 0 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
type Reporter interface {
// Report submits a new span to collectors, possibly asynchronously and/or with buffering.
// If the reporter is processing Span asynchronously then it needs to Retain() the span,
// and then Release() it when no longer needed, to avoid span data corruption.
Report(span *Span)

// Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
Expand Down
10 changes: 8 additions & 2 deletions sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,23 @@ type Sampler interface {

// ConstSampler is a sampler that always makes the same decision.
type ConstSampler struct {
legacySamplerV1Base
Decision bool
tags []Tag
}

// NewConstSampler creates a ConstSampler.
func NewConstSampler(sample bool) Sampler {
func NewConstSampler(sample bool) *ConstSampler {
tags := []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeConst},
{key: SamplerParamTagKey, value: sample},
}
return &ConstSampler{Decision: sample, tags: tags}
s := &ConstSampler{
Decision: sample,
tags: tags,
}
s.delegate = s.IsSampled
return s
}

// IsSampled implements IsSampled() of Sampler.
Expand Down
73 changes: 73 additions & 0 deletions sampler_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

type SamplingDecision struct {
sample bool
retryable bool
tags []Tag
}

type SamplerV2 interface {
OnCreateSpan(span *Span) SamplingDecision
OnSetOperationName(span *Span, operationName string) SamplingDecision
OnSetTag(span *Span, key string, value interface{}) SamplingDecision
OnFinishSpan(span *Span) SamplingDecision

// Close does a clean shutdown of the sampler, stopping any background
// go-routines it may have started.
Close()
}

// samplerV1toV2 wraps legacy V1 sampler into an adapter that make it look like V2.
func samplerV1toV2(s Sampler) SamplerV2 {
if s2, ok := s.(SamplerV2); ok {
return s2
}
type legacySamplerV1toV2Adapter struct {
legacySamplerV1Base
}
return &legacySamplerV1toV2Adapter{
legacySamplerV1Base: legacySamplerV1Base{
delegate: s.IsSampled,
},
}
}

// legacySamplerV1Base is used as a base for simple samplers that only implement
// the legacy isSampled() function that is not sensitive to its arguments.
type legacySamplerV1Base struct {
delegate func(id TraceID, operation string) (sampled bool, tags []Tag)
}

func (s *legacySamplerV1Base) OnCreateSpan(span *Span) SamplingDecision {
isSampled, tags := s.delegate(span.context.traceID, span.operationName)
return SamplingDecision{sample: isSampled, retryable: false, tags: tags}
}

func (s *legacySamplerV1Base) OnSetOperationName(span *Span, operationName string) SamplingDecision {
isSampled, tags := s.delegate(span.context.traceID, span.operationName)
return SamplingDecision{sample: isSampled, retryable: false, tags: tags}
}

func (s *legacySamplerV1Base) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *legacySamplerV1Base) OnFinishSpan(span *Span) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *legacySamplerV1Base) Close() {}
17 changes: 17 additions & 0 deletions sampler_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

var _ SamplerV2 = new(ConstSampler)
70 changes: 58 additions & 12 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Span struct {

tracer *Tracer

// TODO: (major) change to use a pointer
context SpanContext

// The name of the "operation" this span is an instance of.
Expand Down Expand Up @@ -74,24 +75,36 @@ type Tag struct {
// SetOperationName sets or changes the operation name.
func (s *Span) SetOperationName(operationName string) opentracing.Span {
s.Lock()
defer s.Unlock()
if s.context.IsSampled() {
s.operationName = operationName
s.operationName = operationName
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetOperationName(s, operationName)
s.applySamplingDecision(decision, false)
}
s.Unlock()
s.observer.OnSetOperationName(operationName)
return s
}

// SetTag implements SetTag() of opentracing.Span
func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
return s.setTagInternal(key, value, true)
}

func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
s.observer.OnSetTag(key, value)
if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
return s
}
s.Lock()
defer s.Unlock()
if s.context.IsSampled() {
s.setTagNoLocking(key, value)
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetTag(s, key, value)
s.applySamplingDecision(decision, lock)
}
if s.isWriteable() {
if lock {
s.Lock()
defer s.Unlock()
}
s.appendTagNoLocking(key, value)
}
return s
}
Expand Down Expand Up @@ -128,7 +141,7 @@ func (s *Span) Tags() opentracing.Tags {
return result
}

func (s *Span) setTagNoLocking(key string, value interface{}) {
func (s *Span) appendTagNoLocking(key string, value interface{}) {
s.tags = append(s.tags, Tag{key: key, value: value})
}

Expand All @@ -148,7 +161,7 @@ func (s *Span) logFieldsNoLocking(fields ...log.Field) {
Fields: fields,
Timestamp: time.Now(),
}
s.appendLog(lr)
s.appendLogNoLocking(lr)
}

// LogKV implements opentracing.Span API
Expand Down Expand Up @@ -185,12 +198,12 @@ func (s *Span) Log(ld opentracing.LogData) {
if ld.Timestamp.IsZero() {
ld.Timestamp = s.tracer.timeNow()
}
s.appendLog(ld.ToLogRecord())
s.appendLogNoLocking(ld.ToLogRecord())
}
}

// this function should only be called while holding a Write lock
func (s *Span) appendLog(lr opentracing.LogRecord) {
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
// TODO add logic to limit number of logs per span (issue #46)
s.logs = append(s.logs, lr)
}
Expand Down Expand Up @@ -224,8 +237,12 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
}
s.observer.OnFinish(options)
s.Lock()
s.duration = options.FinishTime.Sub(s.startTime)
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnFinishSpan(s)
s.applySamplingDecision(decision, false)
}
if s.context.IsSampled() {
s.duration = options.FinishTime.Sub(s.startTime)
// Note: bulk logs are not subject to maxLogsPerSpan limit
if options.LogRecords != nil {
s.logs = append(s.logs, options.LogRecords...)
Expand Down Expand Up @@ -300,6 +317,34 @@ func (s *Span) serviceName() string {
return s.tracer.serviceName
}

func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
if !decision.retryable {
s.context.samplingState.setFinal()
}
if decision.sample {
s.context.samplingState.setSampled()
if len(decision.tags) > 0 {
if lock {
s.Lock()
defer s.Unlock()
}
for _, tag := range decision.tags {
s.appendTagNoLocking(tag.key, tag.value)
}
}
}
}

// Span can be written to if it is sampled or the sampling decision has not been finalized.
func (s *Span) isWriteable() bool {
state := s.context.samplingState
return !state.isFinal() || state.isSampled()
}

func (s *Span) isSamplingFinalized() bool {
return s.context.samplingState.isFinal()
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
// The behavior of setSamplingPriority is surprising
// If noDebugFlagOnForcedSampling is set
Expand All @@ -322,6 +367,7 @@ func setSamplingPriority(s *Span, value interface{}) bool {
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.samplingState.setSampled()
return true
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.samplingState.setDebugAndSampled()
return true
Expand Down
21 changes: 19 additions & 2 deletions context.go → span_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,19 @@ type SpanContext struct {

// samplingState is shared across all spans
samplingState *samplingState

// remote indicates that span context represents a remote parent
remote bool
}

type samplingState struct {
stateFlags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations
// Span context's state flags that are propagated across processes. Only lower 8 bits are used.
// We use an int32 instead of byte to be able to use CAS operations.
stateFlags atomic.Int32

// When state is not final, sampling will be retried on other span write operations,
// like SetOperationName / SetTag, and the spans will remain writable.
final atomic.Bool
}

func (s *samplingState) setFlag(newFlag int32) {
Expand Down Expand Up @@ -110,6 +119,10 @@ func (s *samplingState) setFlags(flags byte) {
s.stateFlags.Store(int32(flags))
}

func (s *samplingState) setFinal() {
s.final.Store(true)
}

func (s *samplingState) flags() byte {
return byte(s.stateFlags.Load())
}
Expand All @@ -126,6 +139,10 @@ func (s *samplingState) isFirehose() bool {
return s.stateFlags.Load()&flagFirehose == flagFirehose
}

func (s *samplingState) isFinal() bool {
return s.final.Load()
}

// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
for k, v := range c.baggage {
Expand Down Expand Up @@ -252,7 +269,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState}
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState, c.remote}
}

// isDebugIDContainerOnly returns true when the instance of the context is only
Expand Down
File renamed without changes.
Loading