From 80243e745e5a253fdd0e59fd1134f2fd4eecefe7 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 29 Aug 2023 18:18:52 -0700 Subject: [PATCH] [Segment Replication] Allow segment replication with on disk files not referenced by reader with matching checksum Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationTarget.java | 60 ++++- .../index/shard/RemoteIndexShardTests.java | 255 ++++++++++++++++++ 2 files changed, 310 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7a5f9608dace0..8c323adcd0924 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -9,11 +9,15 @@ package org.opensearch.indices.replication; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.OpenSearchException; import org.opensearch.action.StepListener; @@ -33,8 +37,15 @@ import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES; /** * Represents the target of a replication event. @@ -177,14 +188,53 @@ public void startReplication(ActionListener listener) { private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); - final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); - logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff)); + Map activeReaderMetadataMap = indexShard.getSegmentMetadataMap(); + Map primaryMetadataMap = checkpointInfo.getMetadataMap(); + + Store.RecoveryDiff diff = Store.segmentReplicationDiff(primaryMetadataMap, activeReaderMetadataMap); + List unreferencedDiskFiles = Arrays.asList(indexShard.store().directory().listAll()) + .stream() + .filter( + name -> !activeReaderMetadataMap.containsKey(name) + && !EXCLUDE_FILES.contains(name) + && !name.startsWith(IndexFileNames.SEGMENTS) + ) + .collect(Collectors.toList()); + + Set unreferencedFilesWithSameChecksum = new HashSet<>(); + Set unreferencedFilesWithChecksumMismatch = new HashSet<>(); + for (String file : unreferencedDiskFiles) { + // If primary does not contain file, ignore it. + if (primaryMetadataMap.containsKey(file) == false) { + continue; + } + try (IndexInput indexInput = indexShard.store().directory().openInput(file, IOContext.DEFAULT)) { + String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput)); + if (primaryMetadataMap.get(file).checksum().equals(checksum)) { + unreferencedFilesWithSameChecksum.add(file); + } else { + unreferencedFilesWithChecksumMismatch.add(file); + } + } + } + final List missingFiles = diff.missing.stream() + .filter(md -> unreferencedFilesWithSameChecksum.contains(md.name()) == false) + .collect(Collectors.toList()); + + logger.trace( + () -> new ParameterizedMessage( + "Replication diff for checkpoint {} {} {}", + checkpointInfo.getCheckpoint(), + missingFiles, + diff.different + ) + ); /* * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an * IllegalStateException to fail the shard */ - if (diff.different.isEmpty() == false) { + if (diff.different.isEmpty() == false || unreferencedFilesWithChecksumMismatch.isEmpty() == false) { throw new OpenSearchCorruptionException( new ParameterizedMessage( "Shard {} has local copies of segments that differ from the primary {}", @@ -194,10 +244,10 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) ); } - for (StoreFileMetadata file : diff.missing) { + for (StoreFileMetadata file : missingFiles) { state.getIndex().addFileDetail(file.name(), file.length(), false); } - return diff.missing; + return missingFiles; } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 8e27c9ff9ae1a..6b062a2650b7c 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -11,31 +11,50 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; +import org.opensearch.OpenSearchCorruptionException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.RemoteStoreReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.CorruptionUtils; import org.hamcrest.MatcherAssert; +import org.junit.Assert; import org.junit.Before; import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests { @@ -343,6 +362,242 @@ public void testPrimaryRestart() throws Exception { } } + /** + * This test validates that unreferenced on disk file are ignored while requesting files from replication source to + * prevent FileAlreadyExistsException. It does so by only copying files in first round of segment replication without + * committing locally so that in next round of segment replication those files are not considered for download again + */ + public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + shards.indexDocs(10); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} }; + AtomicInteger index = new AtomicInteger(0); + RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + super.getCheckpointMetadata(replicationId, checkpoint, listener); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener); + runAfterGetFiles[index.getAndIncrement()].run(); + } + + @Override + public String getDescription() { + return "TestRemoteStoreReplicationSource"; + } + }; + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); + CountDownLatch latch = new CountDownLatch(1); + + // Start first round of segment replication. This should fail with simulated error but with replica having + // files in its local store but not in active reader. + final SegmentReplicationTarget target = targetService.startReplication( + replica, + primary.getLatestReplicationCheckpoint(), + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + Assert.fail("Replication should fail with simulated error"); + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + assertFalse(sendShardFailure); + logger.error("Replication error", e); + latch.countDown(); + } + } + ); + latch.await(); + Set onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll())); + onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS)); + List activeFiles = replica.getSegmentMetadataMap() + .values() + .stream() + .map(metadata -> metadata.name()) + .collect(Collectors.toList()); + assertTrue("Files should not be committed", activeFiles.isEmpty()); + assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty()); + assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES); + + // Start next round of segment replication + CountDownLatch waitForSecondRound = new CountDownLatch(1); + final SegmentReplicationTarget newTarget = targetService.startReplication( + replica, + primary.getLatestReplicationCheckpoint(), + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + waitForSecondRound.countDown(); + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + logger.error("Replication error", e); + Assert.fail("Replication should not fail"); + waitForSecondRound.countDown(); + } + } + ); + waitForSecondRound.await(); + assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE); + activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList()); + assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles)); + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testSegRepFailsOnPreviousCopiedFilesWithChecksumMismatch() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + shards.indexDocs(10); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} }; + AtomicInteger index = new AtomicInteger(0); + RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + super.getCheckpointMetadata(replicationId, checkpoint, listener); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener); + runAfterGetFiles[index.getAndIncrement()].run(); + } + + @Override + public String getDescription() { + return "TestRemoteStoreReplicationSource"; + } + }; + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); + CountDownLatch latch = new CountDownLatch(1); + + // Start first round of segment replication. This should fail with simulated error but with replica having + // files in its local store but not in active reader. + final SegmentReplicationTarget target = targetService.startReplication( + replica, + primary.getLatestReplicationCheckpoint(), + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + Assert.fail("Replication should fail with simulated error"); + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + assertFalse(sendShardFailure); + logger.error("Replication error", e); + latch.countDown(); + } + } + ); + latch.await(); + Set onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll())); + onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS)); + List activeFiles = replica.getSegmentMetadataMap() + .values() + .stream() + .map(metadata -> metadata.name()) + .collect(Collectors.toList()); + assertTrue("Files should not be committed", activeFiles.isEmpty()); + assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty()); + assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES); + + // Corrupt segment files so that there is checksum mis-match and next round of segment replication also fails + final Path indexPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); + CorruptionUtils.corruptIndex(random(), indexPath, false); + + // Start next round of segment replication + CountDownLatch waitForSecondRound = new CountDownLatch(1); + final SegmentReplicationTarget newTarget = targetService.startReplication( + replica, + primary.getLatestReplicationCheckpoint(), + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + Assert.fail("Replication should fail with corruption exception"); + waitForSecondRound.countDown(); + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + logger.error("Replication error", e); + assertTrue(e.unwrapCause().getCause() instanceof OpenSearchCorruptionException); + waitForSecondRound.countDown(); + } + } + ); + waitForSecondRound.await(); + assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.FINALIZE_REPLICATION); + // Fetching getSegmentMetadataMap from replica results in java.nio.file.NoSuchFileException + // activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> + // metadata.name()).collect(Collectors.toList()); + assertTrue("Files should not be committed", activeFiles.isEmpty()); + assertFalse("Files should be copied to disk", onDiskFiles.isEmpty()); + for (String file : onDiskFiles) { + replica.store().deleteQuiet(file); + } + shards.removeReplica(replica); + closeShards(replica); + } + } + private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException { final Set segmentsFileNames = Arrays.stream(shard.store().directory().listAll()) .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))