Skip to content

Commit

Permalink
Added sampler based on Blanket Probabilistic Sampling rate and Overri…
Browse files Browse the repository at this point in the history
…de for on demand

Signed-off-by: Dev Agarwal <devagarwal1803@gmail.com>
  • Loading branch information
devagarwal1803 committed Aug 31, 2023
1 parent bb38ed4 commit b2ab2d0
Show file tree
Hide file tree
Showing 15 changed files with 373 additions and 22 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520))

### Dependencies
Expand Down Expand Up @@ -164,6 +164,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

### Deprecated

Expand All @@ -182,4 +183,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public IntegrationTestOTelTelemetryPlugin(Settings settings) {
/**
* This method overrides getTelemetry() method in OTel plugin class, so we create only one instance of global OpenTelemetry
* resetForTest() will set OpenTelemetry to null again.
* @param settings cluster settings
* @param telemetrySettings telemetry settings
*/
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
GlobalOpenTelemetry.resetForTest();
return super.getTelemetry(settings);
return super.getTelemetry(telemetrySettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ public List<Setting<?>> getSettings() {
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(telemetry());
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
return Optional.of(telemetry(telemetrySettings));
}

@Override
public String getName() {
return OTEL_TRACER_NAME;
}

private Telemetry telemetry() {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() {
private Telemetry telemetry(TelemetrySettings telemetrySettings) {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(telemetrySettings, settings)), new MetricsTelemetry() {
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -37,15 +40,16 @@ private OTelResourceProvider() {}

/**
* Creates OpenTelemetry instance with default configuration
* @param telemetrySettings telemetry settings
* @param settings cluster settings
* @return OpenTelemetry instance
*/
public static OpenTelemetry get(Settings settings) {
public static OpenTelemetry get(TelemetrySettings telemetrySettings, Settings settings) {
return get(
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.alwaysOn()
Sampler.parentBased(new RequestSampler(new ProbabilisticSampler(telemetrySettings)))
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
import java.util.Objects;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final TelemetrySettings telemetrySettings;
private double samplingRatio;

/**
* Constructor
*
* @param telemetrySettings Telemetry settings.
*/
public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.samplingRatio = telemetrySettings.getTracerHeadSamplerSamplingRatio();
this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}

Sampler getSampler() {
double newSamplingRatio = telemetrySettings.getTracerHeadSamplerSamplingRatio();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
return defaultSampler;
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
return Double.compare(this.samplingRatio, newSamplingRatio) != 0;
}

double getSamplingRatio() {
return samplingRatio;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

@Override
public String getDescription() {
return "Probabilistic Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import java.util.List;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* HeadBased sampler
*/
public class RequestSampler implements Sampler {
private final Sampler defaultSampler;

// TODO: Pick value of TRACE from PR #9415.
private static final String TRACE = "trace";

/**
* Creates Head based sampler
* @param defaultSampler defaultSampler
*/
public RequestSampler(Sampler defaultSampler) {
this.defaultSampler = defaultSampler;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {

final String trace = attributes.get(AttributeKey.stringKey(TRACE));

if (trace != null) {
return (Boolean.parseBoolean(trace) == true) ? SamplingResult.recordAndSample() : SamplingResult.drop();
} else {
return defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

}

@Override
public String getDescription() {
return "Request Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for sampler.
*/
package org.opensearch.telemetry.tracing.sampler;
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class OTelTelemetryPluginTests extends OpenSearchTestCase {

Expand All @@ -42,7 +44,9 @@ public void setup() {
// io.opentelemetry.sdk.OpenTelemetrySdk.close waits only for 10 seconds for shutdown to complete.
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
oTelTracerModulePlugin = new OTelTelemetryPlugin(settings);
telemetry = oTelTracerModulePlugin.getTelemetry(null);
telemetry = oTelTracerModulePlugin.getTelemetry(
new TelemetrySettings(Settings.EMPTY, new ClusterSettings(settings, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY)))
);
tracingTelemetry = telemetry.get().getTracingTelemetry();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Set;

import io.opentelemetry.sdk.trace.samplers.Sampler;

import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class ProbabilisticSamplerTests extends OpenSearchTestCase {

// When ProbabilisticSampler is created with OTelTelemetrySettings as null
public void testProbabilisticSamplerWithNullSettings() {
// Verify that the constructor throws IllegalArgumentException when given null settings
assertThrows(NullPointerException.class, () -> { new ProbabilisticSampler(null); });
}

public void testDefaultGetSampler() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);

assertNotNull(probabilisticSampler.getSampler());
assertEquals(0.01, probabilisticSampler.getSamplingRatio(), 0.0d);
}

public void testGetSamplerWithUpdatedSamplingRatio() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
assertEquals(0.01d, probabilisticSampler.getSamplingRatio(), 0.0d);

telemetrySettings.setSamplingProbability(0.02);

// Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio
Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler();
assertEquals(0.02, probabilisticSampler.getSamplingRatio(), 0.0d);
}

}
Loading

0 comments on commit b2ab2d0

Please sign in to comment.