diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index dd92bfb47afdb..0561a7cedd44f 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -366,97 +366,6 @@ public void testPrimaryRestart() throws Exception {
         }
     }
 
-    /**
-     * This test validates that unreferenced on disk file are ignored while requesting files from replication source to
-     * prevent FileAlreadyExistsException. It does so by only copying files in first round of segment replication without
-     * committing locally so that in next round of segment replication those files are not considered for download again
-     */
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10885")
-    public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
-        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
-            shards.startAll();
-            IndexShard primary = shards.getPrimary();
-            final IndexShard replica = shards.getReplicas().get(0);
-
-            shards.indexDocs(10);
-            primary.refresh("Test");
-
-            final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
-            final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
-            when(sourceFactory.get(any())).thenReturn(
-                getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
-            );
-            CountDownLatch latch = new CountDownLatch(1);
-
-            // Start first round of segment replication. This should fail with simulated error but with replica having
-            // files in its local store but not in active reader.
-            final SegmentReplicationTarget target = targetService.startReplication(
-                replica,
-                primary.getLatestReplicationCheckpoint(),
-                new SegmentReplicationTargetService.SegmentReplicationListener() {
-                    @Override
-                    public void onReplicationDone(SegmentReplicationState state) {
-                        latch.countDown();
-                        Assert.fail("Replication should fail with simulated error");
-                    }
-
-                    @Override
-                    public void onReplicationFailure(
-                        SegmentReplicationState state,
-                        ReplicationFailedException e,
-                        boolean sendShardFailure
-                    ) {
-                        latch.countDown();
-                        assertFalse(sendShardFailure);
-                        logger.error("Replication error", e);
-                    }
-                }
-            );
-            latch.await();
-            Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
-            onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
-            List<String> activeFiles = replica.getSegmentMetadataMap()
-                .values()
-                .stream()
-                .map(metadata -> metadata.name())
-                .collect(Collectors.toList());
-            assertTrue("Files should not be committed", activeFiles.isEmpty());
-            assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
-            assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);
-
-            // Start next round of segment replication and not throwing exception resulting in commit on replica
-            when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
-            CountDownLatch waitForSecondRound = new CountDownLatch(1);
-            final SegmentReplicationTarget newTarget = targetService.startReplication(
-                replica,
-                primary.getLatestReplicationCheckpoint(),
-                new SegmentReplicationTargetService.SegmentReplicationListener() {
-                    @Override
-                    public void onReplicationDone(SegmentReplicationState state) {
-                        waitForSecondRound.countDown();
-                    }
-
-                    @Override
-                    public void onReplicationFailure(
-                        SegmentReplicationState state,
-                        ReplicationFailedException e,
-                        boolean sendShardFailure
-                    ) {
-                        waitForSecondRound.countDown();
-                        logger.error("Replication error", e);
-                        Assert.fail("Replication should not fail");
-                    }
-                }
-            );
-            waitForSecondRound.await();
-            assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
-            activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
-            assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
-            shards.removeReplica(replica);
-            closeShards(replica);
-        }
-    }
-
     /**
      * This test validates that local non-readable (corrupt, partially) on disk are deleted vs failing the
      * replication event. This test mimics local files (not referenced by reader) by throwing exception post file copy and
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 8b4b3aff701b4..56fc90d130da3 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -29,21 +29,16 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.index.shard.ShardId;
-import org.opensearch.index.IndexSettings;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.replication.TestReplicationSource;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
-import org.opensearch.index.store.StoreTests;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationFailedException;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.replication.common.ReplicationType;
-import org.opensearch.test.DummyShardLock;
-import org.opensearch.test.IndexSettingsModule;
 import org.junit.Assert;
 
 import java.io.FileNotFoundException;
@@ -81,11 +76,6 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
 
     private static final Map<String, StoreFileMetadata> SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF);
 
-    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
-        "index",
-        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
-    );
-
     private SegmentInfos testSegmentInfos;
 
     @Override
@@ -441,7 +431,10 @@ public void test_MissingFiles_NotCausingFailure() throws IOException {
         // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
         // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
         // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
+        // Delete on-disk files so that they are not considered for file diff
+        deleteContent(spyIndexShard.store().directory());
+        spyIndexShard.store().close();
 
         SegmentReplicationSource segrepSource = new TestReplicationSource() {
             @Override
@@ -486,14 +479,72 @@ public void onFailure(Exception e) {
         });
     }
 
+    /**
+     * This tests ensures that on-disk files on replica are taken into consideration while evaluating the files diff
+     * from primary. The test mocks the files referred by active reader to a smaller subset so that logic to filter
+     * out on-disk files be exercised.
+     * @throws IOException if an indexing operation fails or segment replication fails
+     */
+    public void test_OnDiskFiles_ReusedForReplication() throws IOException {
+        int docCount = 1 + random().nextInt(10);
+        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
+        // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
+        // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
+
+        SegmentReplicationSource segrepSource = new TestReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                IndexShard indexShard,
+                BiConsumer<String, Long> fileProgressTracker,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                // No files should be requested from replication source
+                assertEquals(0, filesToFetch.size());
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, mock(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        ));
+        // Mask the files returned by active reader. This is needed so that logic to filter out on disk is exercised
+        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                segrepTarget.markAsDone();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail();
+            }
+        });
+    }
+
+
     /**
      * Generates a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
      * operation. A list of snapshots is returned so that identical files have same checksum.
      * @param docCount the number of documents to index in the first snapshot
+     * @param shard The IndexShard object to use for writing
      * @return a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
      * @throws IOException if one of the indexing operations fails
      */
-    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
+    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
         List<Document> docList = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
             Document document = new Document();
@@ -507,8 +558,7 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount)
         IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
         iwc.setMergePolicy(NoMergePolicy.INSTANCE);
         iwc.setUseCompoundFile(true);
-        final ShardId shardId = new ShardId("index", "_na_", 1);
-        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
+        Store store = shard.store();
         IndexWriter writer = new IndexWriter(store.directory(), iwc);
         for (Document d : docList) {
             writer.addDocument(d);
@@ -519,7 +569,6 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount)
         writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
         writer.commit();
         Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
-        deleteContent(store.directory());
         writer.close();
         store.close();
        return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
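
For readers skimming the patch: the new test_OnDiskFiles_ReusedForReplication asserts that getSegmentFiles receives an empty filesToFetch list when every file advertised by the primary already exists on the replica's disk. The listing below is a minimal, self-contained sketch of that idea only, not OpenSearch's actual implementation: FileDiffSketch and filesToFetch are invented names, and the real diff computation in SegmentReplicationTarget compares checksums and lengths via Store.MetadataSnapshot rather than file names alone.

    // Hedged sketch: a name-only version of "reuse what is already on disk".
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    public class FileDiffSketch {

        // Files advertised by the primary's checkpoint, minus files already present locally.
        static List<String> filesToFetch(List<String> primaryFiles, Set<String> onDiskFiles) {
            List<String> toFetch = new ArrayList<>();
            for (String file : primaryFiles) {
                if (!onDiskFiles.contains(file)) {
                    toFetch.add(file);
                }
            }
            return toFetch;
        }

        public static void main(String[] args) {
            List<String> primary = List.of("_0.cfe", "_0.cfs", "_0.si");
            // Everything was copied in an earlier, interrupted round, so nothing is requested again.
            Set<String> onDisk = Set.of("_0.cfe", "_0.cfs", "_0.si");
            System.out.println(filesToFetch(primary, onDisk)); // prints []
        }
    }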