Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[probabilistic sampling processor] encoded sampling probability (support OTEP 235) #31894

Merged
merged 99 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
e822a9b
Add t-value sampler draft
jmacd May 12, 2023
1bc6017
copy/import tracestate parser package
jmacd May 15, 2023
d1fd891
test ot tracestate
jmacd May 16, 2023
85e4472
tidy
jmacd May 16, 2023
bb75f8a
renames
jmacd May 16, 2023
6a57b77
testing two parsers w/ generic code
jmacd May 17, 2023
7fa8130
integrated
jmacd May 17, 2023
36230e7
Comments
jmacd May 17, 2023
7bae35c
revert two files
jmacd May 17, 2023
9010a67
Update with r, s, and t-value. Now using regexps and strings.IndexBy…
jmacd Jun 1, 2023
0e27e40
fix sampler build
jmacd Jun 1, 2023
efcdc3d
add support for s-value for non-consistent mode
jmacd Jun 1, 2023
939c758
WIP
jmacd Jul 10, 2023
b9a1e56
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Aug 2, 2023
a31266c
use new proposed syntax see https://github.com/open-telemetry/opentel…
jmacd Aug 2, 2023
690cd64
update tracestate libs for new encoding
jmacd Aug 2, 2023
c8baf29
wip working on probabilistic sampler with two new modes: downsampler …
jmacd Aug 2, 2023
7f47e4a
unsigned implement split
jmacd Aug 3, 2023
422e0b2
two implementations
jmacd Aug 3, 2023
787b9fd
wip
jmacd Sep 5, 2023
ed36f03
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Sep 6, 2023
d795210
Updates for OTEP 235
jmacd Sep 6, 2023
09000f7
wip TODO
jmacd Sep 6, 2023
a4d467b
versions.yaml
jmacd Sep 6, 2023
e373b9b
Add proportional sampler mode; comment on TODOs; create SamplerMode t…
jmacd Sep 7, 2023
fe6a085
back from internal
jmacd Oct 4, 2023
396efb1
wip
jmacd Oct 4, 2023
36de5dd
fix existing tests
jmacd Oct 6, 2023
f1aa0ad
:wip:
jmacd Oct 12, 2023
700734e
Update for rejection threshold
jmacd Nov 15, 2023
ae50bdd
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Nov 15, 2023
a94b8e7
fix preexisting tests
jmacd Nov 16, 2023
4edcbcb
basic yes/no t-value sampling test
jmacd Nov 16, 2023
53bf119
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Nov 29, 2023
3cdb957
add version for sampling pkg
jmacd Nov 29, 2023
e506847
more testing
jmacd Dec 7, 2023
2cddfeb
add probability to threshold with precision option
jmacd Dec 8, 2023
f69d6ee
ProbabilityToThresholdWithPrecision
jmacd Dec 8, 2023
cc02934
test coverage for equalizing and proportional
jmacd Dec 8, 2023
1eecc4a
config test
jmacd Dec 8, 2023
2159107
comments and notes
jmacd Dec 8, 2023
e0898a6
update README
jmacd Dec 8, 2023
d0991ed
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jan 10, 2024
a002774
Remove sampling pkg, it is now upstream
jmacd Feb 14, 2024
3a49922
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Feb 28, 2024
fca0184
build w/ new sampling pkg
jmacd Feb 28, 2024
f11e0a4
more test coverage
jmacd Feb 29, 2024
3f495a6
more config tests
jmacd Feb 29, 2024
581095c
test precision underflow
jmacd Mar 1, 2024
7b8fd31
test warning logs
jmacd Mar 1, 2024
1a6be4f
hash_seed fixes
jmacd Mar 1, 2024
712cf17
wip
jmacd Mar 4, 2024
34c0d3b
aip
jmacd Mar 5, 2024
7742668
wip-refactoring
jmacd Mar 13, 2024
8d60168
refactor wip
jmacd Mar 14, 2024
3779caa
cleanup refactor
jmacd Mar 14, 2024
c261ac1
wip
jmacd Mar 14, 2024
34469e4
moving code
jmacd Mar 15, 2024
8dabf47
fix tests; round up small probs to avoid errors
jmacd Mar 15, 2024
d44afb5
preserve legacy behavior
jmacd Mar 15, 2024
1cf9991
logs handled sampling priority differently
jmacd Mar 15, 2024
365d35d
still two errors
jmacd Mar 18, 2024
12a3047
builds
jmacd Mar 19, 2024
8655f42
needs testing
jmacd Mar 19, 2024
468e6c6
fixing tests
jmacd Mar 21, 2024
23b4423
cleanup
jmacd Mar 21, 2024
07841e5
remove strict feature
jmacd Mar 21, 2024
6936bc4
tests fixed
jmacd Mar 21, 2024
c132f4c
wip
jmacd Mar 22, 2024
bd13ac9
typo
jmacd Mar 22, 2024
aa33b1c
more logs tests
jmacd Mar 22, 2024
06556dc
Add more comments
jmacd Mar 22, 2024
a4940e6
update README
jmacd Mar 22, 2024
4f616e9
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 22, 2024
b4ca3aa
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 25, 2024
fdd26ac
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 25, 2024
794d1a1
wip update
jmacd May 30, 2024
a305a7f
undo comment changes
jmacd May 30, 2024
98433af
test all modes logs missing randomness
jmacd May 30, 2024
3aa4608
more missing rando
jmacd May 30, 2024
a0bc49e
smaller diff
jmacd May 30, 2024
d0aea21
comment carrier
jmacd May 30, 2024
7b81625
chlog
jmacd May 30, 2024
fe4dd37
simplify ctcom
jmacd May 30, 2024
a244866
lint
jmacd May 30, 2024
89331bc
combine README updates
jmacd May 30, 2024
04d65c4
tidy
jmacd May 30, 2024
9cb1586
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd May 30, 2024
a98db61
Add sampler mode use-cases
jmacd Jun 10, 2024
d33660b
rephrase tracestate; logs do not use tracestate
jmacd Jun 10, 2024
c67350d
explain sampling precision
jmacd Jun 10, 2024
b0a9516
move misplaced text
jmacd Jun 10, 2024
95ecbae
remove multierr
jmacd Jun 10, 2024
cbcc853
Apply suggestions from code review
jmacd Jun 11, 2024
ad32651
only debug and info
jmacd Jun 11, 2024
6b71ea8
adjust test for debug-level logs
jmacd Jun 11, 2024
61abf1f
revert change of default mode
jmacd Jun 11, 2024
0664ea1
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Jun 12, 2024
1926afb
Merge branch 'main' into jmacd/tvaluesampler
jmacd Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
build w/ new sampling pkg
  • Loading branch information
