Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Support delayed sampling decisions #447

Merged
merged 7 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,23 @@ type Sampler interface {

// ConstSampler is a sampler that always makes the same decision.
type ConstSampler struct {
legacySamplerV1Base
Decision bool
tags []Tag
}

// NewConstSampler creates a ConstSampler.
func NewConstSampler(sample bool) Sampler {
func NewConstSampler(sample bool) *ConstSampler {
tags := []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeConst},
{key: SamplerParamTagKey, value: sample},
}
return &ConstSampler{Decision: sample, tags: tags}
s := &ConstSampler{
Decision: sample,
tags: tags,
}
s.delegate = s.IsSampled
return s
}

// IsSampled implements IsSampled() of Sampler.
Expand Down
68 changes: 68 additions & 0 deletions sampler_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

type SamplingDecision struct {
sample bool
retryable bool
tags []Tag
}

type SamplerV2 interface {
OnCreateSpan(span *Span) SamplingDecision
OnSetOperationName(span *Span, operationName string) SamplingDecision
OnSetTag(span *Span, key string, value interface{}) SamplingDecision

// Close does a clean shutdown of the sampler, stopping any background
// go-routines it may have started.
Close()
}

// samplerV1toV2 wraps legacy V1 sampler into an adapter that make it look like V2.
func samplerV1toV2(s Sampler) SamplerV2 {
if s2, ok := s.(SamplerV2); ok {
return s2
}
type legacySamplerV1toV2Adapter struct {
legacySamplerV1Base
}
return &legacySamplerV1toV2Adapter{
legacySamplerV1Base: legacySamplerV1Base{
delegate: s.IsSampled,
},
}
}

// legacySamplerV1Base is used as a base for simple samplers that only implement
// the legacy isSampled() function that is not sensitive to its arguments.
type legacySamplerV1Base struct {
delegate func(id TraceID, operation string) (sampled bool, tags []Tag)
}

func (s *legacySamplerV1Base) OnCreateSpan(span *Span) SamplingDecision {
isSampled, tags := s.delegate(span.context.traceID, span.operationName)
return SamplingDecision{sample: isSampled, retryable: false, tags: tags}
}

func (s *legacySamplerV1Base) OnSetOperationName(span *Span, operationName string) SamplingDecision {
isSampled, tags := s.delegate(span.context.traceID, span.operationName)
return SamplingDecision{sample: isSampled, retryable: false, tags: tags}
}

func (s *legacySamplerV1Base) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *legacySamplerV1Base) Close() {}
17 changes: 17 additions & 0 deletions sampler_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

var _ SamplerV2 = new(ConstSampler)
64 changes: 53 additions & 11 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Span struct {

tracer *Tracer

// TODO: (major) change to use a pointer
context SpanContext

// The name of the "operation" this span is an instance of.
Expand Down Expand Up @@ -74,24 +75,36 @@ type Tag struct {
// SetOperationName sets or changes the operation name.
func (s *Span) SetOperationName(operationName string) opentracing.Span {
s.Lock()
defer s.Unlock()
if s.context.IsSampled() {
s.operationName = operationName
s.operationName = operationName
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetOperationName(s, operationName)
s.applySamplingDecision(decision, false)
}
s.Unlock()
s.observer.OnSetOperationName(operationName)
return s
}

// SetTag implements SetTag() of opentracing.Span
func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
return s.setTagInternal(key, value, true)
}

func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
s.observer.OnSetTag(key, value)
if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
return s
}
s.Lock()
defer s.Unlock()
if s.context.IsSampled() {
s.setTagNoLocking(key, value)
if !s.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetTag(s, key, value)
s.applySamplingDecision(decision, lock)
}
if s.isWriteable() {
if lock {
s.Lock()
defer s.Unlock()
}
s.appendTagNoLocking(key, value)
}
return s
}
Expand Down Expand Up @@ -128,7 +141,7 @@ func (s *Span) Tags() opentracing.Tags {
return result
}

func (s *Span) setTagNoLocking(key string, value interface{}) {
func (s *Span) appendTagNoLocking(key string, value interface{}) {
s.tags = append(s.tags, Tag{key: key, value: value})
}

Expand All @@ -148,7 +161,7 @@ func (s *Span) logFieldsNoLocking(fields ...log.Field) {
Fields: fields,
Timestamp: time.Now(),
}
s.appendLog(lr)
s.appendLogNoLocking(lr)
}

// LogKV implements opentracing.Span API
Expand Down Expand Up @@ -185,12 +198,12 @@ func (s *Span) Log(ld opentracing.LogData) {
if ld.Timestamp.IsZero() {
ld.Timestamp = s.tracer.timeNow()
}
s.appendLog(ld.ToLogRecord())
s.appendLogNoLocking(ld.ToLogRecord())
}
}

// this function should only be called while holding a Write lock
func (s *Span) appendLog(lr opentracing.LogRecord) {
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
// TODO add logic to limit number of logs per span (issue #46)
s.logs = append(s.logs, lr)
}
Expand Down Expand Up @@ -300,6 +313,34 @@ func (s *Span) serviceName() string {
return s.tracer.serviceName
}

func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
if !decision.retryable {
s.context.samplingState.setFinal()
}
if decision.sample {
s.context.samplingState.setSampled()
if len(decision.tags) > 0 {
if lock {
s.Lock()
defer s.Unlock()
}
for _, tag := range decision.tags {
s.appendTagNoLocking(tag.key, tag.value)
}
}
}
}

// Span can be written to if it is sampled or the sampling decision has not been finalized.
func (s *Span) isWriteable() bool {
state := s.context.samplingState
return !state.isFinal() || state.isSampled()
}

func (s *Span) isSamplingFinalized() bool {
return s.context.samplingState.isFinal()
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
// The behavior of setSamplingPriority is surprising
// If noDebugFlagOnForcedSampling is set
Expand All @@ -322,6 +363,7 @@ func setSamplingPriority(s *Span, value interface{}) bool {
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.samplingState.setSampled()
return true
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.samplingState.setDebugAndSampled()
return true
Expand Down
21 changes: 19 additions & 2 deletions context.go → span_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,19 @@ type SpanContext struct {

// samplingState is shared across all spans
samplingState *samplingState

// remote indicates that span context represents a remote parent
remote bool
}

type samplingState struct {
stateFlags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations
// Span context's state flags that are propagated across processes. Only lower 8 bits are used.
// We use an int32 instead of byte to be able to use CAS operations.
stateFlags atomic.Int32

// When state is not final, sampling will be retried on other span write operations,
// like SetOperationName / SetTag, and the spans will remain writable.
final atomic.Bool
}

func (s *samplingState) setFlag(newFlag int32) {
Expand Down Expand Up @@ -110,6 +119,10 @@ func (s *samplingState) setFlags(flags byte) {
s.stateFlags.Store(int32(flags))
}

func (s *samplingState) setFinal() {
s.final.Store(true)
}

func (s *samplingState) flags() byte {
return byte(s.stateFlags.Load())
}
Expand All @@ -126,6 +139,10 @@ func (s *samplingState) isFirehose() bool {
return s.stateFlags.Load()&flagFirehose == flagFirehose
}

func (s *samplingState) isFinal() bool {
return s.final.Load()
}

// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
for k, v := range c.baggage {
Expand Down Expand Up @@ -252,7 +269,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState}
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState, c.remote}
}

// isDebugIDContainerOnly returns true when the instance of the context is only
Expand Down
File renamed without changes.
Loading