Skip to content

Commit

Permalink
Improving inter-operation with legacy samplers (#1629)
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterF778 authored Jan 6, 2025
1 parent 7a01c8c commit ac36bb6
Show file tree
Hide file tree
Showing 17 changed files with 263 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.List;

/** An interface for components to be used by composite consistent probability samplers. */
public interface ComposableSampler {
public interface Composable {

/**
* Returns the SamplingIntent that is used for the sampling decision. The SamplingIntent includes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ public SamplingIntent getSamplingIntent(
Attributes attributes,
List<LinkData> parentLinks) {

return () -> getInvalidThreshold();
return new SamplingIntent() {
@Override
public long getThreshold() {
return getInvalidThreshold();
}

@Override
public boolean isAdjustedCountReliable() {
return false;
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@Immutable
final class ConsistentAnyOf extends ConsistentSampler {

private final ComposableSampler[] delegates;
private final Composable[] delegates;

private final String description;

Expand All @@ -36,7 +36,7 @@ final class ConsistentAnyOf extends ConsistentSampler {
*
* @param delegates the delegate samplers
*/
ConsistentAnyOf(@Nullable ComposableSampler... delegates) {
ConsistentAnyOf(@Nullable Composable... delegates) {
if (delegates == null || delegates.length == 0) {
throw new IllegalArgumentException(
"At least one delegate must be specified for ConsistentAnyOf");
Expand All @@ -59,30 +59,53 @@ public SamplingIntent getSamplingIntent(
List<LinkData> parentLinks) {

SamplingIntent[] intents = new SamplingIntent[delegates.length];
int k = 0;

// If any of the delegates provides a valid threshold, the resulting threshold is the minimum
// value T from the set of those valid threshold values, otherwise it is invalid threshold.
long minimumThreshold = getInvalidThreshold();
for (ComposableSampler delegate : delegates) {

// If any of the delegates returning the threshold value equal to T returns true upon calling
// its IsAdjustedCountReliable() method, the resulting isAdjustedCountReliable is true,
// otherwise it is false.
boolean isAdjustedCountCorrect = false;

int k = 0;
for (Composable delegate : delegates) {
SamplingIntent delegateIntent =
delegate.getSamplingIntent(parentContext, name, spanKind, attributes, parentLinks);
long delegateThreshold = delegateIntent.getThreshold();
if (isValidThreshold(delegateThreshold)) {
if (isValidThreshold(minimumThreshold)) {
minimumThreshold = Math.min(delegateThreshold, minimumThreshold);
if (delegateThreshold == minimumThreshold) {
if (delegateIntent.isAdjustedCountReliable()) {
isAdjustedCountCorrect = true;
}
} else if (delegateThreshold < minimumThreshold) {
minimumThreshold = delegateThreshold;
isAdjustedCountCorrect = delegateIntent.isAdjustedCountReliable();
}
} else {
minimumThreshold = delegateThreshold;
isAdjustedCountCorrect = delegateIntent.isAdjustedCountReliable();
}
}
intents[k++] = delegateIntent;
}

long resultingThreshold = minimumThreshold;
boolean isResultingAdjustedCountCorrect = isAdjustedCountCorrect;

return new SamplingIntent() {
@Override
public long getThreshold() {
return resultingThreshold;
}

@Override
public boolean isAdjustedCountReliable() {
return isResultingAdjustedCountCorrect;
}

@Override
public Attributes getAttributes() {
AttributesBuilder builder = Attributes.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.contrib.sampler.consistent56;

import static io.opentelemetry.contrib.sampler.consistent56.ConsistentSamplingUtil.getInvalidThreshold;
import static io.opentelemetry.contrib.sampler.consistent56.ConsistentSamplingUtil.getMinThreshold;
import static java.util.Objects.requireNonNull;

import io.opentelemetry.api.common.Attributes;
Expand All @@ -23,9 +24,9 @@
* sampling decision is delegated to the root sampler.
*/
@Immutable
final class ConsistentParentBasedSampler extends ConsistentSampler {
public class ConsistentParentBasedSampler extends ConsistentSampler {

private final ComposableSampler rootSampler;
private final Composable rootSampler;

private final String description;

Expand All @@ -35,14 +36,14 @@ final class ConsistentParentBasedSampler extends ConsistentSampler {
*
* @param rootSampler the root sampler
*/
ConsistentParentBasedSampler(ComposableSampler rootSampler) {
protected ConsistentParentBasedSampler(Composable rootSampler) {
this.rootSampler = requireNonNull(rootSampler);
this.description =
"ConsistentParentBasedSampler{rootSampler=" + rootSampler.getDescription() + '}';
}

@Override
public SamplingIntent getSamplingIntent(
public final SamplingIntent getSamplingIntent(
Context parentContext,
String name,
SpanKind spanKind,
Expand All @@ -62,13 +63,51 @@ public SamplingIntent getSamplingIntent(
OtelTraceState otelTraceState = OtelTraceState.parse(otelTraceStateString);

long parentThreshold;
boolean isParentAdjustedCountCorrect;
if (otelTraceState.hasValidThreshold()) {
parentThreshold = otelTraceState.getThreshold();
isParentAdjustedCountCorrect = true;
} else {
parentThreshold = getInvalidThreshold();
// If no threshold, look at the sampled flag
parentThreshold = parentSpanContext.isSampled() ? getMinThreshold() : getInvalidThreshold();
isParentAdjustedCountCorrect = false;
}

return () -> parentThreshold;
return new SamplingIntent() {
@Override
public long getThreshold() {
return parentThreshold;
}

@Override
public boolean isAdjustedCountReliable() {
return isParentAdjustedCountCorrect;
}

@Override
public Attributes getAttributes() {
if (parentSpanContext.isRemote()) {
return getAttributesWhenParentRemote(name, spanKind, attributes, parentLinks);
} else {
return getAttributesWhenParentLocal(name, spanKind, attributes, parentLinks);
}
}

@Override
public TraceState updateTraceState(TraceState parentState) {
return parentState;
}
};
}

protected Attributes getAttributesWhenParentLocal(
String name, SpanKind spanKind, Attributes attributes, List<LinkData> parentLinks) {
return Attributes.empty();
}

protected Attributes getAttributesWhenParentRemote(
String name, SpanKind spanKind, Attributes attributes, List<LinkData> parentLinks) {
return Attributes.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public State(
private final double targetSpansPerNanosecondLimit;
private final double probabilitySmoothingFactor;
private final AtomicReference<State> state;
private final ComposableSampler delegate;
private final Composable delegate;

/**
* Constructor.
Expand All @@ -133,7 +133,7 @@ public State(
* @param nanoTimeSupplier a supplier for the current nano time
*/
ConsistentRateLimitingSampler(
ComposableSampler delegate,
Composable delegate,
double targetSpansPerSecondLimit,
double adaptationTimeSeconds,
LongSupplier nanoTimeSupplier) {
Expand Down Expand Up @@ -255,6 +255,11 @@ public long getThreshold() {
return suggestedThreshold;
}

@Override
public boolean isAdjustedCountReliable() {
return delegateIntent.isAdjustedCountReliable();
}

@Override
public Attributes getAttributes() {
return delegateIntent.getAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

/**
* A consistent sampler that uses Span categorization and uses a different delegate sampler for each
* category. Categorization of Spans is aided by Predicates, which can be combined with
* ComposableSamplers into PredicatedSamplers.
* category. Categorization of Spans is aided by Predicates, which can be combined with Composables
* into PredicatedSamplers.
*/
@Immutable
final class ConsistentRuleBasedSampler extends ConsistentSampler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** Abstract base class for consistent samplers. */
@SuppressWarnings("InconsistentOverloads")
public abstract class ConsistentSampler implements Sampler, ComposableSampler {
public abstract class ConsistentSampler implements Sampler, Composable {

/**
* Returns a {@link ConsistentSampler} that samples all spans.
Expand Down Expand Up @@ -61,7 +61,7 @@ public static ConsistentSampler probabilityBased(double samplingProbability) {
*
* @param rootSampler the root sampler
*/
public static ConsistentSampler parentBased(ComposableSampler rootSampler) {
public static ConsistentSampler parentBased(Composable rootSampler) {
return new ConsistentParentBasedSampler(rootSampler);
}

Expand Down Expand Up @@ -103,7 +103,7 @@ static ConsistentSampler rateLimited(
* exponential smoothing)
*/
public static ConsistentSampler rateLimited(
ComposableSampler delegate, double targetSpansPerSecondLimit, double adaptationTimeSeconds) {
Composable delegate, double targetSpansPerSecondLimit, double adaptationTimeSeconds) {
return rateLimited(
delegate, targetSpansPerSecondLimit, adaptationTimeSeconds, System::nanoTime);
}
Expand Down Expand Up @@ -138,7 +138,7 @@ static ConsistentSampler rateLimited(
* @param nanoTimeSupplier a supplier for the current nano time
*/
static ConsistentSampler rateLimited(
ComposableSampler delegate,
Composable delegate,
double targetSpansPerSecondLimit,
double adaptationTimeSeconds,
LongSupplier nanoTimeSupplier) {
Expand All @@ -159,7 +159,7 @@ static ConsistentSampler rateLimited(
* @param delegates the delegate samplers, at least one delegate must be specified
* @return the ConsistentAnyOf sampler
*/
public static ConsistentSampler anyOf(ComposableSampler... delegates) {
public static ConsistentSampler anyOf(Composable... delegates) {
return new ConsistentAnyOf(delegates);
}

Expand All @@ -184,19 +184,23 @@ public final SamplingResult shouldSample(

// determine sampling decision
boolean isSampled;
boolean isAdjustedCountCorrect;
if (isValidThreshold(threshold)) {
long randomness = getRandomness(otelTraceState, traceId);
isSampled = threshold <= randomness;
isAdjustedCountCorrect = intent.isAdjustedCountReliable();
} else { // DROP
isSampled = false;
isAdjustedCountCorrect = false;
}

SamplingDecision samplingDecision;
if (isSampled) {
samplingDecision = SamplingDecision.RECORD_AND_SAMPLE;
SamplingDecision samplingDecision =
isSampled ? SamplingDecision.RECORD_AND_SAMPLE : SamplingDecision.DROP;

// determine tracestate changes
if (isSampled && isAdjustedCountCorrect) {
otelTraceState.setThreshold(threshold);
} else {
samplingDecision = SamplingDecision.DROP;
otelTraceState.invalidateThreshold();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ static Predicate isRootSpan() {
};
}

/*
* Return a Predicate that will only match Spans with local parent
*/
static Predicate hasLocalParent() {
return (parentContext, name, spanKind, attributes, parentLinks) -> {
Span parentSpan = Span.fromContext(parentContext);
SpanContext parentSpanContext = parentSpan.getSpanContext();
return !parentSpanContext.isValid() || !parentSpanContext.isRemote();
};
}

/*
* Return a Predicate that matches all Spans
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

import static java.util.Objects.requireNonNull;

/** A class for holding a pair (Predicate, ComposableSampler) */
/** A class for holding a pair (Predicate, Composable) */
public final class PredicatedSampler {

public static PredicatedSampler onMatch(Predicate predicate, ComposableSampler sampler) {
public static PredicatedSampler onMatch(Predicate predicate, Composable sampler) {
return new PredicatedSampler(predicate, sampler);
}

private final Predicate predicate;
private final ComposableSampler sampler;
private final Composable sampler;

private PredicatedSampler(Predicate predicate, ComposableSampler sampler) {
private PredicatedSampler(Predicate predicate, Composable sampler) {
this.predicate = requireNonNull(predicate);
this.sampler = requireNonNull(sampler);
}
Expand All @@ -26,7 +26,7 @@ public Predicate getPredicate() {
return predicate;
}

public ComposableSampler getSampler() {
public Composable getSampler() {
return sampler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ public interface SamplingIntent {
*/
long getThreshold();

/*
* Return true if the adjusted count (calculated as reciprocal of the sampling probability) can be faithfully used to estimate span metrics.
*/
default boolean isAdjustedCountReliable() {
return true;
}

/**
* Returns a set of Attributes to be added to the Span in case of positive sampling decision.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ final class CoinFlipSampler extends ConsistentSampler {

private static final SplittableRandom random = new SplittableRandom(0x160a50a2073e17e6L);

private final ComposableSampler samplerA;
private final ComposableSampler samplerB;
private final Composable samplerA;
private final Composable samplerB;
private final double probability;
private final String description;

Expand All @@ -36,7 +36,7 @@ final class CoinFlipSampler extends ConsistentSampler {
* @param samplerA the first delegate sampler
* @param samplerB the second delegate sampler
*/
CoinFlipSampler(ComposableSampler samplerA, ComposableSampler samplerB) {
CoinFlipSampler(Composable samplerA, Composable samplerB) {
this(samplerA, samplerB, 0.5);
}

Expand All @@ -48,7 +48,7 @@ final class CoinFlipSampler extends ConsistentSampler {
* @param samplerA the first delegate sampler
* @param samplerB the second delegate sampler
*/
CoinFlipSampler(ComposableSampler samplerA, ComposableSampler samplerB, double probability) {
CoinFlipSampler(Composable samplerA, Composable samplerB, double probability) {
this.samplerA = requireNonNull(samplerA);
this.samplerB = requireNonNull(samplerB);
this.probability = probability;
Expand Down
Loading

0 comments on commit ac36bb6

Please sign in to comment.