jmacd committed Feb 28, 2024
commit fca01849c0bff5ed43924c42ab9032dc29f50927
2 changes: 1 addition & 1 deletion pkg/sampling/probability.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func ProbabilityToThresholdWithPrecision(prob float64, prec uint8) (Threshold, e

// Check if leading zeros plus precision is above the maximum.
// This is called underflow because the requested precision
// leads to complete no significant figures.
// leads to no significant figures.
if prec > NumHexDigits {
return AlwaysSampleThreshold, ErrPrecisionUnderflow
}
Expand Down
8 changes: 7 additions & 1 deletion processor/probabilisticsamplerprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ type Config struct {
// probability of each span by `SamplingProbability`.
SamplerMode SamplerMode `mapstructure:"mode"`

// StrictRandomness indicates whether the sampler checks the
// W3C Trace Context Level 2 Random flag before applying
// consistent probability sampling. It is unlikely this will
// be useful until support for the flag is widely deployed.
StrictRandomness bool `mapstructure:"strict_randomness"`

///////
// Logs only fields below.

Expand All @@ -82,7 +88,7 @@ type Config struct {
SamplingPriority string `mapstructure:"sampling_priority"`

// How many hex digits of th: value to use, max, from 1 up to
// 14. Default is 3.
// 14. Default is 5.
SamplingPrecision uint8 `mapstructure:"sampling_precision"`
}

Expand Down
2 changes: 1 addition & 1 deletion processor/probabilisticsamplerprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "logs"),
expected: &Config{
SamplingPercentage: 15.3,
SamplingPrecision: 3,
SamplingPrecision: defaultPrecision,
HashSeed: 22,
SamplerMode: "hash_seed",
AttributeSource: "record",
Expand Down
2 changes: 1 addition & 1 deletion processor/probabilisticsamplerprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

var onceMetrics sync.Once

const defaultPrecision = 3
const defaultPrecision = 5

// NewFactory returns a new factory for the Probabilistic sampler processor.
func NewFactory() processor.Factory {
Expand Down
193 changes: 114 additions & 79 deletions processor/probabilisticsamplerprocessor/tracesprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,56 @@ const (
numHashBuckets = 0x4000 // Using a power of 2 to avoid division.
bitMaskHashBuckets = numHashBuckets - 1
percentageScaleFactor = numHashBuckets / 100.0

// randomFlagValue is defined in W3C Trace Context Level 2.
randomFlagValue = 0x2
)

var ErrInconsistentArrivingTValue = fmt.Errorf("inconsistent arriving t-value: span should not have been sampled")
var (
ErrInconsistentArrivingTValue = fmt.Errorf("inconsistent arriving t-value: span should not have been sampled")
ErrMissingRandomness = fmt.Errorf("missing randomness; trace flag not set")
)

type traceSampler interface {
// decide reports the result based on a probabilistic decision.
decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error)

// updateTracestate modifies the OTelTraceState assuming it will be
// sampled, probabilistically or otherwise. The "should" parameter
// is the result from decide(), for the span's TraceID, which
// will not be recalculated.
updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) error
// is the result from decide().
updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState)
}

