From 4e423319de8205459876a3484891f1a8035510bb Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Tue, 1 Oct 2024 07:06:51 +0530 Subject: [PATCH] Address comments Signed-off-by: Lakshya Taragi --- CHANGELOG.md | 1 + .../opensearch/remotestore/RemoteStoreIT.java | 66 +++++++++++++++++++ .../RemoteStorePinnedTimestampsIT.java | 24 ++++--- .../cluster/node/stats/NodesStatsRequest.java | 2 +- .../node/stats/TransportNodesStatsAction.java | 2 +- .../main/java/org/opensearch/node/Node.java | 3 +- .../java/org/opensearch/node/NodeService.java | 12 ++-- .../remotestore/RemoteStoreNodeService.java | 9 --- .../remotestore/RemoteStoreNodeStats.java | 34 ++++++---- .../cluster/node/stats/NodeStatsTests.java | 17 ++++- 10 files changed, 127 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da14948781849..5c39dbb1a86dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923)) - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) - Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976)) +- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 603b1c1775c1a..ebb911c739eb3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -1012,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws .get() ); } + + public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException { + internalCluster().startNodes(3); + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s")) + .get(); + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(0, 10000L, -1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s"); + createIndex(INDEX_NAME, settings.build()); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + if (randomBoolean()) { + for (int i = 0; i < randomIntBetween(1, 5); i++) { + indexSingleDoc(INDEX_NAME); + } + flushAndRefresh(INDEX_NAME); + } + // Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync. + indexSingleDoc(INDEX_NAME); + // Reduce the latch for the main thread to flush after some sleep. + latch.countDown(); + // Index another doc and in this case the flush would have happened before the sync. + indexSingleDoc(INDEX_NAME); + }).start(); + // Wait for atleast one doc to be ingested. + latch.await(); + // Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2 + // gets indexed, then it goes into the happy case where the close index happens succefully. + Thread.sleep(1000); + // Flush so that the subsequent sync or flushes are no-op. + flush(INDEX_NAME); + // Closing the index involves translog.sync and shard.flush which are now no-op. + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(10000); + ensureGreen(INDEX_NAME); + } + + public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException { + internalCluster().startNodes(3); + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(0, 10000L, -1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s"); + createIndex(INDEX_NAME, settings.build()); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + // Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync. + indexSingleDoc(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + // Reduce the latch for the main thread to flush after some sleep. + latch.countDown(); + }).start(); + // Wait for atleast one doc to be ingested. + latch.await(); + // Flush so that the subsequent sync or flushes are no-op. + flush(INDEX_NAME); + // Closing the index involves translog.sync and shard.flush which are now no-op. + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(10000); + ensureGreen(INDEX_NAME); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java index aba4f7e59365a..024e0e952eea5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -21,9 +21,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE_NODE_STATS; +import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { @@ -195,26 +194,31 @@ public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m") .build(); internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings); - internalCluster().startDataOnlyNodes(2, pinnedTimestampEnabledSettings); - ensureStableCluster(3); + String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0); + ensureStableCluster(2); + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + remoteNodeName + ); - logger.info("Sleeping for 70 seconds to wait for fetching of pinned timestamps"); - Thread.sleep(70000); + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); - long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); - assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L); assertBusy(() -> { + long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); + assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L); NodesStatsResponse nodesStatsResponse = internalCluster().client() .admin() .cluster() .prepareNodesStats() - .addMetric(REMOTE_STORE_NODE_STATS.metricName()) + .addMetric(REMOTE_STORE.metricName()) .execute() .actionGet(); for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps(); assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps); } - }, 1, TimeUnit.MINUTES); + }); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3)); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index fe34528e063be..a5b00ed82d3cb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -221,7 +221,7 @@ public enum Metric { REPOSITORIES("repositories"), ADMISSION_CONTROL("admission_control"), CACHE_STATS("caches"), - REMOTE_STORE_NODE_STATS("remote_store_node_stats"); + REMOTE_STORE("remote_store"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 40a6bcb35ca96..a98d245af872b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -130,7 +130,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics), NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics), - NodesStatsRequest.Metric.REMOTE_STORE_NODE_STATS.containedIn(metrics) + NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ae351d0dc5dd8..4962d72d8728a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1329,8 +1329,7 @@ protected Node( segmentReplicationStatsTracker, repositoryService, admissionControlService, - cacheService, - remoteStoreNodeService + cacheService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index ae3868a5980bc..15164f6b74b32 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -54,7 +54,8 @@ import org.opensearch.indices.IndicesService; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; -import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.plugins.PluginsService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.repositories.RepositoriesService; @@ -102,7 +103,6 @@ public class NodeService implements Closeable { private final AdmissionControlService admissionControlService; private final SegmentReplicationStatsTracker segmentReplicationStatsTracker; private final CacheService cacheService; - private final RemoteStoreNodeService remoteStoreNodeService; NodeService( Settings settings, @@ -130,8 +130,7 @@ public class NodeService implements Closeable { SegmentReplicationStatsTracker segmentReplicationStatsTracker, RepositoriesService repositoriesService, AdmissionControlService admissionControlService, - CacheService cacheService, - RemoteStoreNodeService remoteStoreNodeService + CacheService cacheService ) { this.settings = settings; this.threadPool = threadPool; @@ -161,7 +160,6 @@ public class NodeService implements Closeable { clusterService.addStateApplier(searchPipelineService); this.segmentReplicationStatsTracker = segmentReplicationStatsTracker; this.cacheService = cacheService; - this.remoteStoreNodeService = remoteStoreNodeService; } public NodeInfo info( @@ -246,7 +244,7 @@ public NodeStats stats( boolean repositoriesStats, boolean admissionControl, boolean cacheService, - boolean remoteStoreNodeService + boolean remoteStoreNodeStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -280,7 +278,7 @@ public NodeStats stats( repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, admissionControl ? this.admissionControlService.stats() : null, cacheService ? this.cacheService.stats(indices) : null, - remoteStoreNodeService ? this.remoteStoreNodeService.getRemoteStoreNodeStats() : null + remoteStoreNodeStats ? new RemoteStoreNodeStats(RemoteStorePinnedTimestampService.getPinnedTimestamps().v1()) : null ); } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index 228d52bb1d6bc..cc5d8b0e62e90 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryException; @@ -250,12 +249,4 @@ public static boolean isMigratingToRemoteStore(Metadata metadata) { return (isMixedMode && isRemoteStoreMigrationDirection); } - - public RemoteStoreNodeStats getRemoteStoreNodeStats() { - long lastSuccessfulFetchOfPinnedTimestamps = 0; - if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { - lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); - } - return new RemoteStoreNodeStats(lastSuccessfulFetchOfPinnedTimestamps); - } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java index 09ce6f95283c6..180187f69b0da 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java @@ -8,15 +8,14 @@ package org.opensearch.node.remotestore; -import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.indices.RemoteStoreSettings; import java.io.IOException; +import java.util.Objects; /** * Node level remote store stats @@ -24,13 +23,13 @@ */ public class RemoteStoreNodeStats implements Writeable, ToXContentFragment { - public static final String STATS_NAME = "remote_store_node_stats"; + public static final String STATS_NAME = "remote_store"; public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps"; /** * Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService} */ - private long lastSuccessfulFetchOfPinnedTimestamps; + private final long lastSuccessfulFetchOfPinnedTimestamps; public RemoteStoreNodeStats(final long lastSuccessfulFetchOfPinnedTimestamps) { this.lastSuccessfulFetchOfPinnedTimestamps = lastSuccessfulFetchOfPinnedTimestamps; @@ -41,18 +40,12 @@ public long getLastSuccessfulFetchOfPinnedTimestamps() { } public RemoteStoreNodeStats(StreamInput in) throws IOException { - // TODO: change version to V_2_18_0 - if (in.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) { - this.lastSuccessfulFetchOfPinnedTimestamps = in.readOptionalLong(); - } + this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong(); } @Override public void writeTo(StreamOutput out) throws IOException { - // TODO: change version to V_2_18_0 - if (out.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) { - out.writeOptionalLong(this.lastSuccessfulFetchOfPinnedTimestamps); - } + out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps); } @Override @@ -66,4 +59,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public String toString() { return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}"; } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (o.getClass() != RemoteStoreNodeStats.class) { + return false; + } + RemoteStoreNodeStats other = (RemoteStoreNodeStats) o; + return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps; + } + + @Override + public int hashCode() { + return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps); + } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index d6cee59f57098..761b82e9d1c59 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -95,6 +95,7 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; @@ -614,6 +615,14 @@ public void testSerialization() throws IOException { } else { assertEquals(nodeCacheStats, deserializedNodeCacheStats); } + + RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats(); + RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats(); + if (remoteStoreNodeStats == null) { + assertNull(deserializedRemoteStoreNodeStats); + } else { + assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats); + } } } } @@ -996,6 +1005,12 @@ public void apply(String action, AdmissionControlActionType admissionControlActi nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags); } + RemoteStoreNodeStats remoteStoreNodeStats = null; + if (frequently()) { + long lastSuccessfulFetchOfPinnedTimestamps = randomNonNegativeLong(); + remoteStoreNodeStats = new RemoteStoreNodeStats(lastSuccessfulFetchOfPinnedTimestamps); + } + // TODO: Only remote_store based aspects of NodeIndicesStats are being tested here. // It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now return new NodeStats( @@ -1028,7 +1043,7 @@ public void apply(String action, AdmissionControlActionType admissionControlActi null, admissionControlStats, nodeCacheStats, - null + remoteStoreNodeStats ); }