Skip to content

Commit

Permalink
[Segment Replication] Allow segment replication with on disk files no…
Browse files Browse the repository at this point in the history
…t referenced by reader with matching checksum (opensearch-project#9630)

* [Segment Replication] Allow segment replication with on disk files not referenced by reader with matching checksum

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
Signed-off-by: Ivan Brusic <ivan.brusic@flocksafety.com>
  • Loading branch information
dreamer-89 authored and brusic committed Sep 25, 2023
1 parent ebc4e1f commit c81666b
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
package org.opensearch.indices.replication;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
Expand All @@ -33,8 +36,11 @@
import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Represents the target of a replication event.
Expand Down Expand Up @@ -178,7 +184,27 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff));
// local files
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

final List<StoreFileMetadata> missingFiles = diff.missing.stream()
.filter(md -> reuseFiles.contains(md.name()) == false)
.collect(Collectors.toList());

logger.trace(
() -> new ParameterizedMessage(
"Replication diff for checkpoint {} {} {}",
checkpointInfo.getCheckpoint(),
missingFiles,
diff.different
)
);
/*
* Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
Expand All @@ -194,10 +220,25 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
);
}

for (StoreFileMetadata file : diff.missing) {
for (StoreFileMetadata file : missingFiles) {
state.getIndex().addFileDetail(file.name(), file.length(), false);
}
return diff.missing;
return missingFiles;
}

private boolean validateLocalChecksum(StoreFileMetadata file) {
try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
if (file.checksum().equals(checksum)) {
return true;
} else {
// clear local copy with mismatch. Safe because file is not referenced by active reader.
store.deleteQuiet(file.name());
return false;
}
} catch (IOException e) {
throw new UncheckedIOException("Error reading " + file, e);
}
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,45 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.RemoteStoreReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests {

Expand Down Expand Up @@ -355,6 +372,121 @@ public void testPrimaryRestart() throws Exception {
}
}

/**
* This test validates that unreferenced on disk file are ignored while requesting files from replication source to
* prevent FileAlreadyExistsException. It does so by only copying files in first round of segment replication without
* committing locally so that in next round of segment replication those files are not considered for download again
*/
public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

shards.indexDocs(10);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} };
AtomicInteger index = new AtomicInteger(0);
RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener);
runAfterGetFiles[index.getAndIncrement()].run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
CountDownLatch latch = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
final SegmentReplicationTarget target = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail("Replication should fail with simulated error");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
assertFalse(sendShardFailure);
logger.error("Replication error", e);
latch.countDown();
}
}
);
latch.await();
Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
List<String> activeFiles = replica.getSegmentMetadataMap()
.values()
.stream()
.map(metadata -> metadata.name())
.collect(Collectors.toList());
assertTrue("Files should not be committed", activeFiles.isEmpty());
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Start next round of segment replication
CountDownLatch waitForSecondRound = new CountDownLatch(1);
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
waitForSecondRound.countDown();
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.error("Replication error", e);
Assert.fail("Replication should not fail");
waitForSecondRound.countDown();
}
}
);
waitForSecondRound.await();
assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
shards.removeReplica(replica);
closeShards(replica);
}
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
Expand Down

0 comments on commit c81666b

Please sign in to comment.