type traceProcessor struct {
sampler traceSampler
logger *zap.Logger
}

// inconsistentCommon implements updateTracestate() for samplers that
// do not use OTel consistent sampling.
type inconsistentCommon struct {
}

// traceHasher is the original hash-based implementation.
type traceHasher struct {
// Hash-based calculation
hashScaledSamplerate uint32
hashSeed uint32
probability float64

inconsistentCommon
}

// zeroProbability is a bypass for all cases with Percent==0.
type zeroProbability struct {
inconsistentCommon
}

// consistentCommon implements updateTracestate() for samplers that
// use OTel consistent sampling.
type consistentCommon struct {
// strict randomness checking
strict bool
}

// traceEqualizer adjusts thresholds absolutely. Cannot be used with zero.
Expand All @@ -78,37 +103,64 @@ type traceEqualizer struct {

// tValueEncoding includes the leading "t:"
tValueEncoding string

consistentCommon
}

// traceProportionalizer adjusts thresholds relatively. Cannot be used with zero.
type traceProportionalizer struct {
// ratio in the range [2**-56, 1]
ratio float64
prec uint8

// precision in number of hex digits
prec uint8

consistentCommon
}

// zeroProbability is a bypass for all cases with Percent==0.
type zeroProbability struct {
func (*consistentCommon) updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) {
// When this sampler decided not to sample, the t-value becomes zero.
if !should {
wts.OTelValue().ClearTValue()
}
}

func randomnessFromSpan(s ptrace.Span) (sampling.Randomness, *sampling.W3CTraceState, error) {
func (cc *consistentCommon) randomnessFromSpan(s ptrace.Span) (sampling.Randomness, *sampling.W3CTraceState, error) {
state := s.TraceState()
raw := state.AsRaw()

// Parse the arriving TraceState.
wts, err := sampling.NewW3CTraceState(raw)
var randomness sampling.Randomness
if err == nil && wts.OTelValue().HasRValue() {
// When the tracestate is OK and has r-value, use it.
randomness = wts.OTelValue().RValueRandomness()
} else {
// See https://github.com/open-telemetry/opentelemetry-proto/pull/503
// which merged but unreleased at the time of writing.
//
// Note: When we have an additional flag indicating this
// randomness is present we should inspect the flag
// and return that no randomness is available, here.
randomness = sampling.TraceIDToRandomness(s.TraceID())
if err == nil {
if rv, has := wts.OTelValue().RValueRandomness(); has {
// When the tracestate is OK and has r-value, use it.
randomness = rv
} else if cc.strict && (s.Flags()&randomFlagValue) != randomFlagValue {
// If strict and the flag is missing
err = ErrMissingRandomness
} else {
// Either !strict, or the random flag is correctly set.
randomness = sampling.TraceIDToRandomness(s.TraceID())
}
}

// Consistency check: if the TraceID is out of range, the
// TValue is a lie. If inconsistent, return an error in strict
// mode; otherwise clear the TValue.
if err == nil {
otts := wts.OTelValue()
if tv, has := otts.TValueThreshold(); has {
if !tv.ShouldSample(randomness) {
if cc.strict {
err = ErrInconsistentArrivingTValue
} else {
// TODO: warning?
otts.ClearTValue()
}
}
}
}

return randomness, &wts, err
}

Expand Down Expand Up @@ -158,11 +210,17 @@ func newTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg *
tp.sampler = &traceEqualizer{
tValueEncoding: threshold.TValue(),
traceIDThreshold: threshold,
consistentCommon: consistentCommon{
strict: cfg.StrictRandomness,
},
}
case Proportional:
tp.sampler = &traceProportionalizer{
ratio: ratio,
prec: cfg.SamplingPrecision,
consistentCommon: consistentCommon{
strict: cfg.StrictRandomness,
},
}
}
}
Expand All @@ -185,90 +243,70 @@ func (ts *traceHasher) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, err
return decision, nil, nil
}

