From e9a076b13145ee40605a17a4ff00bc6e3a39ba46 Mon Sep 17 00:00:00 2001
From: Ashish <ssashish@amazon.com>
Date: Mon, 10 Jul 2023 19:24:44 +0530
Subject: [PATCH] Extend existing IndexRecoveryIT for remote indexes (#8505)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Signed-off-by: sahil buddharaju <sahilbud@amazon.com>
---
 .../indices/recovery/IndexRecoveryIT.java     | 40 ++++++---
 .../remotestore/RemoteIndexRecoveryIT.java    | 88 +++++++++++++++++++
 .../opensearch/index/engine/NoOpEngine.java   | 14 +--
 .../opensearch/test/InternalTestCluster.java  | 16 ++--
 4 files changed, 137 insertions(+), 21 deletions(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java
index d04c31c0d6e24..72b9b32236371 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java
@@ -34,6 +34,7 @@
 
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.index.IndexCommit;
+import org.hamcrest.Matcher;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -101,8 +102,8 @@
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.NodeIndicesStats;
 import org.opensearch.indices.analysis.AnalysisModule;
-import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.recovery.RecoveryState.Stage;
+import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.node.NodeClosedException;
 import org.opensearch.node.RecoverySettingsChunkSizePlugin;
 import org.opensearch.plugins.AnalysisPlugin;
@@ -577,21 +578,25 @@ public void testRerouteRecovery() throws Exception {
                 .clear()
                 .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
                 .get();
-            assertThat(statsResponse1.getNodes(), hasSize(2));
-            for (NodeStats nodeStats : statsResponse1.getNodes()) {
+            List<NodeStats> dataNodeStats = statsResponse1.getNodes()
+                .stream()
+                .filter(nodeStats -> nodeStats.getNode().isDataNode())
+                .collect(Collectors.toList());
+            assertThat(dataNodeStats, hasSize(2));
+            for (NodeStats nodeStats : dataNodeStats) {
                 final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
                 if (nodeStats.getNode().getName().equals(nodeA)) {
                     assertThat(
                         "node A throttling should increase",
                         recoveryStats.throttleTime().millis(),
-                        greaterThan(finalNodeAThrottling)
+                        getMatcherForThrottling(finalNodeAThrottling)
                     );
                 }
                 if (nodeStats.getNode().getName().equals(nodeB)) {
                     assertThat(
                         "node B throttling should increase",
                         recoveryStats.throttleTime().millis(),
-                        greaterThan(finalNodeBThrottling)
+                        getMatcherForThrottling(finalNodeBThrottling)
                     );
                 }
             }
@@ -623,7 +628,7 @@ public void testRerouteRecovery() throws Exception {
             final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
             assertThat(recoveryStats.currentAsSource(), equalTo(0));
             assertThat(recoveryStats.currentAsTarget(), equalTo(0));
-            assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
+            assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), getMatcherForThrottling(0));
         };
         // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
         // is decremented, which may happen after the recovery was done.
@@ -644,7 +649,8 @@ public void testRerouteRecovery() throws Exception {
 
         logger.info("--> start node C");
         String nodeC = internalCluster().startNode();
-        assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
+        int nodeCount = internalCluster().getNodeNames().length;
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(String.valueOf(nodeCount)).get().isTimedOut());
 
         logger.info("--> slowing down recoveries");
         slowDownRecovery(shardSize);
@@ -678,7 +684,7 @@ public void testRerouteRecovery() throws Exception {
         assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeB, nodeC);
         validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
 
-        if (randomBoolean()) {
+        if (randomBoolean() && shouldAssertOngoingRecoveryInRerouteRecovery()) {
             // shutdown node with relocation source of replica shard and check if recovery continues
             internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
             ensureStableCluster(2);
@@ -722,6 +728,14 @@ public void testRerouteRecovery() throws Exception {
         validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
     }
 
+    protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() {
+        return true;
+    }
+
+    protected Matcher<Long> getMatcherForThrottling(long value) {
+        return greaterThan(value);
+    }
+
     public void testSnapshotRecovery() throws Exception {
         logger.info("--> start node A");
         String nodeA = internalCluster().startNode();
@@ -824,7 +838,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
         ensureGreen();
 
         logger.info("--> indexing sample data");
-        final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT);
+        final int numDocs = numDocs();
         final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
 
         for (int i = 0; i < numDocs; i++) {
@@ -846,6 +860,10 @@ private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) {
         assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f));
     }
 
+    protected int numDocs() {
+        return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
+    }
+
     public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
         final String indexName = "test";
         final Settings nodeSettings = Settings.builder()
@@ -1384,10 +1402,10 @@ public void testHistoryRetention() throws Exception {
             flush(indexName);
         }
 
-        String firstNodeToStop = randomFrom(internalCluster().getNodeNames());
+        String firstNodeToStop = randomFrom(internalCluster().getDataNodeNames());
         Settings firstNodeToStopDataPathSettings = internalCluster().dataPathSettings(firstNodeToStop);
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(firstNodeToStop));
-        String secondNodeToStop = randomFrom(internalCluster().getNodeNames());
+        String secondNodeToStop = randomFrom(internalCluster().getDataNodeNames());
         Settings secondNodeToStopDataPathSettings = internalCluster().dataPathSettings(secondNodeToStop);
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop));
 
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java
new file mode 100644
index 0000000000000..11c9993ac7874
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java
@@ -0,0 +1,88 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore;
+
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.indices.recovery.IndexRecoveryIT;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.nio.file.Path;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemoteIndexRecoveryIT extends IndexRecoveryIT {
+
+    protected static final String REPOSITORY_NAME = "test-remote-store-repo";
+
+    protected Path absolutePath;
+
+    @Override
+    protected Settings featureFlagSettings() {
+        return Settings.builder()
+            .put(super.featureFlagSettings())
+            .put(FeatureFlags.REMOTE_STORE, "true")
+            .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
+            .build();
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        internalCluster().startClusterManagerOnlyNode();
+        absolutePath = randomRepoPath().toAbsolutePath();
+        assertAcked(
+            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
+        );
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+    }
+
+    @After
+    public void teardown() {
+        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
+    }
+
+    @Override
+    protected Matcher<Long> getMatcherForThrottling(long value) {
+        return Matchers.greaterThanOrEqualTo(value);
+    }
+
+    @Override
+    protected int numDocs() {
+        return randomIntBetween(100, 200);
+    }
+
+    @Override
+    protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() {
+        return false;
+    }
+}
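
For context, a minimal standalone sketch (not part of this patch) of why the throttling matcher is relaxed in RemoteIndexRecoveryIT. The assumption here is that a remote-store recovery may transfer little or no segment data from the peer node, so recovery throttle time can legitimately remain zero, which the strict greaterThan bound used by the base IndexRecoveryIT would reject:

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.greaterThan;
    import static org.hamcrest.Matchers.greaterThanOrEqualTo;

    public class ThrottlingMatcherSketch {
        public static void main(String[] args) {
            long previousThrottleMillis = 0L;  // hypothetical baseline captured earlier in the test
            long observedThrottleMillis = 0L;  // hypothetical value observed after a remote-store recovery
            // Base IndexRecoveryIT matcher: greaterThan(previous) -- would fail when no throttling occurred:
            // assertThat(observedThrottleMillis, greaterThan(previousThrottleMillis));
            // RemoteIndexRecoveryIT override: greaterThanOrEqualTo(previous) -- accepts zero throttling:
            assertThat(observedThrottleMillis, greaterThanOrEqualTo(previousThrottleMillis));
        }
    }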
diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
index 2b126e627bd3d..5c548df1cbb60 100644
--- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
@@ -209,11 +209,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
                                 translog.trimUnreferencedReaders();
                                 // refresh the translog stats
                                 translogStats = translog.stats();
-                                assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
-                                    + " current gen "
-                                    + translog.currentFileGeneration()
-                                    + " != min gen "
-                                    + translog.getMinFileGeneration();
+                                // When remote translog is enabled, the min file generation depends on the (N-1)
+                                // lastRefreshedCheckpoint seqNo (see RemoteStoreRefreshListener). Older generations may therefore
+                                // not be trimmed, leaving the current file generation higher than the min file generation.
+                                assert engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled()
+                                    || translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+                                        + " current gen "
+                                        + translog.currentFileGeneration()
+                                        + " != min gen "
+                                        + translog.getMinFileGeneration();
                             }
                         }
                     } catch (final Exception e) {
diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
index 49d8b64bc71cd..3f7bb71b27681 100644
--- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
@@ -1524,11 +1524,13 @@ public void assertSeqNos() throws Exception {
                         }
                         assertThat(replicaShardRouting + " seq_no_stats mismatch", seqNoStats, equalTo(primarySeqNoStats));
                         // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
-                        assertThat(
-                            replicaShardRouting + " global checkpoint syncs mismatch",
-                            seqNoStats.getGlobalCheckpoint(),
-                            equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
-                        );
+                        if (primaryShard.isRemoteTranslogEnabled() == false) {
+                            assertThat(
+                                replicaShardRouting + " global checkpoint syncs mismatch",
+                                seqNoStats.getGlobalCheckpoint(),
+                                equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
+                            );
+                        }
                     }
                 }
             }
@@ -2155,6 +2157,10 @@ synchronized Set<String> allDataNodesButN(int count) {
         return set;
     }
 
+    public Set<String> getDataNodeNames() {
+        return allDataNodesButN(0);
+    }
+
     /**
      * Returns a set of nodes that have at least one shard of the given index.
      */
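
For reference, a hedged usage sketch of the new InternalTestCluster#getDataNodeNames() helper, mirroring the testHistoryRetention change above: because RemoteIndexRecoveryIT#setUp starts a dedicated cluster-manager node, tests that stop random nodes should draw only from data nodes.

    // Inside an OpenSearchIntegTestCase test body (sketch, assuming the usual test-framework statics):
    String nodeToStop = randomFrom(internalCluster().getDataNodeNames());  // never the cluster-manager-only node
    Settings nodeDataPathSettings = internalCluster().dataPathSettings(nodeToStop);
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeToStop));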