Skip to content

Commit

Permalink
Added sampler based on and
Browse files Browse the repository at this point in the history
Signed-off-by: Dev Agarwal <devagarwal1803@gmail.com>
  • Loading branch information
devagarwal1803 committed Aug 31, 2023
1 parent bb38ed4 commit 7c44065
Show file tree
Hide file tree
Showing 15 changed files with 373 additions and 22 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520))

### Dependencies
Expand Down Expand Up @@ -164,6 +164,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

### Deprecated

Expand All @@ -182,4 +183,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public IntegrationTestOTelTelemetryPlugin(Settings settings) {
/**
* This method overrides getTelemetry() method in OTel plugin class, so we create only one instance of global OpenTelemetry
* resetForTest() will set OpenTelemetry to null again.
* @param settings cluster settings
* @param telemetrySettings telemetry settings
*/
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
GlobalOpenTelemetry.resetForTest();
return super.getTelemetry(settings);
return super.getTelemetry(telemetrySettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ public List<Setting<?>> getSettings() {
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(telemetry());
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
return Optional.of(telemetry(telemetrySettings));
}

@Override
public String getName() {
return OTEL_TRACER_NAME;
}

private Telemetry telemetry() {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() {
private Telemetry telemetry(TelemetrySettings telemetrySettings) {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(telemetrySettings, settings)), new MetricsTelemetry() {
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -37,15 +40,16 @@ private OTelResourceProvider() {}

/**
* Creates OpenTelemetry instance with default configuration
* @param telemetrySettings telemetry settings
* @param settings cluster settings
* @return OpenTelemetry instance
*/
public static OpenTelemetry get(Settings settings) {
public static OpenTelemetry get(TelemetrySettings telemetrySettings, Settings settings) {
return get(
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.alwaysOn()
Sampler.parentBased(new RequestSampler(new ProbabilisticSampler(telemetrySettings)))
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
import java.util.Objects;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final TelemetrySettings telemetrySettings;
private double samplingRatio;

/**
* Constructor
*
* @param telemetrySettings Telemetry settings.
*/
public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.samplingRatio = telemetrySettings.getTracerHeadSamplerSamplingRatio();
this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}

Sampler getSampler() {
double newSamplingRatio = telemetrySettings.getTracerHeadSamplerSamplingRatio();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
return defaultSampler;
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
return Double.compare(this.samplingRatio, newSamplingRatio) != 0;
}

double getSamplingRatio() {
return samplingRatio;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

@Override
public String getDescription() {
return "Probabilistic Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import java.util.List;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* HeadBased sampler
*/
public class RequestSampler implements Sampler {
private final Sampler defaultSampler;

// TODO: Pick value of TRACE from PR #9415.
private static final String TRACE = "trace";

/**
* Creates Head based sampler
* @param defaultSampler defaultSampler
*/
public RequestSampler(Sampler defaultSampler) {
this.defaultSampler = defaultSampler;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {

final String trace = attributes.get(AttributeKey.stringKey(TRACE));

if (trace != null) {
return (Boolean.parseBoolean(trace) == true) ? SamplingResult.recordAndSample() : SamplingResult.drop();
} else {
return defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

}

@Override
public String getDescription() {
return "Request Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for sampler.
*/
package org.opensearch.telemetry.tracing.sampler;
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class OTelTelemetryPluginTests extends OpenSearchTestCase {

Expand All @@ -42,7 +44,9 @@ public void setup() {
// io.opentelemetry.sdk.OpenTelemetrySdk.close waits only for 10 seconds for shutdown to complete.
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
oTelTracerModulePlugin = new OTelTelemetryPlugin(settings);
telemetry = oTelTracerModulePlugin.getTelemetry(null);
telemetry = oTelTracerModulePlugin.getTelemetry(
new TelemetrySettings(Settings.EMPTY, new ClusterSettings(settings, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY)))
);
tracingTelemetry = telemetry.get().getTracingTelemetry();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Set;

import io.opentelemetry.sdk.trace.samplers.Sampler;

import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class ProbabilisticSamplerTests extends OpenSearchTestCase {

// When ProbabilisticSampler is created with OTelTelemetrySettings as null
public void testProbabilisticSamplerWithNullSettings() {
// Verify that the constructor throws IllegalArgumentException when given null settings
assertThrows(NullPointerException.class, () -> { new ProbabilisticSampler(null); });
}

public void testDefaultGetSampler() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);

assertNotNull(probabilisticSampler.getSampler());
assertEquals(0.01, probabilisticSampler.getSamplingRatio(), 0.0d);
}

public void testGetSamplerWithUpdatedSamplingRatio() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
assertEquals(0.01d, probabilisticSampler.getSamplingRatio(), 0.0d);

telemetrySettings.setSamplingProbability(0.02);

// Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio
Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler();
assertEquals(0.02, probabilisticSampler.getSamplingRatio(), 0.0d);
}

}
Loading

0 comments on commit 7c44065

Please sign in to comment.