Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds latency metrics for ClusterState Appliers and Listeners #12333

Merged
merged 10 commits into from
May 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.rest.RestHandler;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
Expand Down Expand Up @@ -50,8 +51,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);

clusterService = new ClusterService(settings, clusterSettings, threadPool);

clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
}

public void testGetSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -57,7 +58,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
private final TransportService transportService = mock(TransportService.class);
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
private final ActionFilters actionFilters = mock(ActionFilters.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Objects;
import java.util.Optional;

/**
* Class containing metrics (counters/latency) specific to ClusterManager.
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
*
* @opensearch.internal
*/
public final class ClusterManagerMetrics {

private static final String LATENCY_METRIC_UNIT_MS = "ms";

public final Histogram clusterStateAppliersHistogram;
public final Histogram clusterStateListenersHistogram;
public final Histogram rerouteHistogram;
public final Histogram clusterStateComputeHistogram;
public final Histogram clusterStatePublishHistogram;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
"Histogram for tracking the latency of cluster state appliers",
LATENCY_METRIC_UNIT_MS
);
clusterStateListenersHistogram = metricsRegistry.createHistogram(
"cluster.state.listeners.latency",
"Histogram for tracking the latency of cluster state listeners",
LATENCY_METRIC_UNIT_MS
);
rerouteHistogram = metricsRegistry.createHistogram(
"allocation.reroute.latency",
"Histogram for recording latency of shard re-routing",
LATENCY_METRIC_UNIT_MS
);
clusterStateComputeHistogram = metricsRegistry.createHistogram(
"cluster.state.new.compute.latency",
"Histogram for recording time taken to compute new cluster state",
LATENCY_METRIC_UNIT_MS
);
clusterStatePublishHistogram = metricsRegistry.createHistogram(
"cluster.state.publish.success.latency",
"Histogram for recording time taken to publish a new cluster state",
LATENCY_METRIC_UNIT_MS
);
}

public void recordLatency(Histogram histogram, Double value) {
histogram.record(value);
}

public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
histogram.record(value);
return;
}
histogram.record(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public ClusterModule(
List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext
ThreadContext threadContext,
ClusterManagerMetrics clusterManagerMetrics
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand All @@ -159,7 +160,8 @@ public ClusterModule(
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
settings,
clusterManagerMetrics
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand All @@ -56,10 +57,12 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -96,6 +99,7 @@ public class AllocationService {
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private final ClusterManagerMetrics clusterManagerMetrics;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand All @@ -105,32 +109,40 @@ public AllocationService(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
SnapshotsInfoService snapshotsInfoService,
ClusterManagerMetrics clusterManagerMetrics
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY, clusterManagerMetrics);
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

Settings settings,
ClusterManagerMetrics clusterManagerMetrics
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
this.clusterManagerMetrics = clusterManagerMetrics;
}

/**
Expand Down Expand Up @@ -550,11 +562,15 @@ private void reroute(RoutingAllocation allocation) {
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();

long rerouteStartTimeNS = System.nanoTime();
removeDelayMarkers(allocation);

allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
shardsAllocator.allocate(allocation);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.rerouteHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS))
);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateListener;
Expand All @@ -61,13 +62,15 @@
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -120,8 +123,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final String nodeName;

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
private final ClusterManagerMetrics clusterManagerMetrics;

public ClusterApplierService(
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
String nodeName,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
Expand All @@ -132,6 +142,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold
);
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -597,15 +608,21 @@ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, S
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
}

private static void callClusterStateAppliers(
private void callClusterStateAppliers(
ClusterChangedEvent clusterChangedEvent,
StopWatch stopWatch,
Collection<ClusterStateApplier> clusterStateAppliers
) {
for (ClusterStateApplier applier : clusterStateAppliers) {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("running applier [" + applier + "]")) {
long applierStartTimeNS = System.nanoTime();
applier.applyClusterState(clusterChangedEvent);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateAppliersHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - applierStartTimeNS)),
Optional.of(Tags.create().addTag("Operation", applier.getClass().getSimpleName()))
);
}
}
}
Expand All @@ -624,7 +641,13 @@ private void callClusterStateListener(
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
long listenerStartTimeNS = System.nanoTime();
listener.clusterChanged(clusterChangedEvent);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateListenersHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)),
Optional.of(Tags.create().addTag("Operation", listener.getClass().getSimpleName()))
);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.service;

import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -20,7 +21,12 @@
*/
@PublicApi(since = "2.2.0")
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
public ClusterManagerService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
super(settings, clusterSettings, threadPool, clusterManagerMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.service;

import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -91,12 +92,17 @@ public class ClusterService extends AbstractLifecycleComponent {

private IndexingPressureService indexingPressureService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
this(
settings,
clusterSettings,
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics)
);
}

Expand Down
Loading
Loading