Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into dual-repl-segrep-lag
Browse files Browse the repository at this point in the history
  • Loading branch information
shourya035 authored Jun 10, 2024
2 parents bb5e868 + c639e9a commit 2f8475d
Show file tree
Hide file tree
Showing 29 changed files with 329 additions and 65 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand All @@ -47,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
- Adds support to provide tags with value in Gauge metric. ([#13994](https://github.com/opensearch-project/OpenSearch/pull/13994))
- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl
return metricsTelemetry.createGauge(name, description, unit, valueProvider, tags);
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value) {
return metricsTelemetry.createGauge(name, description, unit, value);
}

@Override
public void close() throws IOException {
metricsTelemetry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,16 @@ public interface MetricsRegistry extends Closeable {
*/
Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags);

/**
* Creates the Observable Gauge type of Metric. Where the value provider will be called at a certain frequency
* to capture the value.
*
* @param name name of the observable gauge.
* @param description any description about the metric.
* @param unit unit of the metric.
* @param value value provider.
* @return closeable to dispose/close the Gauge metric.
*/
Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.metrics;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
* Observable Measurement for the Asynchronous instruments.
* @opensearch.experimental
*/
@ExperimentalApi
public final class TaggedMeasurement {
private final Double value;
private final Tags tags;

/**
* Factory method to create the {@link TaggedMeasurement} object.
* @param value value.
* @param tags tags to be added per value.
* @return tagged measurement TaggedMeasurement
*/
public static TaggedMeasurement create(double value, Tags tags) {
return new TaggedMeasurement(value, tags);
}

private TaggedMeasurement(double value, Tags tags) {
this.value = value;
this.tags = tags;
}

/**
* Returns the value.
* @return value
*/
public Double getValue() {
return value;
}

/**
* Returns the tags.
* @return tags
*/
public Tags getTags() {
return tags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.TaggedMeasurement;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
Expand Down Expand Up @@ -52,6 +53,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl
return () -> {};
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value) {
return () -> {};
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,19 @@ public void testGauge() {
assertSame(mockCloseable, closeable);
}

@SuppressWarnings("unchecked")
public void testGaugeWithValueAndTagSupplier() {
Closeable mockCloseable = mock(Closeable.class);
when(defaultMeterRegistry.createGauge(any(String.class), any(String.class), any(String.class), any(Supplier.class))).thenReturn(
mockCloseable
);
Closeable closeable = defaultMeterRegistry.createGauge(
"org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testObservableGauge",
"test observable gauge",
"ms",
() -> TaggedMeasurement.create(1.0, Tags.EMPTY)
);
assertSame(mockCloseable, closeable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
Expand Down Expand Up @@ -147,6 +150,36 @@ public void testGauge() throws Exception {

}

public void testGaugeWithValueAndTagSupplier() throws Exception {
String metricName = "test-gauge";
MetricsRegistry metricsRegistry = internalCluster().getInstance(MetricsRegistry.class);
InMemorySingletonMetricsExporter.INSTANCE.reset();
Tags tags = Tags.create().addTag("test", "integ-test");
final AtomicInteger testValue = new AtomicInteger(0);
Supplier<TaggedMeasurement> valueProvider = () -> {
return TaggedMeasurement.create(Double.valueOf(testValue.incrementAndGet()), tags);
};
Closeable gaugeCloseable = metricsRegistry.createGauge(metricName, "test", "ms", valueProvider);
// Sleep for about 2.2s to wait for metrics to be published.
Thread.sleep(2200);

InMemorySingletonMetricsExporter exporter = InMemorySingletonMetricsExporter.INSTANCE;

assertTrue(getMaxObservableGaugeValue(exporter, metricName) >= 2.0);

gaugeCloseable.close();
double observableGaugeValueAfterStop = getMaxObservableGaugeValue(exporter, metricName);

Map<AttributeKey<?>, Object> attributes = getMetricAttributes(exporter, metricName);

assertEquals("integ-test", attributes.get(AttributeKey.stringKey("test")));

// Sleep for about 1.2s to wait for metrics to see that closed observableGauge shouldn't execute the callable.
Thread.sleep(1200);
assertEquals(observableGaugeValueAfterStop, getMaxObservableGaugeValue(exporter, metricName), 0.0);

}

private static double getMaxObservableGaugeValue(InMemorySingletonMetricsExporter exporter, String metricName) {
List<MetricData> dataPoints = exporter.getFinishedMetricItems()
.stream()
Expand All @@ -159,6 +192,15 @@ private static double getMaxObservableGaugeValue(InMemorySingletonMetricsExporte
return totalValue;
}

private static Map<AttributeKey<?>, Object> getMetricAttributes(InMemorySingletonMetricsExporter exporter, String metricName) {
List<MetricData> dataPoints = exporter.getFinishedMetricItems()
.stream()
.filter(a -> a.getName().contains(metricName))
.collect(Collectors.toList());
Attributes attributes = dataPoints.get(0).getDoubleGaugeData().getPoints().stream().findAny().get().getAttributes();
return attributes.asMap();
}

@After
public void reset() {
InMemorySingletonMetricsExporter.INSTANCE.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ public Closeable createGauge(String name, String description, String unit, Suppl
return () -> doubleObservableGauge.close();
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value) {
ObservableDoubleGauge doubleObservableGauge = AccessController.doPrivileged(
(PrivilegedAction<ObservableDoubleGauge>) () -> otelMeter.gaugeBuilder(name)
.setUnit(unit)
.setDescription(description)
.buildWithCallback(record -> record.record(value.get().getValue(), OTelAttributesConverter.convert(value.get().getTags())))
);
return () -> doubleObservableGauge.close();
}

@Override
public void close() throws IOException {
meterProvider.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,34 @@ public void testGauge() throws Exception {
closeable.close();
verify(observableDoubleGauge).close();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public void testGaugeWithValueAndTagsSupplier() throws Exception {
String observableGaugeName = "test-gauge";
String description = "test";
String unit = "1";
Meter mockMeter = mock(Meter.class);
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
ObservableDoubleGauge observableDoubleGauge = mock(ObservableDoubleGauge.class);
DoubleGaugeBuilder mockOTelDoubleGaugeBuilder = mock(DoubleGaugeBuilder.class);
MeterProvider meterProvider = mock(MeterProvider.class);
when(meterProvider.get(OTelTelemetryPlugin.INSTRUMENTATION_SCOPE_NAME)).thenReturn(mockMeter);
MetricsTelemetry metricsTelemetry = new OTelMetricsTelemetry(
new RefCountedReleasable("telemetry", mockOpenTelemetry, () -> {}),
meterProvider
);
when(mockMeter.gaugeBuilder(Mockito.contains(observableGaugeName))).thenReturn(mockOTelDoubleGaugeBuilder);
when(mockOTelDoubleGaugeBuilder.setDescription(description)).thenReturn(mockOTelDoubleGaugeBuilder);
when(mockOTelDoubleGaugeBuilder.setUnit(unit)).thenReturn(mockOTelDoubleGaugeBuilder);
when(mockOTelDoubleGaugeBuilder.buildWithCallback(any(Consumer.class))).thenReturn(observableDoubleGauge);

Closeable closeable = metricsTelemetry.createGauge(
observableGaugeName,
description,
unit,
() -> TaggedMeasurement.create(1.0, Tags.EMPTY)
);
closeable.close();
verify(observableDoubleGauge).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1288,8 +1288,8 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
final Index index = state.metadata().index(indexName).getIndex();

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true));
assertFalse(Arrays.stream(shardDirectory(node_1, index, 0)).anyMatch(Files::exists));
assertEquals(1, Arrays.stream(shardDirectory(node_2, index, 0)).filter(Files::exists).count());
});

logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1);
Expand All @@ -1302,11 +1302,10 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false));
assertEquals(1, Arrays.stream(shardDirectory(node_1, index, 0)).filter(Files::exists).count());
assertFalse(Arrays.stream(shardDirectory(node_2, index, 0)).anyMatch(Files::exists));
});

logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName);
Expand All @@ -1319,11 +1318,12 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
assertTrue(stats.getMemorySizeInBytes() == 0);
}

private Path shardDirectory(String server, Index index, int shard) {
private Path[] shardDirectory(String server, Index index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
assert paths.length == 1;
return paths[0];
// the available paths of the shard may be bigger than the 1,
// it depends on `InternalTestCluster.numDataPaths`.
return paths;
}

private void setupIndex(Client client, String index) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -87,11 +86,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
}
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
Expand All @@ -42,6 +43,11 @@ protected int minimumNumberOfReplicas() {
return 1;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

public void testMixedModeAddRemoteNodes() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.indices.shrink.ResizeType;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -27,6 +28,11 @@ public class ResizeIndexMigrationTestCase extends MigrationBaseTestCase {
private final static String DOC_REP_DIRECTION = "docrep";
private final static String MIXED_MODE = "mixed";

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

/*
* This test will verify the resize request failure, when cluster mode is mixed
* and index is on DocRep node, and migration to remote store is in progress.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -384,9 +383,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING)) {
addAllocationDecider(deciders, new RemoteStoreMigrationAllocationDecider(settings, clusterSettings));
}
addAllocationDecider(deciders, new RemoteStoreMigrationAllocationDecider(settings, clusterSettings));

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Loading

0 comments on commit 2f8475d

Please sign in to comment.