func (ts *traceHasher) updateTracestate(_ pcommon.TraceID, _ bool, _ *sampling.W3CTraceState) error {
// No changes; any t-value will pass through.
return nil
func (*inconsistentCommon) updateTracestate(_ pcommon.TraceID, _ bool, _ *sampling.W3CTraceState) {
}

func (ts *traceEqualizer) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) {
rnd, wts, err := randomnessFromSpan(s)
rnd, wts, err := ts.randomnessFromSpan(s)
if err != nil {
return false, nil, err
}
otts := wts.OTelValue()
// Consistency check: if the TraceID is out of range, the
// TValue is a lie. If inconsistent, clear it.
if otts.HasTValue() {
if !otts.TValueThreshold().ShouldSample(rnd) {
err = ErrInconsistentArrivingTValue
otts.ClearTValue()
}
} else if !otts.HasTValue() {
// Note: We could in this case attach another
// tracestate to signify that the incoming sampling
// threshold was at one point unknown.
}

return ts.traceIDThreshold.ShouldSample(rnd), wts, err
}

func (ts *traceEqualizer) updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) error {
// When this sampler decided not to sample, the t-value becomes zero.
// Incoming TValue consistency is not checked when this happens.
if !should {
wts.OTelValue().ClearTValue()
return nil
should := ts.traceIDThreshold.ShouldSample(rnd)
if should {
// This error is unchecked by the rules of consistent probability sampling.
// If it was sampled correctly before, and it is still sampled after this
// decision, then the rejection threshold must be rising.
_ = wts.OTelValue().UpdateTValueWithSampling(ts.traceIDThreshold, ts.tValueEncoding)
}
// Spans that appear consistently sampled but arrive w/ zero
// adjusted count remain zero.
return wts.OTelValue().UpdateTValueWithSampling(ts.traceIDThreshold, ts.tValueEncoding)

return should, wts, err
}

func (ts *traceProportionalizer) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) {
rnd, wts, err := randomnessFromSpan(s)
rnd, wts, err := ts.randomnessFromSpan(s)
if err != nil {
return false, nil, err
}

incoming := 1.0
otts := wts.OTelValue()
// Consistency check: if the TraceID is out of range, the
// TValue is a lie. If inconsistent, clear it.
if otts.HasTValue() && !otts.TValueThreshold().ShouldSample(rnd) {
err = ErrInconsistentArrivingTValue
otts.ClearTValue()
if tv, has := otts.TValueThreshold(); has {
incoming = tv.Probability()
}

incoming := 1.0
if otts.HasTValue() {
incoming = otts.TValueThreshold().Probability()
} else {
// Note: We could in this case attach another
// tracestate to signify that the incoming sampling
// threshold was at one point unknown.
// There is a potential here for the product probability to
// underflow, which is checked here.
threshold, err := sampling.ProbabilityToThresholdWithPrecision(incoming*ts.ratio, ts.prec)

if err == sampling.ErrProbabilityRange {
// Considered valid, a case where the sampling probability
// has fallen below the minimum supported value and simply
// becomes unsampled.
return false, wts, nil
} else if err == sampling.ErrPrecisionUnderflow {
// Considered valid, any case where precision underflow
// occurs, use full-precision encoding.
threshold, err = sampling.ProbabilityToThreshold(incoming * ts.ratio)
}
if err != nil {
return false, wts, err
}

threshold, _ := sampling.ProbabilityToThresholdWithPrecision(incoming*ts.ratio, ts.prec)
should := threshold.ShouldSample(rnd)
if should {
// Note: an unchecked error here, because the threshold is
// larger by construction via `incoming*ts.ratio`, which was
// already range-checked above.
_ = otts.UpdateTValueWithSampling(threshold, threshold.TValue())
}
return should, wts, err
}

func (ts *traceProportionalizer) updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) error {
if !should {
wts.OTelValue().ClearTValue()
}
return nil
}

func (*zeroProbability) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) {
return false, nil, nil
}

func (*zeroProbability) updateTracestate(_ pcommon.TraceID, _ bool, _ *sampling.W3CTraceState) error {
return nil
}

func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool {
Expand All @@ -286,13 +324,13 @@ func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (
return true
}

probSample, wts, err := tp.sampler.decide(s)
probShould, wts, err := tp.sampler.decide(s)
if err != nil {
tp.logger.Error("trace-state", zap.Error(err))
}

forceSample := priority == mustSampleSpan
sampled := forceSample || probSample
sampled := forceSample || probShould

if forceSample {
_ = stats.RecordWithTags(
Expand All @@ -309,10 +347,7 @@ func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (
}

if sampled && wts != nil {
err := tp.sampler.updateTracestate(s.TraceID(), probSample, wts)
if err != nil {
tp.logger.Debug("tracestate update", zap.Error(err))
}
tp.sampler.updateTracestate(s.TraceID(), probShould, wts)

var w strings.Builder
if err := wts.Serialize(&w); err != nil {
Expand Down