Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Aug 31, 2023
1 parent 80243e7 commit dd0928d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.SegmentInfos;
Expand All @@ -37,16 +36,13 @@
import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
import java.util.Arrays;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;

/**
* Represents the target of a replication event.
*
Expand Down Expand Up @@ -188,37 +184,18 @@ public void startReplication(ActionListener<Void> listener) {
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
Map<String, StoreFileMetadata> activeReaderMetadataMap = indexShard.getSegmentMetadataMap();
Map<String, StoreFileMetadata> primaryMetadataMap = checkpointInfo.getMetadataMap();

Store.RecoveryDiff diff = Store.segmentReplicationDiff(primaryMetadataMap, activeReaderMetadataMap);
List<String> unreferencedDiskFiles = Arrays.asList(indexShard.store().directory().listAll())
.stream()
.filter(
name -> !activeReaderMetadataMap.containsKey(name)
&& !EXCLUDE_FILES.contains(name)
&& !name.startsWith(IndexFileNames.SEGMENTS)
)
.collect(Collectors.toList());
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
// local files
final Set<String> localFiles = new HashSet<>(List.of(store.directory().listAll()));
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

Set<String> unreferencedFilesWithSameChecksum = new HashSet<>();
Set<String> unreferencedFilesWithChecksumMismatch = new HashSet<>();
for (String file : unreferencedDiskFiles) {
// If primary does not contain file, ignore it.
if (primaryMetadataMap.containsKey(file) == false) {
continue;
}
try (IndexInput indexInput = indexShard.store().directory().openInput(file, IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
if (primaryMetadataMap.get(file).checksum().equals(checksum)) {
unreferencedFilesWithSameChecksum.add(file);
} else {
unreferencedFilesWithChecksumMismatch.add(file);
}
}
}
final List<StoreFileMetadata> missingFiles = diff.missing.stream()
.filter(md -> unreferencedFilesWithSameChecksum.contains(md.name()) == false)
.filter(md -> reuseFiles.contains(md.name()) == false)
.collect(Collectors.toList());

logger.trace(
Expand All @@ -234,7 +211,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
* IllegalStateException to fail the shard
*/
if (diff.different.isEmpty() == false || unreferencedFilesWithChecksumMismatch.isEmpty() == false) {
if (diff.different.isEmpty() == false) {
throw new OpenSearchCorruptionException(
new ParameterizedMessage(
"Shard {} has local copies of segments that differ from the primary {}",
Expand All @@ -250,6 +227,21 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return missingFiles;
}

private boolean validateLocalChecksum(StoreFileMetadata file) {
try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
if (file.checksum().equals(checksum)) {
return true;
} else {
// clear local copy with mismatch. Safe because file is not referenced by active reader.
store.deleteQuiet(file.name());
return false;
}
} catch (IOException e) {
throw new UncheckedIOException("Error reading " + file, e);
}
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
Expand All @@ -32,7 +31,6 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.CorruptionUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -477,127 +475,6 @@ public void onReplicationFailure(
}
}

public void testSegRepFailsOnPreviousCopiedFilesWithChecksumMismatch() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

shards.indexDocs(10);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} };
AtomicInteger index = new AtomicInteger(0);
RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener);
runAfterGetFiles[index.getAndIncrement()].run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
CountDownLatch latch = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
final SegmentReplicationTarget target = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail("Replication should fail with simulated error");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
assertFalse(sendShardFailure);
logger.error("Replication error", e);
latch.countDown();
}
}
);
latch.await();
Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
List<String> activeFiles = replica.getSegmentMetadataMap()
.values()
.stream()
.map(metadata -> metadata.name())
.collect(Collectors.toList());
assertTrue("Files should not be committed", activeFiles.isEmpty());
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Corrupt segment files so that there is checksum mis-match and next round of segment replication also fails
final Path indexPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
CorruptionUtils.corruptIndex(random(), indexPath, false);

// Start next round of segment replication
CountDownLatch waitForSecondRound = new CountDownLatch(1);
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail("Replication should fail with corruption exception");
waitForSecondRound.countDown();
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.error("Replication error", e);
assertTrue(e.unwrapCause().getCause() instanceof OpenSearchCorruptionException);
waitForSecondRound.countDown();
}
}
);
waitForSecondRound.await();
assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.FINALIZE_REPLICATION);
// Fetching getSegmentMetadataMap from replica results in java.nio.file.NoSuchFileException
// activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata ->
// metadata.name()).collect(Collectors.toList());
assertTrue("Files should not be committed", activeFiles.isEmpty());
assertFalse("Files should be copied to disk", onDiskFiles.isEmpty());
for (String file : onDiskFiles) {
replica.store().deleteQuiet(file);
}
shards.removeReplica(replica);
closeShards(replica);
}
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
Expand Down

0 comments on commit dd0928d

Please sign in to comment.