Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Upgrade all samplers to v2 #450

Merged
merged 5 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 64 additions & 31 deletions sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (s *ConstSampler) Equal(other Sampler) bool {
// ProbabilisticSampler is a sampler that randomly samples a certain percentage
// of traces.
type ProbabilisticSampler struct {
legacySamplerV1Base
samplingRate float64
samplingBoundary uint64
tags []Tag
Expand All @@ -120,11 +121,13 @@ func newProbabilisticSampler(samplingRate float64) *ProbabilisticSampler {
{key: SamplerTypeTagKey, value: SamplerTypeProbabilistic},
{key: SamplerParamTagKey, value: samplingRate},
}
return &ProbabilisticSampler{
s := &ProbabilisticSampler{
samplingRate: samplingRate,
samplingBoundary: uint64(float64(maxRandomNumber) * samplingRate),
tags: tags,
}
s.delegate = s.IsSampled
return s
}

// SamplingRate returns the sampling probability this sampled was constructed with.
Expand Down Expand Up @@ -152,7 +155,8 @@ func (s *ProbabilisticSampler) Equal(other Sampler) bool {

// -----------------------

type rateLimitingSampler struct {
type RateLimitingSampler struct {
legacySamplerV1Base
maxTracesPerSecond float64
rateLimiter utils.RateLimiter
tags []Tag
Expand All @@ -162,43 +166,45 @@ type rateLimitingSampler struct {
// traces follows burstiness of the service, i.e. a service with uniformly distributed requests will have those
// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a number of
// sequential requests can be sampled each second.
func NewRateLimitingSampler(maxTracesPerSecond float64) Sampler {
func NewRateLimitingSampler(maxTracesPerSecond float64) *RateLimitingSampler {
tags := []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeRateLimiting},
{key: SamplerParamTagKey, value: maxTracesPerSecond},
}
return &rateLimitingSampler{
s := &RateLimitingSampler{
maxTracesPerSecond: maxTracesPerSecond,
rateLimiter: utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0)),
tags: tags,
}
s.delegate = s.IsSampled
return s
}

// IsSampled implements IsSampled() of Sampler.
func (s *rateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
func (s *RateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
return s.rateLimiter.CheckCredit(1.0), s.tags
}

func (s *rateLimitingSampler) Close() {
func (s *RateLimitingSampler) Close() {
// nothing to do
}

func (s *rateLimitingSampler) Equal(other Sampler) bool {
if o, ok := other.(*rateLimitingSampler); ok {
func (s *RateLimitingSampler) Equal(other Sampler) bool {
if o, ok := other.(*RateLimitingSampler); ok {
return s.maxTracesPerSecond == o.maxTracesPerSecond
}
return false
}

// -----------------------

// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both probabilisticSampler and
// rateLimitingSampler. The rateLimitingSampler is used as a guaranteed lower bound sampler such that
// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both ProbabilisticSampler and
// RateLimitingSampler. The RateLimitingSampler is used as a guaranteed lower bound sampler such that
// every operation is sampled at least once in a time interval defined by the lowerBound. ie a lowerBound
// of 1.0 / (60 * 10) will sample an operation at least once every 10 minutes.
//
// The probabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both
// samplers return true, the tags for probabilisticSampler will be used.
// The ProbabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both
// samplers return true, the tags for ProbabilisticSampler will be used.
type GuaranteedThroughputProbabilisticSampler struct {
probabilisticSampler *ProbabilisticSampler
lowerBoundSampler Sampler
Expand All @@ -208,7 +214,7 @@ type GuaranteedThroughputProbabilisticSampler struct {
}

// NewGuaranteedThroughputProbabilisticSampler returns a delegating sampler that applies both
// probabilisticSampler and rateLimitingSampler.
// ProbabilisticSampler and RateLimitingSampler.
func NewGuaranteedThroughputProbabilisticSampler(
lowerBound, samplingRate float64,
) (*GuaranteedThroughputProbabilisticSampler, error) {
Expand Down Expand Up @@ -253,7 +259,7 @@ func (s *GuaranteedThroughputProbabilisticSampler) Close() {

// Equal implements Equal() of Sampler.
func (s *GuaranteedThroughputProbabilisticSampler) Equal(other Sampler) bool {
// NB The Equal() function is expensive and will be removed. See adaptiveSampler.Equal() for
// NB The Equal() function is expensive and will be removed. See AdaptiveSampler.Equal() for
// more information.
return false
}
Expand All @@ -269,7 +275,7 @@ func (s *GuaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRa

// -----------------------

type adaptiveSampler struct {
type AdaptiveSampler struct {
sync.RWMutex

samplers map[string]*GuaranteedThroughputProbabilisticSampler
Expand All @@ -278,14 +284,15 @@ type adaptiveSampler struct {
maxOperations int
}

// NewAdaptiveSampler returns a delegating sampler that applies both probabilisticSampler and
// rateLimitingSampler via the guaranteedThroughputProbabilisticSampler. This sampler keeps track of all
// operations and delegates calls to the respective guaranteedThroughputProbabilisticSampler.
func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (Sampler, error) {
// NewAdaptiveSampler returns a delegating sampler that applies both ProbabilisticSampler and
// RateLimitingSampler via the GuaranteedThroughputProbabilisticSampler. This sampler keeps track of all
// operations and delegates calls to the respective GuaranteedThroughputProbabilisticSampler.
// TODO (breaking change) remove error from return value
func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (*AdaptiveSampler, error) {
return newAdaptiveSampler(strategies, maxOperations), nil
}

func newAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) Sampler {
func newAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) *AdaptiveSampler {
samplers := make(map[string]*GuaranteedThroughputProbabilisticSampler)
for _, strategy := range strategies.PerOperationStrategies {
sampler := newGuaranteedThroughputProbabilisticSampler(
Expand All @@ -294,20 +301,45 @@ func newAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, max
)
samplers[strategy.Operation] = sampler
}
return &adaptiveSampler{
return &AdaptiveSampler{
samplers: samplers,
defaultSampler: newProbabilisticSampler(strategies.DefaultSamplingProbability),
lowerBound: strategies.DefaultLowerBoundTracesPerSecond,
maxOperations: maxOperations,
}
}

func (s *adaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
func (s *AdaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
return false, nil
}

func (s *AdaptiveSampler) OnCreateSpan(span *Span) SamplingDecision {
operationName := span.OperationName()
samplerV1 := s.getSamplerForOperation(operationName)
sampled, tags := samplerV1.IsSampled(span.context.TraceID(), operationName)
return SamplingDecision{sample: sampled, retryable: true, tags: tags}
}

func (s *AdaptiveSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
samplerV1 := s.getSamplerForOperation(operationName)
sampled, tags := samplerV1.IsSampled(span.context.TraceID(), operationName)
return SamplingDecision{sample: sampled, retryable: false, tags: tags}
Comment on lines +324 to +326
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new behavior - do we require this right now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean new to Go client?

I don't think we can avoid it. The main change is not actually here, but in OnCreateSpan, which must return retryable=true in order for tag sampling to work (otherwise the state will be finalized). So if we already have that in OnCreateSpan, adding OnSetOperationName() doesn't change things that much, imo.

}

func (s *AdaptiveSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *AdaptiveSampler) OnFinishSpan(span *Span) SamplingDecision {
return SamplingDecision{sample: false, retryable: true}
}

func (s *AdaptiveSampler) getSamplerForOperation(operation string) Sampler {
s.RLock()
sampler, ok := s.samplers[operation]
if ok {
defer s.RUnlock()
return sampler.IsSampled(id, operation)
return sampler
}
s.RUnlock()
s.Lock()
Expand All @@ -316,18 +348,18 @@ func (s *adaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
// Check if sampler has already been created
sampler, ok = s.samplers[operation]
if ok {
return sampler.IsSampled(id, operation)
return sampler
}
// Store only up to maxOperations of unique ops.
if len(s.samplers) >= s.maxOperations {
return s.defaultSampler.IsSampled(id, operation)
return s.defaultSampler
}
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
s.samplers[operation] = newSampler
return newSampler.IsSampled(id, operation)
return newSampler
}

func (s *adaptiveSampler) Close() {
func (s *AdaptiveSampler) Close() {
s.Lock()
defer s.Unlock()
for _, sampler := range s.samplers {
Expand All @@ -336,16 +368,17 @@ func (s *adaptiveSampler) Close() {
s.defaultSampler.Close()
}

func (s *adaptiveSampler) Equal(other Sampler) bool {
// NB The Equal() function is overly expensive for adaptiveSampler since it's composed of multiple
// TODO (breaking change) remove this in the future
func (s *AdaptiveSampler) Equal(other Sampler) bool {
// NB The Equal() function is overly expensive for AdaptiveSampler since it's composed of multiple
// samplers which all need to be initialized before this function can be called for a comparison.
// Therefore, adaptiveSampler uses the update() function to only alter the samplers that need
// Therefore, AdaptiveSampler uses the update() function to only alter the samplers that need
// changing. Hence this function always returns false so that the update function can be called.
// Once the Equal() function is removed from the Sampler API, this will no longer be needed.
return false
}

func (s *adaptiveSampler) update(strategies *sampling.PerOperationSamplingStrategies) {
func (s *AdaptiveSampler) update(strategies *sampling.PerOperationSamplingStrategies) {
s.Lock()
defer s.Unlock()
newSamplers := map[string]*GuaranteedThroughputProbabilisticSampler{}
Expand Down
Loading