Skip to content

Commit

Permalink
Download segments from remote segment store in failover flow
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Dec 15, 2022
1 parent 9562461 commit ba47fe4
Showing 1 changed file with 38 additions and 2 deletions.
40 changes: 38 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -88,6 +93,7 @@
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.gateway.WriteStateException;
Expand Down Expand Up @@ -202,8 +208,8 @@
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.*;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

/**
* An OpenSearch index shard
Expand Down Expand Up @@ -623,6 +629,36 @@ public void updateShardState(
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;

if (indexSettings.isRemoteStoreEnabled()) {
logger.info("Downloading segments from remote segment store");
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
final Directory storeDirectory = store.directory();
store.incRef();
remoteStore.incRef();
try {
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
List<String> downloadedFiles = new ArrayList<>();
for (String file : remoteDirectory.listAll()) {
if(localSegmentFiles.contains(file) == false) {
logger.debug("Downloading segments file: {} ", file);
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
downloadedFiles.add(file);
}
}
storeDirectory.sync(downloadedFiles);
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
store.decRef();
remoteStore.decRef();
}
}

resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
Expand Down

0 comments on commit ba47fe4

Please sign in to comment.