From ba47fe4ab0dab29d4d0e0805ffeccb172dd0aa14 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 14 Dec 2022 10:20:04 +0530 Subject: [PATCH] Download segments from remote segment store in failover flow Signed-off-by: Sachin Kale --- .../opensearch/index/shard/IndexShard.java | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3a3c4b19a02f6..bf3ea4017c00f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -48,6 +48,11 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; @@ -88,6 +93,7 @@ import org.opensearch.common.util.concurrent.AsyncIOProcessor; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.gateway.WriteStateException; @@ -202,8 +208,8 @@ import java.util.stream.StreamSupport; import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; -import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.opensearch.index.seqno.SequenceNumbers.*; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; /** * An OpenSearch index shard @@ -623,6 +629,36 @@ public void updateShardState( if (indexSettings.isSegRepEnabled()) { // this Shard's engine was read only, we need to update its engine before restoring local history from xlog. assert newRouting.primary() && currentRouting.primary() == false; + + if (indexSettings.isRemoteStoreEnabled()) { + logger.info("Downloading segments from remote segment store"); + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory + : "Store.directory is not enclosing an instance of FilterDirectory"; + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + final Directory storeDirectory = store.directory(); + store.incRef(); + remoteStore.incRef(); + try { + Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); + List downloadedFiles = new ArrayList<>(); + for (String file : remoteDirectory.listAll()) { + if(localSegmentFiles.contains(file) == false) { + logger.debug("Downloading segments file: {} ", file); + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + downloadedFiles.add(file); + } + } + storeDirectory.sync(downloadedFiles); + } catch (IOException e) { + throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); + } finally { + store.decRef(); + remoteStore.decRef(); + } + } + resetEngineToGlobalCheckpoint(); } replicationTracker.activatePrimaryMode(getLocalCheckpoint());