Skip to content

Commit

Permalink
Adding other cluster manager metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Mar 15, 2024
1 parent 6918920 commit 8d20632
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 31 deletions.
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResultsService;
import org.opensearch.telemetry.metrics.MetricsRegistry;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -143,15 +144,22 @@ public ClusterModule(
List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext
ThreadContext threadContext,
MetricsRegistry metricsRegistry
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
metricsRegistry
);
}

public static List<Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -91,6 +94,7 @@ public class AllocationService {
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private Histogram rerouteHistogram;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand All @@ -100,20 +104,30 @@ public AllocationService(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, NoopMetricsRegistry.INSTANCE);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
SnapshotsInfoService snapshotsInfoService,
MetricsRegistry metricsRegistry
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.rerouteHistogram = metricsRegistry.createHistogram(
"allocation.reroute.latency",
"Histogram for recording latency of shard re-routing",
"ms"
);
}

/**
Expand Down Expand Up @@ -533,11 +547,12 @@ private void reroute(RoutingAllocation allocation) {
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();

long rerouteStartTimeMS = System.currentTimeMillis();
removeDelayMarkers(allocation);

allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
shardsAllocator.allocate(allocation);
this.rerouteHistogram.record((double) Math.max(0, System.currentTimeMillis() - rerouteStartTimeMS));
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

/**
Expand All @@ -20,7 +21,12 @@
*/
@PublicApi(since = "2.2.0")
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
public ClusterManagerService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
MetricsRegistry metricsRegistry
) {
super(settings, clusterSettings, threadPool, metricsRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
this(
settings,
clusterSettings,
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterManagerService(settings, clusterSettings, threadPool, metricsRegistry),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, metricsRegistry)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.discovery.Discovery;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -135,8 +138,10 @@ public class MasterService extends AbstractLifecycleComponent {
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;
private final ClusterStateStats stateStats;
private Histogram clusterStateComputeHistogram;
private Histogram clusterStatePublishHistogram;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, MetricsRegistry metricsRegistry) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));

this.slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
Expand All @@ -154,6 +159,20 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
);
this.stateStats = new ClusterStateStats();
this.threadPool = threadPool;
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.clusterStateComputeHistogram = metricsRegistry.createHistogram(
"cluster.state.new.compute.latency",
"Histogram for recording time taken to compute new cluster state",
"ms"
);
this.clusterStatePublishHistogram = metricsRegistry.createHistogram(
"cluster.state.publish.success.latency",
"Histogram for recording time taken to publish a new cluster state",
"ms"
);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -302,6 +321,10 @@ private void runTasks(TaskInputs taskInputs) {
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);
clusterStateComputeHistogram.record(
computationTime.getMillis(),
Tags.create().addTag("Operation", taskInputs.executor.getClass().getSimpleName())
);

if (taskOutputs.clusterStateUnchanged()) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
Expand Down Expand Up @@ -361,6 +384,7 @@ protected boolean blockingAllowed() {
final long durationMillis = getTimeSince(startTimeNanos).millis();
stateStats.stateUpdateTook(durationMillis);
stateStats.stateUpdated();
clusterStatePublishHistogram.record(durationMillis);
} catch (Exception e) {
stateStats.stateUpdateFailed();
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,8 @@ protected Node(
clusterPlugins,
clusterInfoService,
snapshotsInfoService,
threadPool.getThreadContext()
threadPool.getThreadContext(),
metricsRegistry
);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand Down Expand Up @@ -165,7 +166,7 @@ public void testRegisterAllocationDeciderDuplicate() {
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings));
}
}), clusterInfoService, null, threadContext)
}), clusterInfoService, null, threadContext, NoopMetricsRegistry.INSTANCE)
);
assertEquals(e.getMessage(), "Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice");
}
Expand All @@ -176,7 +177,7 @@ public void testRegisterAllocationDecider() {
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
return Collections.singletonList(new FakeAllocationDecider());
}
}), clusterInfoService, null, threadContext);
}), clusterInfoService, null, threadContext, NoopMetricsRegistry.INSTANCE);
assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class)));
}

Expand All @@ -186,7 +187,7 @@ private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, Str
public Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
return Collections.singletonMap(name, supplier);
}
}), clusterInfoService, null, threadContext);
}), clusterInfoService, null, threadContext, NoopMetricsRegistry.INSTANCE);
}

public void testRegisterShardsAllocator() {
Expand All @@ -207,7 +208,15 @@ public void testUnknownShardsAllocator() {
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new ClusterModule(settings, clusterService, Collections.emptyList(), clusterInfoService, null, threadContext)
() -> new ClusterModule(
settings,
clusterService,
Collections.emptyList(),
clusterInfoService,
null,
threadContext,
NoopMetricsRegistry.INSTANCE
)
);
assertEquals("Unknown ShardsAllocator [dne]", e.getMessage());
}
Expand Down Expand Up @@ -290,7 +299,8 @@ public void testRejectsReservedExistingShardsAllocatorName() {
Collections.singletonList(existingShardsAllocatorPlugin(GatewayAllocator.ALLOCATOR_NAME)),
clusterInfoService,
null,
threadContext
threadContext,
NoopMetricsRegistry.INSTANCE
);
expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator()));
}
Expand All @@ -302,7 +312,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() {
Arrays.asList(existingShardsAllocatorPlugin("duplicate"), existingShardsAllocatorPlugin("duplicate")),
clusterInfoService,
null,
threadContext
threadContext,
NoopMetricsRegistry.INSTANCE
);
expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -179,7 +180,8 @@ private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterStat
ClusterManagerService clusterManagerService = new ClusterManagerService(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
threadPool,
NoopMetricsRegistry.INSTANCE
);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialState);
clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand All @@ -77,6 +80,12 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AllocationServiceTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -137,6 +146,9 @@ public void testAssignsPrimariesInPriorityOrderThenReplicas() {
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
final Histogram rerouteHistogram = mock(Histogram.class);
when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())).thenReturn(rerouteHistogram);
final AllocationService allocationService = new AllocationService(
new AllocationDeciders(
Arrays.asList(
Expand All @@ -158,7 +170,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}
},
new EmptyClusterInfoService(),
EmptySnapshotsInfoService.INSTANCE
EmptySnapshotsInfoService.INSTANCE,
metricsRegistry
);

final String unrealisticAllocatorName = "unrealistic";
Expand Down Expand Up @@ -258,10 +271,13 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
assertTrue(routingTable3.index("lowPriority").allPrimaryShardsActive());
assertThat(routingTable3.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty());

verify(metricsRegistry, times(1)).createHistogram(anyString(), anyString(), anyString());
verify(rerouteHistogram, times(3)).record(anyDouble());
}

public void testExplainsNonAllocationOfShardWithUnknownAllocator() {
final AllocationService allocationService = new AllocationService(null, null, null, null);
final AllocationService allocationService = new AllocationService(null, null, null, null, NoopMetricsRegistry.INSTANCE);
allocationService.setExistingShardsAllocators(
Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.test.ClusterServiceUtils.createNoOpNodeConnectionsService;
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -81,10 +85,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.test.ClusterServiceUtils.createNoOpNodeConnectionsService;
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

public class ClusterApplierServiceTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -134,7 +134,8 @@ private TimedClusterApplierService createTimedClusterService(boolean makeCluster
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(
Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool, metricsRegistry
threadPool,
metricsRegistry
);
timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService());
timedClusterApplierService.setInitialState(
Expand Down Expand Up @@ -673,7 +674,12 @@ static class TimedClusterApplierService extends ClusterApplierService {
volatile Long currentTimeOverride = null;
boolean applicationMayFail;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, MetricsRegistry metricsRegistry) {
TimedClusterApplierService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
MetricsRegistry metricsRegistry
) {
super("test_node", settings, clusterSettings, threadPool, metricsRegistry);
this.clusterSettings = clusterSettings;
}
Expand Down
Loading

0 comments on commit 8d20632

Please sign in to comment.