From 57eb105a6a156e3a313efd58e267d772ac178eea Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 1 Aug 2023 23:17:39 -0700 Subject: [PATCH] [Segment Replication] Refactor RemoteStoreReplicationSource (#8767) * [Segment Replication] Refactor remote replication source Signed-off-by: Suraj Singh * Unit test updates Signed-off-by: Suraj Singh * Self review Signed-off-by: Suraj Singh * Self review Signed-off-by: Suraj Singh * Segregate shard level tests for node to node and remote store segment replication Signed-off-by: Suraj Singh * Fix failing unit tests Signed-off-by: Suraj Singh * Fix failing UT Signed-off-by: Suraj Singh * Fix failing UT Signed-off-by: Suraj Singh * Address review comments Signed-off-by: Suraj Singh * Fix more unit tests Signed-off-by: Suraj Singh * Improve RemoteStoreReplicationSourceTests, remove unnecessary mocks and use actual failures for failure/exception use cases Signed-off-by: Suraj Singh * Spotless check fix Signed-off-by: Suraj Singh * Address review comments Signed-off-by: Suraj Singh * Ignore files already in store while computing segment file diff with primary Signed-off-by: Suraj Singh * Spotless fix Signed-off-by: Suraj Singh * Fix failing UT Signed-off-by: Suraj Singh * Spotless fix Signed-off-by: Suraj Singh * Move read/writes from IndexInput/Output to RemoteSegmentMetadata Signed-off-by: Suraj Singh * Address review commnt Signed-off-by: Suraj Singh * Update recovery flow to perform commits during recovery Signed-off-by: Suraj Singh * Remove un-necessary char Signed-off-by: Suraj Singh * Address review comments Signed-off-by: Suraj Singh * Update comment nit-pick Signed-off-by: Suraj Singh * Remove deletion logic causing read issues due to deleted segments_N Signed-off-by: Suraj Singh * Spotless fix Signed-off-by: Suraj Singh * Fix unit tests Signed-off-by: Suraj Singh --------- Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationBaseIT.java | 20 - .../remotestore/RemoteStoreStatsIT.java | 8 +- .../RemoteIndexSnapshotStatusApiIT.java | 1 + .../opensearch/index/shard/IndexShard.java | 54 +- .../shard/RemoteStoreRefreshListener.java | 9 +- .../opensearch/index/shard/StoreRecovery.java | 2 +- .../store/RemoteSegmentStoreDirectory.java | 13 +- .../org/opensearch/index/store/Store.java | 10 +- .../metadata/RemoteSegmentMetadata.java | 53 +- .../recovery/PeerRecoveryTargetService.java | 2 +- .../replication/GetSegmentFilesResponse.java | 4 + .../RemoteStoreReplicationSource.java | 50 +- .../replication/SegmentReplicationTarget.java | 55 +- .../RemoteStoreRefreshListenerTests.java | 2 +- ...overyWithRemoteTranslogOnPrimaryTests.java | 37 +- .../SegmentReplicationIndexShardTests.java | 896 ++---------------- ...licationWithNodeToNodeIndexShardTests.java | 697 ++++++++++++++ ...tReplicationWithRemoteIndexShardTests.java | 133 ++- .../RemoteSegmentStoreDirectoryTests.java | 47 +- .../RemoteSegmentMetadataHandlerTests.java | 28 +- ...teStorePeerRecoverySourceHandlerTests.java | 29 +- .../RemoteStoreReplicationSourceTests.java | 199 ++-- ...enSearchIndexLevelReplicationTestCase.java | 6 +- .../index/shard/IndexShardTestCase.java | 64 +- 24 files changed, 1291 insertions(+), 1128 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 64c6ebbb33482..cfb2e11c8c429 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -24,7 +23,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; -import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; @@ -134,24 +132,6 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exce waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); } - protected void waitForSegmentReplication(String node) throws Exception { - assertBusy(() -> { - SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin() - .indices() - .prepareSegmentReplicationStats(INDEX_NAME) - .setDetailed(true) - .execute() - .actionGet(); - final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats() - .get(INDEX_NAME) - .get(0); - assertEquals( - perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(), - SegmentReplicationState.Stage.DONE - ); - }, 1, TimeUnit.MINUTES); - } - protected void verifyStoreContent() throws Exception { assertBusy(() -> { final ClusterState clusterState = getClusterState(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 840e3a07ed255..1c7f14701b3e7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -272,12 +272,12 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce assertTrue( replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0 && primaryStats.uploadBytesStarted - - zeroStatePrimaryStats.uploadBytesStarted == replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted + - zeroStatePrimaryStats.uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted ); assertTrue( replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0 && primaryStats.uploadBytesSucceeded - - zeroStatePrimaryStats.uploadBytesSucceeded == replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + - zeroStatePrimaryStats.uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded ); // Assert zero failures assertEquals(0, primaryStats.uploadBytesFailed); @@ -369,8 +369,8 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr assertEquals(0, uploadsFailed); assertEquals(0, uploadBytesFailed); for (int j = 0; j < response.getSuccessfulShards() - 1; j++) { - assertEquals(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted, (long) downloadBytesStarted.get(j)); - assertEquals(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded, (long) downloadBytesSucceeded.get(j)); + assertTrue(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted > downloadBytesStarted.get(j)); + assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded > downloadBytesSucceeded.get(j)); assertEquals(0, (long) downloadBytesFailed.get(j)); } }, 60, TimeUnit.SECONDS); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java index b6a5188c99335..d17410d8921ed 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java @@ -57,6 +57,7 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(super.nodeSettings(nodeOrdinal)) .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") .put(remoteStoreClusterSettings("remote-store-repo-name")) .build(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index bb5088866edb6..2b85193275a13 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -199,9 +199,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -1988,7 +1986,7 @@ private long recoverLocallyUpToGlobalCheckpoint() { final Optional safeCommit; final long globalCheckpoint; try { - final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY); globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); safeCommit = store.findSafeIndexCommit(globalCheckpoint); } catch (org.apache.lucene.index.IndexNotFoundException e) { @@ -2088,7 +2086,7 @@ private long recoverLocallyUptoLastCommit() { try { seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO)); } catch (org.apache.lucene.index.IndexNotFoundException e) { - logger.error("skip local recovery as no index commit found", e); + logger.error("skip local recovery as no index commit found"); return UNASSIGNED_SEQ_NO; } catch (Exception e) { logger.error("skip local recovery as failed to find the safe commit", e); @@ -2242,7 +2240,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException { // we have to set it before we open an engine and recover from the translog because // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in. - final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); } @@ -2326,7 +2324,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) { - syncSegmentsFromRemoteSegmentStore(false, true, true); + syncSegmentsFromRemoteSegmentStore(false, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { if (syncFromRemote) { @@ -4555,7 +4553,7 @@ public void close() throws IOException { }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false, true, true); + syncSegmentsFromRemoteSegmentStore(false, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4616,13 +4614,11 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise - * @param shouldCommit if the shard requires committing the changes after sync from remote. * @throws IOException if exception occurs while reading segments from remote store */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) - throws IOException { + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { assert indexSettings.isRemoteStoreEnabled(); - logger.info("Downloading segments from remote segment store"); + logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that // are uploaded to the remote segment store. @@ -4647,7 +4643,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re } else { storeDirectory = store.directory(); } - Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); if (refreshLevelSegmentSync && remoteSegmentMetadata != null) { @@ -4661,37 +4656,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re indexInput, remoteSegmentMetadata.getGeneration() ); - // Replicas never need a local commit - if (shouldCommit) { - if (this.shardRouting.primary()) { - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs - // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N, - // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the - // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the - // latest commit. - Optional localMaxSegmentInfos = localSegmentFiles.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - if (localMaxSegmentInfos.isPresent() - && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - - 1) { - // If remote translog is not enabled, local translog will be created with different UUID. - // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs - // to be same. Following code block make sure to have the same UUID. - if (indexSettings.isRemoteTranslogStoreEnabled() == false) { - SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo(); - Map userData = new HashMap<>(infosSnapshot.getUserData()); - userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY)); - infosSnapshot.setUserData(userData, false); - } - storeDirectory.deleteFile(localMaxSegmentInfos.get()); - } - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); - } - } else { - finalizeReplication(infosSnapshot); - } + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } } } catch (IOException e) { @@ -4716,7 +4682,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore( long primaryTerm, long commitGeneration ) throws IOException { - logger.info("Downloading segments from given remote segment store"); + logger.trace("Downloading segments from given remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = null; if (remoteStore != null) { remoteDirectory = getRemoteDirectory(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 4a70ff04770d3..8dd0c8b9d4405 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -220,7 +220,7 @@ private synchronized boolean syncSegments() { public void onResponse(Void unused) { try { // Start metadata file upload - uploadMetadata(localSegmentsPostRefresh, segmentInfos); + uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint); clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); onSuccessfulSegmentsSync( refreshTimeMs, @@ -327,7 +327,8 @@ private boolean isRefreshAfterCommit() throws IOException { && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName))); } - void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException { + void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) + throws IOException { final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); SegmentInfos segmentInfosSnapshot = segmentInfos.clone(); Map userData = segmentInfosSnapshot.getUserData(); @@ -344,8 +345,8 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se localSegmentsPostRefresh, segmentInfosSnapshot, storeDirectory, - indexShard.getOperationPrimaryTerm(), - translogFileGeneration + translogFileGeneration, + replicationCheckpoint ); } } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 5897fa7d513d7..2c8a186a6ed53 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco remoteStore.incRef(); try { // Download segments from remote segment store - indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true); + indexShard.syncSegmentsFromRemoteSegmentStore(true, true); if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 8ee267cb67e68..8dfdb3e2c8e06 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -44,6 +44,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; import java.io.FileNotFoundException; @@ -603,19 +604,20 @@ public boolean containsFile(String localFilename, String checksum) { * @param segmentFiles segment files that are part of the shard at the time of the latest refresh * @param segmentInfosSnapshot SegmentInfos bytes to store as part of metadata file * @param storeDirectory instance of local directory to temporarily create metadata file before upload - * @param primaryTerm primary term to be used in the name of metadata file + * @param translogGeneration translog generation + * @param replicationCheckpoint ReplicationCheckpoint of primary shard * @throws IOException in case of I/O error while uploading the metadata file */ public void uploadMetadata( Collection segmentFiles, SegmentInfos segmentInfosSnapshot, Directory storeDirectory, - long primaryTerm, - long translogGeneration + long translogGeneration, + ReplicationCheckpoint replicationCheckpoint ) throws IOException { synchronized (this) { String metadataFilename = MetadataFilenameUtils.getMetadataFilename( - primaryTerm, + replicationCheckpoint.getPrimaryTerm(), segmentInfosSnapshot.getGeneration(), translogGeneration, metadataUploadCounter.incrementAndGet(), @@ -646,8 +648,7 @@ public void uploadMetadata( new RemoteSegmentMetadata( RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments), segmentInfoSnapshotByteArray, - primaryTerm, - segmentInfosSnapshot.getGeneration() + replicationCheckpoint ) ); } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index a67b87f58110c..921deae41946a 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -845,22 +845,24 @@ private void cleanupFiles(Collection filesToConsiderForCleanup, String r * @param tmpToFileName Map of temporary replication file to actual file name * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file * @param segmentsGen segment generation number - * @param consumer consumer for generated SegmentInfos + * @param finalizeConsumer consumer for action on passed in SegmentInfos + * @param renameConsumer consumer for action on temporary copied over files * @throws IOException Exception while reading store and building segment infos */ public void buildInfosFromBytes( Map tmpToFileName, byte[] infosBytes, long segmentsGen, - CheckedConsumer consumer + CheckedConsumer finalizeConsumer, + CheckedConsumer, IOException> renameConsumer ) throws IOException { metadataLock.writeLock().lock(); try { final List values = new ArrayList<>(tmpToFileName.values()); incRefFileDeleter(values); try { - renameTempFilesSafe(tmpToFileName); - consumer.accept(buildSegmentInfos(infosBytes, segmentsGen)); + renameConsumer.accept(tmpToFileName); + finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen)); } finally { decRefFileDeleter(values); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java index 9a479346ff711..15703a2c02b13 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java @@ -14,7 +14,10 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; /** * Metadata object for Remote Segment @@ -38,19 +41,16 @@ public class RemoteSegmentMetadata { private final byte[] segmentInfosBytes; - private final long primaryTerm; - private final long generation; + private final ReplicationCheckpoint replicationCheckpoint; public RemoteSegmentMetadata( Map metadata, byte[] segmentInfosBytes, - long primaryTerm, - long generation + ReplicationCheckpoint replicationCheckpoint ) { this.metadata = metadata; this.segmentInfosBytes = segmentInfosBytes; - this.generation = generation; - this.primaryTerm = primaryTerm; + this.replicationCheckpoint = replicationCheckpoint; } /** @@ -66,11 +66,15 @@ public byte[] getSegmentInfosBytes() { } public long getGeneration() { - return generation; + return replicationCheckpoint.getSegmentsGen(); } public long getPrimaryTerm() { - return primaryTerm; + return replicationCheckpoint.getPrimaryTerm(); + } + + public ReplicationCheckpoint getReplicationCheckpoint() { + return replicationCheckpoint; } /** @@ -99,19 +103,42 @@ public static Map f public void write(IndexOutput out) throws IOException { out.writeMapOfStrings(toMapOfStrings()); - out.writeLong(generation); - out.writeLong(primaryTerm); + writeCheckpointToIndexOutput(replicationCheckpoint, out); out.writeLong(segmentInfosBytes.length); out.writeBytes(segmentInfosBytes, segmentInfosBytes.length); } public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException { Map metadata = indexInput.readMapOfStrings(); - long generation = indexInput.readLong(); - long primaryTerm = indexInput.readLong(); + ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput); int byteArraySize = (int) indexInput.readLong(); byte[] segmentInfosBytes = new byte[byteArraySize]; indexInput.readBytes(segmentInfosBytes, 0, byteArraySize); - return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, primaryTerm, generation); + return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint); + } + + public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException { + ShardId shardId = replicationCheckpoint.getShardId(); + // Write ShardId + out.writeString(shardId.getIndex().getName()); + out.writeString(shardId.getIndex().getUUID()); + out.writeVInt(shardId.getId()); + // Write remaining checkpoint fields + out.writeLong(replicationCheckpoint.getPrimaryTerm()); + out.writeLong(replicationCheckpoint.getSegmentsGen()); + out.writeLong(replicationCheckpoint.getSegmentInfosVersion()); + out.writeLong(replicationCheckpoint.getLength()); + out.writeString(replicationCheckpoint.getCodec()); + } + + private static ReplicationCheckpoint readCheckpointFromIndexInput(IndexInput in) throws IOException { + return new ReplicationCheckpoint( + new ShardId(new Index(in.readString(), in.readString()), in.readVInt()), + in.readLong(), + in.readLong(), + in.readLong(), + in.readLong(), + in.readString() + ); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 0ba57a9ee7f65..386b2e0e8192d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true); + indexShard.syncSegmentsFromRemoteSegmentStore(false, false); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java index 89d50a17464a6..33a84833f2418 100644 --- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java @@ -33,6 +33,10 @@ public GetSegmentFilesResponse(StreamInput out) throws IOException { out.readList(StoreFileMetadata::new); } + public List getFiles() { + return files; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(files); diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index c5be7635782af..7f444d0031533 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; import org.opensearch.action.ActionListener; import org.opensearch.index.shard.IndexShard; @@ -21,6 +23,8 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -33,12 +37,16 @@ */ public class RemoteStoreReplicationSource implements SegmentReplicationSource { - private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + private static final Logger logger = LogManager.getLogger(RemoteStoreReplicationSource.class); private final IndexShard indexShard; + private final RemoteSegmentStoreDirectory remoteDirectory; public RemoteStoreReplicationSource(IndexShard indexShard) { this.indexShard = indexShard; + FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); } @Override @@ -47,15 +55,11 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory(); - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); - Map metadataMap; // TODO: Need to figure out a way to pass this information for segment metadata via remote store. final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); try { - RemoteSegmentMetadata mdFile = remoteDirectory.readLatestMetadataFile(); + RemoteSegmentMetadata mdFile = remoteDirectory.init(); // During initial recovery flow, the remote store might not have metadata as primary hasn't uploaded anything yet. if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); @@ -77,8 +81,7 @@ public void getCheckpointMetadata( ) ) ); - // TODO: GET current checkpoint from remote store. - listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())); } catch (Exception e) { listener.onFailure(e); } @@ -93,8 +96,33 @@ public void getSegmentFiles( ActionListener listener ) { try { - indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false); - listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + if (filesToFetch.isEmpty()) { + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + return; + } + logger.trace("Downloading segments files from remote store {}", filesToFetch); + + RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); + List downloadedSegments = new ArrayList<>(); + Collection directoryFiles = List.of(indexShard.store().directory().listAll()); + if (remoteSegmentMetadata != null) { + try { + indexShard.store().incRef(); + indexShard.remoteStore().incRef(); + final Directory storeDirectory = indexShard.store().directory(); + for (StoreFileMetadata fileMetadata : filesToFetch) { + String file = fileMetadata.name(); + assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + downloadedSegments.add(fileMetadata); + } + logger.trace("Downloaded segments from remote store {}", downloadedSegments); + } finally { + indexShard.store().decRef(); + indexShard.remoteStore().decRef(); + } + } + listener.onResponse(new GetSegmentFilesResponse(downloadedSegments)); } catch (Exception e) { listener.onFailure(e); } @@ -102,6 +130,6 @@ public void getSegmentFiles( @Override public String getDescription() { - return "remote store"; + return "RemoteStoreReplicationSource"; } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 2e0f5a8c0ad1f..c22701dfc94ce 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -13,10 +13,6 @@ import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.BufferedChecksumIndexInput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -36,10 +32,10 @@ import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; /** * Represents the target of a replication event. @@ -55,10 +51,6 @@ public class SegmentReplicationTarget extends ReplicationTarget { public final static String REPLICATION_PREFIX = "replication."; - public ReplicationCheckpoint getCheckpoint() { - return this.checkpoint; - } - public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = indexShard.getLatestReplicationCheckpoint(); @@ -117,6 +109,10 @@ public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOEx return false; } + public ReplicationCheckpoint getCheckpoint() { + return this.checkpoint; + } + @Override public void writeFileChunk( StoreFileMetadata metadata, @@ -162,7 +158,7 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); getFilesListener.whenComplete(response -> { - finalizeReplication(checkpointInfoListener.result()); + finalizeReplication(checkpointInfoListener.result(), getFilesListener.result()); listener.onResponse(null); }, listener::onFailure); } @@ -193,23 +189,34 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) return diff.missing; } - private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { - // TODO: Refactor the logic so that finalize doesn't have to be invoked for remote store as source - if (source instanceof RemoteStoreReplicationSource) { - state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); - return; - } + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse) + throws OpenSearchCorruptionException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + // Handle empty SegmentInfos bytes for recovering replicas + if (checkpointInfoResponse.getInfosBytes() == null) { + return; + } Store store = null; try { store = store(); store.incRef(); + Map tempFileNames; + if (this.indexShard.indexSettings().isRemoteStoreEnabled() == true) { + tempFileNames = getSegmentFilesResponse.getFiles() + .stream() + .collect(Collectors.toMap(StoreFileMetadata::name, StoreFileMetadata::name)); + } else { + tempFileNames = multiFileWriter.getTempFileNames(); + } store.buildInfosFromBytes( - multiFileWriter.getTempFileNames(), + tempFileNames, checkpointInfoResponse.getInfosBytes(), checkpointInfoResponse.getCheckpoint().getSegmentsGen(), - indexShard::finalizeReplication + indexShard::finalizeReplication, + this.indexShard.indexSettings().isRemoteStoreEnabled() == true + ? (files) -> {} + : (files) -> indexShard.store().renameTempFilesSafe(files) ); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. @@ -247,16 +254,6 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) } } - /** - * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be - * passed to SegmentInfos.readCommit - */ - private ChecksumIndexInput toIndexInput(byte[] input) { - return new BufferedChecksumIndexInput( - new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") - ); - } - /** * Trigger a cancellation, this method will not close the target a subsequent call to #fail is required from target service. */ diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index f13f89c6e067c..66938eec10513 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -413,7 +413,7 @@ private Tuple m when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Mock indexShard.getOperationPrimaryTerm() - when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); + when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint()); // Mock indexShard.routingEntry().primary() when(shard.routingEntry()).thenReturn(indexShard.routingEntry()); diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 690c7955ff338..20b3dfc0f93a6 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -9,22 +9,27 @@ package org.opensearch.index.shard; import org.junit.Assert; +import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; import org.opensearch.index.translog.WriteOnlyTranslogManager; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; +import java.nio.file.Path; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,15 +44,24 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "100ms") .build(); - public void testStartSequenceForReplicaRecovery() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + @Before + public void setup() { + // Todo: Remove feature flag once remote store integration with segrep goes GA + FeatureFlags.initializeFeatureFlags( + Settings.builder().put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING.getKey(), "true").build() + ); + } + public void testStartSequenceForReplicaRecovery() throws Exception { + final Path remoteDir = createTempDir(); + final String indexMapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }"; + try (ReplicationGroup shards = createGroup(0, settings, indexMapping, new NRTReplicationEngineFactory(), remoteDir)) { shards.startPrimary(); final IndexShard primary = shards.getPrimary(); int numDocs = shards.indexDocs(randomIntBetween(10, 100)); shards.flush(); - final IndexShard replica = shards.addReplica(); + final IndexShard replica = shards.addReplica(remoteDir); shards.startAll(); allowShardFailures(); @@ -63,6 +77,14 @@ public void testStartSequenceForReplicaRecovery() throws Exception { int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); shards.flush(); + final ShardRouting replicaRouting2 = newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + false, + ShardRoutingState.INITIALIZING, + RecoverySource.PeerRecoverySource.INSTANCE + ); + Store remoteStore = createRemoteStore(remoteDir, replicaRouting2, newIndexMetadata); IndexShard newReplicaShard = newShard( newShardRouting( replicaRouting.shardId(), @@ -80,7 +102,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception { replica.getGlobalCheckpointSyncer(), replica.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, - null + remoteStore ); shards.addReplica(newReplicaShard); AtomicBoolean assertDone = new AtomicBoolean(false); @@ -103,7 +125,6 @@ public IndexShard indexShard() { return idxShard; } }); - shards.flush(); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs + moreDocs); @@ -111,7 +132,9 @@ public IndexShard indexShard() { } public void testNoTranslogHistoryTransferred() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + final Path remoteDir = createTempDir(); + final String indexMapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }"; + try (ReplicationGroup shards = createGroup(0, settings, indexMapping, new NRTReplicationEngineFactory(), remoteDir)) { // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected shards.startPrimary(); @@ -123,7 +146,7 @@ public void testNoTranslogHistoryTransferred() throws Exception { assertEquals(numDocs + moreDocs, getTranslog(primary).totalOperations()); // Step 2 - Start replica, recovery happens, check docs recovered till last flush - final IndexShard replica = shards.addReplica(); + final IndexShard replica = shards.addReplica(remoteDir); shards.startAll(); assertEquals(docIdAndSeqNosAfterFlush, getDocIdAndSeqNos(replica)); assertDocCount(replica, numDocs); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 9107606326150..12b7341349442 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -16,7 +16,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; -import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; @@ -31,9 +30,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.lease.Releasable; import org.opensearch.index.IndexSettings; -import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; -import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; @@ -44,7 +41,6 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -67,22 +63,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import static java.util.Arrays.asList; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; @@ -100,6 +89,41 @@ public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelRepli .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); + protected ReplicationGroup getReplicationGroup(int numberOfReplicas) throws IOException { + return createGroup(numberOfReplicas, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory()); + } + + protected ReplicationGroup getReplicationGroup(int numberOfReplicas, String indexMapping) throws IOException { + return createGroup(numberOfReplicas, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory()); + } + + protected Settings getIndexSettings() { + return settings; + } + + /** + * Validates happy path of segment replication where primary index docs which are replicated to replica shards. Assertions + * made on doc count on both primary and replica. + * @throws Exception + */ + public void testReplication() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) { + shards.startAll(); + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + + // index and replicate segments to replica. + int numDocs = randomIntBetween(10, 20); + shards.indexDocs(numDocs); + primaryShard.refresh("test"); + flushShard(primaryShard); + replicateSegments(primaryShard, List.of(replicaShard)); + + // Assertions + shards.assertAllEqual(numDocs); + } + } + /** * Test that latestReplicationCheckpoint returns null only for docrep enabled indices */ @@ -114,7 +138,7 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ public void testReplicationCheckpointNotNullForSegRep() throws IOException { - final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory()); + final IndexShard indexShard = newStartedShard(randomBoolean(), getIndexSettings(), new NRTReplicationEngineFactory()); final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); assertNotNull(replicationCheckpoint); closeShards(indexShard); @@ -127,7 +151,7 @@ public void testNRTReplicasDoNotAcceptRefreshListeners() throws IOException { } public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory())) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -149,7 +173,7 @@ public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint())); // index and copy segments to replica. - int numDocs = randomIntBetween(10, 100); + int numDocs = randomIntBetween(10, 20); shards.indexDocs(numDocs); primary.refresh("test"); replicateSegments(primary, List.of(replica)); @@ -172,6 +196,51 @@ public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { } } + public void testPrimaryRelocationWithSegRepFailure() throws Exception { + final IndexShard primarySource = newStartedShard(true, getIndexSettings()); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "_doc", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); + final IndexShard primaryTarget = newShard( + primarySource.routingEntry().getTargetRelocatingShard(), + getIndexSettings(), + new NRTReplicationEngineFactory() + ); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); + + Function, List> replicatePrimaryFunction = (shardList) -> { + try { + throw new IOException("Expected failure"); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + Exception e = expectThrows( + Exception.class, + () -> recoverReplica( + primaryTarget, + primarySource, + (primary, sourceNode) -> new RecoveryTarget(primary, sourceNode, new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + throw new AssertionError("recovery must fail"); + } + + @Override + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + assertEquals(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), "Expected failure"); + } + }), + true, + true, + replicatePrimaryFunction + ) + ); + closeShards(primarySource, primaryTarget); + } + private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint) throws IOException { assertNotNull(segmentInfos); @@ -180,7 +249,7 @@ private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentI } public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { - final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); + final IndexShard indexShard = newShard(false, getIndexSettings(), new InternalEngineFactory()); assertFalse(indexShard.isSegmentReplicationAllowed()); closeShards(indexShard); } @@ -193,13 +262,13 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException */ public void testSegmentReplication_With_ReaderClosedConcurrently() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), mappings, new NRTReplicationEngineFactory())) { shards.startAll(); IndexShard primaryShard = shards.getPrimary(); final IndexShard replicaShard = shards.getReplicas().get(0); // Step 1. Ingest numDocs documents & replicate to replica shard - final int numDocs = randomIntBetween(100, 200); + final int numDocs = randomIntBetween(10, 20); logger.info("--> Inserting documents {}", numDocs); for (int i = 0; i < numDocs; i++) { shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); @@ -242,13 +311,13 @@ public void testSegmentReplication_With_ReaderClosedConcurrently() throws Except */ public void testSegmentReplication_With_EngineClosedConcurrently() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), mappings, new NRTReplicationEngineFactory())) { shards.startAll(); IndexShard primaryShard = shards.getPrimary(); final IndexShard replicaShard = shards.getReplicas().get(0); // Step 1. Ingest numDocs documents - final int numDocs = randomIntBetween(100, 200); + final int numDocs = randomIntBetween(10, 20); logger.info("--> Inserting documents {}", numDocs); for (int i = 0; i < numDocs; i++) { shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); @@ -284,137 +353,9 @@ public void testSegmentReplication_With_EngineClosedConcurrently() throws Except } } - /** - * Verifies that commits on replica engine resulting from engine or reader close does not cleanup the temporary - * replication files from ongoing round of segment replication - */ - public void testTemporaryFilesNotCleanup() throws Exception { - String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primaryShard = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - // Step 1. Ingest numDocs documents, commit to create commit point on primary & replicate - final int numDocs = randomIntBetween(100, 200); - logger.info("--> Inserting documents {}", numDocs); - for (int i = 0; i < numDocs; i++) { - shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); - } - assertEqualTranslogOperations(shards, primaryShard); - primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); - replicateSegments(primaryShard, shards.getReplicas()); - shards.assertAllEqual(numDocs); - - // Step 2. Ingest numDocs documents again to create a new commit on primary - logger.info("--> Ingest {} docs again", numDocs); - for (int i = 0; i < numDocs; i++) { - shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); - } - assertEqualTranslogOperations(shards, primaryShard); - primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); - - // Step 3. Copy segment files to replica shard but prevent commit - final CountDownLatch countDownLatch = new CountDownLatch(1); - Map primaryMetadata; - try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { - final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); - primaryMetadata = primaryShard.store().getSegmentMetadataMap(primarySegmentInfos); - } - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final IndicesService indicesService = mock(IndicesService.class); - when(indicesService.getShardOrNull(replica.shardId)).thenReturn(replica); - final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( - threadPool, - new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - mock(TransportService.class), - sourceFactory, - indicesService, - clusterService - ); - final Consumer runnablePostGetFiles = (indexShard) -> { - try { - Collection temporaryFiles = Stream.of(indexShard.store().directory().listAll()) - .filter(name -> name.startsWith(SegmentReplicationTarget.REPLICATION_PREFIX)) - .collect(Collectors.toList()); - - // Step 4. Perform a commit on replica shard. - NRTReplicationEngine engine = (NRTReplicationEngine) indexShard.getEngine(); - engine.updateSegments(engine.getSegmentInfosSnapshot().get()); - - // Step 5. Validate temporary files are not deleted from store. - Collection replicaStoreFiles = List.of(indexShard.store().directory().listAll()); - assertTrue(replicaStoreFiles.containsAll(temporaryFiles)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - SegmentReplicationSource segmentReplicationSource = getSegmentReplicationSource( - primaryShard, - (repId) -> targetService.get(repId), - runnablePostGetFiles - ); - when(sourceFactory.get(any())).thenReturn(segmentReplicationSource); - targetService.startReplication(replica, getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch)); - countDownLatch.await(30, TimeUnit.SECONDS); - assertEquals("Replication failed", 0, countDownLatch.getCount()); - shards.assertAllEqual(numDocs); - } - } - - public void testSegmentReplication_Index_Update_Delete() throws Exception { - String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { - shards.startAll(); - final IndexShard primaryShard = shards.getPrimary(); - - final int numDocs = randomIntBetween(100, 200); - for (int i = 0; i < numDocs; i++) { - shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); - } - - assertEqualTranslogOperations(shards, primaryShard); - primaryShard.refresh("Test"); - replicateSegments(primaryShard, shards.getReplicas()); - - shards.assertAllEqual(numDocs); - - for (int i = 0; i < numDocs; i++) { - // randomly update docs. - if (randomBoolean()) { - shards.index( - new IndexRequest(index.getName()).id(String.valueOf(i)).source("{ \"foo\" : \"baz\" }", XContentType.JSON) - ); - } - } - assertEqualTranslogOperations(shards, primaryShard); - primaryShard.refresh("Test"); - replicateSegments(primaryShard, shards.getReplicas()); - shards.assertAllEqual(numDocs); - - final List docs = getDocIdAndSeqNos(primaryShard); - for (IndexShard shard : shards.getReplicas()) { - assertEquals(getDocIdAndSeqNos(shard), docs); - } - for (int i = 0; i < numDocs; i++) { - // randomly delete. - if (randomBoolean()) { - shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i))); - } - } - assertEqualTranslogOperations(shards, primaryShard); - primaryShard.refresh("Test"); - replicateSegments(primaryShard, shards.getReplicas()); - final List docsAfterDelete = getDocIdAndSeqNos(primaryShard); - for (IndexShard shard : shards.getReplicas()) { - assertEquals(getDocIdAndSeqNos(shard), docsAfterDelete); - } - } - } - public void testIgnoreShardIdle() throws Exception { Settings updatedSettings = Settings.builder() - .put(settings) + .put(getIndexSettings()) .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) .build(); try (ReplicationGroup shards = createGroup(1, updatedSettings, new NRTReplicationEngineFactory())) { @@ -464,7 +405,7 @@ public void testShardIdle_Docrep() throws Exception { public void testShardIdleWithNoReplicas() throws Exception { Settings updatedSettings = Settings.builder() - .put(settings) + .put(getIndexSettings()) .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) .build(); try (ReplicationGroup shards = createGroup(0, updatedSettings, new NRTReplicationEngineFactory())) { @@ -544,132 +485,10 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { closeShards(primaryShard); } - public void testReplicaReceivesGenIncrease() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - final IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - final int numDocs = randomIntBetween(10, 100); - shards.indexDocs(numDocs); - assertEquals(numDocs, primary.translogStats().estimatedNumberOfOperations()); - assertEquals(numDocs, replica.translogStats().estimatedNumberOfOperations()); - assertEquals(numDocs, primary.translogStats().getUncommittedOperations()); - assertEquals(numDocs, replica.translogStats().getUncommittedOperations()); - flushShard(primary, true); - replicateSegments(primary, shards.getReplicas()); - assertEquals(0, primary.translogStats().estimatedNumberOfOperations()); - assertEquals(0, replica.translogStats().estimatedNumberOfOperations()); - assertEquals(0, primary.translogStats().getUncommittedOperations()); - assertEquals(0, replica.translogStats().getUncommittedOperations()); - - final int additionalDocs = shards.indexDocs(randomIntBetween(numDocs + 1, numDocs + 10)); - - final int totalDocs = numDocs + additionalDocs; - primary.refresh("test"); - replicateSegments(primary, shards.getReplicas()); - assertEquals(additionalDocs, primary.translogStats().estimatedNumberOfOperations()); - assertEquals(additionalDocs, replica.translogStats().estimatedNumberOfOperations()); - assertEquals(additionalDocs, primary.translogStats().getUncommittedOperations()); - assertEquals(additionalDocs, replica.translogStats().getUncommittedOperations()); - flushShard(primary, true); - replicateSegments(primary, shards.getReplicas()); - - assertEqualCommittedSegments(primary, replica); - assertDocCount(primary, totalDocs); - assertDocCount(replica, totalDocs); - assertEquals(0, primary.translogStats().estimatedNumberOfOperations()); - assertEquals(0, replica.translogStats().estimatedNumberOfOperations()); - assertEquals(0, primary.translogStats().getUncommittedOperations()); - assertEquals(0, replica.translogStats().getUncommittedOperations()); - } - } - - public void testPrimaryRelocation() throws Exception { - final IndexShard primarySource = newStartedShard(true, settings); - int totalOps = randomInt(10); - for (int i = 0; i < totalOps; i++) { - indexDoc(primarySource, "_doc", Integer.toString(i)); - } - IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); - final IndexShard primaryTarget = newShard( - primarySource.routingEntry().getTargetRelocatingShard(), - settings, - new NRTReplicationEngineFactory() - ); - updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - - Function, List> replicatePrimaryFunction = (shardList) -> { - try { - assert shardList.size() >= 2; - final IndexShard primary = shardList.get(0); - return replicateSegments(primary, shardList.subList(1, shardList.size())); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - }; - recoverReplica(primaryTarget, primarySource, true, replicatePrimaryFunction); - - // check that local checkpoint of new primary is properly tracked after primary relocation - assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L)); - assertThat( - primaryTarget.getReplicationTracker() - .getTrackedLocalCheckpointForShard(primaryTarget.routingEntry().allocationId().getId()) - .getLocalCheckpoint(), - equalTo(totalOps - 1L) - ); - assertDocCount(primaryTarget, totalOps); - closeShards(primarySource, primaryTarget); - } - - public void testPrimaryRelocationWithSegRepFailure() throws Exception { - final IndexShard primarySource = newStartedShard(true, settings); - int totalOps = randomInt(10); - for (int i = 0; i < totalOps; i++) { - indexDoc(primarySource, "_doc", Integer.toString(i)); - } - IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); - final IndexShard primaryTarget = newShard( - primarySource.routingEntry().getTargetRelocatingShard(), - settings, - new NRTReplicationEngineFactory() - ); - updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - - Function, List> replicatePrimaryFunction = (shardList) -> { - try { - throw new IOException("Expected failure"); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - Exception e = expectThrows( - Exception.class, - () -> recoverReplica( - primaryTarget, - primarySource, - (primary, sourceNode) -> new RecoveryTarget(primary, sourceNode, new ReplicationListener() { - @Override - public void onDone(ReplicationState state) { - throw new AssertionError("recovery must fail"); - } - - @Override - public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - assertEquals(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), "Expected failure"); - } - }), - true, - true, - replicatePrimaryFunction - ) - ); - closeShards(primarySource, primaryTarget); - } - // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication // strategy. public void testLockingBeforeAndAfterRelocated() throws Exception { - final IndexShard shard = newStartedShard(true, settings); + final IndexShard shard = newStartedShard(true, getIndexSettings()); final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); CountDownLatch latch = new CountDownLatch(1); @@ -702,7 +521,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication // strategy. public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { - final IndexShard shard = newStartedShard(true, settings); + final IndexShard shard = newStartedShard(true, getIndexSettings()); final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); final CountDownLatch startRecovery = new CountDownLatch(1); @@ -776,428 +595,8 @@ public void onFailure(Exception e) { closeShards(shard); } - public void testReplicaReceivesLowerGeneration() throws Exception { - // when a replica gets incoming segments that are lower than what it currently has on disk. - - // start 3 nodes Gens: P [2], R [2], R[2] - // index some docs and flush twice, push to only 1 replica. - // State Gens: P [4], R-1 [3], R-2 [2] - // Promote R-2 as the new primary and demote the old primary. - // State Gens: R[4], R-1 [3], P [4] - *commit on close of NRTEngine, xlog replayed and commit made. - // index docs on new primary and flush - // replicate to all. - // Expected result: State Gens: P[4], R-1 [4], R-2 [4] - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - final IndexShard primary = shards.getPrimary(); - final IndexShard replica_1 = shards.getReplicas().get(0); - final IndexShard replica_2 = shards.getReplicas().get(1); - int numDocs = randomIntBetween(10, 100); - shards.indexDocs(numDocs); - flushShard(primary, false); - replicateSegments(primary, List.of(replica_1)); - numDocs = randomIntBetween(numDocs + 1, numDocs + 10); - shards.indexDocs(numDocs); - flushShard(primary, false); - replicateSegments(primary, List.of(replica_1)); - - assertEqualCommittedSegments(primary, replica_1); - - shards.promoteReplicaToPrimary(replica_2).get(); - primary.close("demoted", false, false); - primary.store().close(); - IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); - shards.recoverReplica(oldPrimary); - - numDocs = randomIntBetween(numDocs + 1, numDocs + 10); - shards.indexDocs(numDocs); - flushShard(replica_2, false); - replicateSegments(replica_2, shards.getReplicas()); - assertEqualCommittedSegments(replica_2, oldPrimary, replica_1); - } - } - - public void testReplicaRestarts() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. - final int numDocs = shards.indexDocs(randomInt(10)); - - // refresh and copy the segments over. - if (randomBoolean()) { - flushShard(primary); - } - primary.refresh("Test"); - replicateSegments(primary, shards.getReplicas()); - - // at this point both shards should have numDocs persisted and searchable. - assertDocCounts(primary, numDocs, numDocs); - for (IndexShard shard : shards.getReplicas()) { - assertDocCounts(shard, numDocs, numDocs); - } - - final int i1 = randomInt(5); - for (int i = 0; i < i1; i++) { - shards.indexDocs(randomInt(10)); - - // randomly resetart a replica - final IndexShard replicaToRestart = getRandomReplica(shards); - replicaToRestart.close("restart", false, false); - replicaToRestart.store().close(); - shards.removeReplica(replicaToRestart); - final IndexShard newReplica = shards.addReplicaWithExistingPath( - replicaToRestart.shardPath(), - replicaToRestart.routingEntry().currentNodeId() - ); - shards.recoverReplica(newReplica); - - // refresh and push segments to our other replicas. - if (randomBoolean()) { - failAndPromoteRandomReplica(shards); - } - flushShard(shards.getPrimary()); - replicateSegments(shards.getPrimary(), shards.getReplicas()); - } - primary = shards.getPrimary(); - - // refresh and push segments to our other replica. - flushShard(primary); - replicateSegments(primary, shards.getReplicas()); - - for (IndexShard shard : shards) { - assertConsistentHistoryBetweenTranslogAndLucene(shard); - } - final List docsAfterReplication = getDocIdAndSeqNos(shards.getPrimary()); - for (IndexShard shard : shards.getReplicas()) { - assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterReplication)); - } - } - } - - public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshRefresh() throws Exception { - testNRTReplicaWithRemoteStorePromotedAsPrimary(false, false); - } - - public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshCommit() throws Exception { - testNRTReplicaWithRemoteStorePromotedAsPrimary(false, true); - } - - public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitRefresh() throws Exception { - testNRTReplicaWithRemoteStorePromotedAsPrimary(true, false); - } - - public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws Exception { - testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true); - } - - private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception { - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs") - .build(); - - try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { - shards.startAll(); - IndexShard oldPrimary = shards.getPrimary(); - final IndexShard nextPrimary = shards.getReplicas().get(0); - - // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. - final int numDocs = shards.indexDocs(randomInt(10)); - - // refresh but do not copy the segments over. - if (performFlushFirst) { - flushShard(oldPrimary, true); - } else { - oldPrimary.refresh("Test"); - } - // replicateSegments(primary, shards.getReplicas()); - - // at this point both shards should have numDocs persisted and searchable. - assertDocCounts(oldPrimary, numDocs, numDocs); - for (IndexShard shard : shards.getReplicas()) { - assertDocCounts(shard, numDocs, 0); - } - - // 2. Create ops that are in the replica's xlog, not in the index. - // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs - // persisted. - final int additonalDocs = shards.indexDocs(randomInt(10)); - final int totalDocs = numDocs + additonalDocs; - - if (performFlushSecond) { - flushShard(oldPrimary, true); - } else { - oldPrimary.refresh("Test"); - } - assertDocCounts(oldPrimary, totalDocs, totalDocs); - for (IndexShard shard : shards.getReplicas()) { - assertDocCounts(shard, totalDocs, 0); - } - assertTrue(nextPrimary.translogStats().estimatedNumberOfOperations() >= additonalDocs); - assertTrue(nextPrimary.translogStats().getUncommittedOperations() >= additonalDocs); - - int prevOperationCount = nextPrimary.translogStats().estimatedNumberOfOperations(); - - // promote the replica - shards.promoteReplicaToPrimary(nextPrimary).get(); - - // close oldPrimary. - oldPrimary.close("demoted", false, false); - oldPrimary.store().close(); - - assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); - assertDocCounts(nextPrimary, totalDocs, totalDocs); - - // As we are downloading segments from remote segment store on failover, there should not be - // any operations replayed from translog - assertEquals(prevOperationCount, nextPrimary.translogStats().estimatedNumberOfOperations()); - - // refresh and push segments to our other replica. - nextPrimary.refresh("test"); - - for (IndexShard shard : shards) { - assertConsistentHistoryBetweenTranslogAndLucene(shard); - } - final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); - for (IndexShard shard : shards.getReplicas()) { - assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); - } - } - } - - public void testNRTReplicaPromotedAsPrimary() throws Exception { - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard oldPrimary = shards.getPrimary(); - final IndexShard nextPrimary = shards.getReplicas().get(0); - final IndexShard replica = shards.getReplicas().get(1); - - // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. - final int numDocs = shards.indexDocs(randomInt(10)); - - // refresh and copy the segments over. - oldPrimary.refresh("Test"); - replicateSegments(oldPrimary, shards.getReplicas()); - - // at this point both shards should have numDocs persisted and searchable. - assertDocCounts(oldPrimary, numDocs, numDocs); - for (IndexShard shard : shards.getReplicas()) { - assertDocCounts(shard, numDocs, numDocs); - } - assertEqualTranslogOperations(shards, oldPrimary); - - // 2. Create ops that are in the replica's xlog, not in the index. - // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs - // persisted. - final int additonalDocs = shards.indexDocs(randomInt(10)); - final int totalDocs = numDocs + additonalDocs; - - assertDocCounts(oldPrimary, totalDocs, totalDocs); - assertEqualTranslogOperations(shards, oldPrimary); - for (IndexShard shard : shards.getReplicas()) { - assertDocCounts(shard, totalDocs, numDocs); - } - assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations()); - assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations()); - assertEquals(totalDocs, nextPrimary.translogStats().estimatedNumberOfOperations()); - assertEquals(totalDocs, replica.translogStats().estimatedNumberOfOperations()); - assertEquals(totalDocs, nextPrimary.translogStats().getUncommittedOperations()); - assertEquals(totalDocs, replica.translogStats().getUncommittedOperations()); - - // promote the replica - shards.syncGlobalCheckpoint(); - shards.promoteReplicaToPrimary(nextPrimary); - - // close and start the oldPrimary as a replica. - oldPrimary.close("demoted", false, false); - oldPrimary.store().close(); - oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); - shards.recoverReplica(oldPrimary); - - assertEquals(NRTReplicationEngine.class, oldPrimary.getEngine().getClass()); - assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); - assertDocCounts(nextPrimary, totalDocs, totalDocs); - assertEquals(0, nextPrimary.translogStats().estimatedNumberOfOperations()); - - // refresh and push segments to our other replica. - nextPrimary.refresh("test"); - replicateSegments(nextPrimary, asList(replica)); - - for (IndexShard shard : shards) { - assertConsistentHistoryBetweenTranslogAndLucene(shard); - } - final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); - for (IndexShard shard : shards.getReplicas()) { - assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); - } - } - } - - public void testReplicaPromotedWhileReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - final IndexShard oldPrimary = shards.getPrimary(); - final IndexShard nextPrimary = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - oldPrimary.refresh("Test"); - shards.syncGlobalCheckpoint(); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - resolveCheckpointInfoResponseListener(listener, oldPrimary); - ShardRouting oldRouting = nextPrimary.shardRouting; - try { - shards.promoteReplicaToPrimary(nextPrimary); - } catch (IOException e) { - Assert.fail("Promotion should not fail"); - } - targetService.shardRoutingChanged(nextPrimary, oldRouting, nextPrimary.shardRouting); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); - } - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(nextPrimary, targetService); - // wait for replica to finish being promoted, and assert doc counts. - final CountDownLatch latch = new CountDownLatch(1); - nextPrimary.acquirePrimaryOperationPermit(new ActionListener<>() { - @Override - public void onResponse(Releasable releasable) { - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - throw new AssertionError(e); - } - }, ThreadPool.Names.GENERIC, ""); - latch.await(); - assertEquals(nextPrimary.getEngine().getClass(), InternalEngine.class); - nextPrimary.refresh("test"); - - oldPrimary.close("demoted", false, false); - oldPrimary.store().close(); - IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); - shards.recoverReplica(newReplica); - - assertDocCount(nextPrimary, numDocs); - assertDocCount(newReplica, numDocs); - - nextPrimary.refresh("test"); - replicateSegments(nextPrimary, shards.getReplicas()); - final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); - for (IndexShard shard : shards.getReplicas()) { - assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); - } - } - } - - public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - // trigger a cancellation by closing the replica. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - resolveCheckpointInfoResponseListener(listener, primary); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - Assert.fail("Should not be reached"); - } - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - - public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - resolveCheckpointInfoResponseListener(listener, primary); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - // randomly resolve the listener, indicating the source has resolved. - listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - } - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - public void testCloseShardDuringFinalize() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -1211,60 +610,8 @@ public void testCloseShardDuringFinalize() throws Exception { } } - public void testCloseShardWhileGettingCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - - ActionListener listener; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - // set the listener, we will only fail it once we cancel the source. - this.listener = listener; - // shard is closing while we are copying files. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - Assert.fail("Unreachable"); - } - - @Override - public void cancel() { - // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . - final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); - listener.onFailure(exception); - } - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -1315,45 +662,7 @@ public void cancel() { } } - public void testPrimaryCancelsExecution() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - listener.onFailure(new CancellableThreads.ExecutionCancelledException("Cancelled")); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) {} - }; - when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - - private SegmentReplicationTargetService newTargetService(SegmentReplicationSourceFactory sourceFactory) { + protected SegmentReplicationTargetService newTargetService(SegmentReplicationSourceFactory sourceFactory) { return new SegmentReplicationTargetService( threadPool, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), @@ -1368,14 +677,15 @@ private SegmentReplicationTargetService newTargetService(SegmentReplicationSourc * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts. */ - private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCount, int expectedSearchableDocCount) throws IOException { + protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCount, int expectedSearchableDocCount) + throws IOException { assertDocCount(indexShard, expectedSearchableDocCount); // assigned seqNos start at 0, so assert max & local seqNos are 1 less than our persisted doc count. assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getMaxSeqNo()); assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getLocalCheckpoint()); } - private void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { + protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { try { final CopyState copyState = new CopyState( ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), @@ -1390,7 +700,7 @@ private void resolveCheckpointInfoResponseListener(ActionListener operations = new ArrayList<>(); Translog.Operation op; diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java new file mode 100644 index 0000000000000..69846fbbe1dd4 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -0,0 +1,697 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.lucene.index.SegmentInfos; +import org.junit.Assert; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.NRTReplicationEngine; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.replication.TestReplicationSource; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SegmentReplicationWithNodeToNodeIndexShardTests extends SegmentReplicationIndexShardTests { + + public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointInfoResponseListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + // randomly resolve the listener, indicating the source has resolved. + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + // trigger a cancellation by closing the replica. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + resolveCheckpointInfoResponseListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + Assert.fail("Should not be reached"); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testCloseShardWhileGettingCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + + ActionListener listener; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + // set the listener, we will only fail it once we cancel the source. + this.listener = listener; + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + Assert.fail("Unreachable"); + } + + @Override + public void cancel() { + // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + listener.onFailure(exception); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testPrimaryCancelsExecution() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onFailure(new CancellableThreads.ExecutionCancelledException("Cancelled")); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) {} + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testReplicaPromotedWhileReplicating() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard oldPrimary = shards.getPrimary(); + final IndexShard nextPrimary = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + oldPrimary.refresh("Test"); + shards.syncGlobalCheckpoint(); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointInfoResponseListener(listener, oldPrimary); + ShardRouting oldRouting = nextPrimary.shardRouting; + try { + shards.promoteReplicaToPrimary(nextPrimary); + } catch (IOException e) { + Assert.fail("Promotion should not fail"); + } + targetService.shardRoutingChanged(nextPrimary, oldRouting, nextPrimary.shardRouting); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(nextPrimary, targetService); + // wait for replica to finish being promoted, and assert doc counts. + final CountDownLatch latch = new CountDownLatch(1); + nextPrimary.acquirePrimaryOperationPermit(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, ThreadPool.Names.GENERIC, ""); + latch.await(); + assertEquals(nextPrimary.getEngine().getClass(), InternalEngine.class); + nextPrimary.refresh("test"); + + oldPrimary.close("demoted", false, false); + oldPrimary.store().close(); + IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); + shards.recoverReplica(newReplica); + + assertDocCount(nextPrimary, numDocs); + assertDocCount(newReplica, numDocs); + + nextPrimary.refresh("test"); + replicateSegments(nextPrimary, shards.getReplicas()); + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + } + } + + public void testReplicaReceivesGenIncrease() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + final int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + assertEquals(numDocs, primary.translogStats().estimatedNumberOfOperations()); + assertEquals(numDocs, replica.translogStats().estimatedNumberOfOperations()); + assertEquals(numDocs, primary.translogStats().getUncommittedOperations()); + assertEquals(numDocs, replica.translogStats().getUncommittedOperations()); + flushShard(primary, true); + replicateSegments(primary, shards.getReplicas()); + assertEquals(0, primary.translogStats().estimatedNumberOfOperations()); + assertEquals(0, replica.translogStats().estimatedNumberOfOperations()); + assertEquals(0, primary.translogStats().getUncommittedOperations()); + assertEquals(0, replica.translogStats().getUncommittedOperations()); + + final int additionalDocs = shards.indexDocs(randomIntBetween(numDocs + 1, numDocs + 10)); + + final int totalDocs = numDocs + additionalDocs; + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertEquals(additionalDocs, primary.translogStats().estimatedNumberOfOperations()); + assertEquals(additionalDocs, replica.translogStats().estimatedNumberOfOperations()); + assertEquals(additionalDocs, primary.translogStats().getUncommittedOperations()); + assertEquals(additionalDocs, replica.translogStats().getUncommittedOperations()); + flushShard(primary, true); + replicateSegments(primary, shards.getReplicas()); + + assertEqualCommittedSegments(primary, replica); + assertDocCount(primary, totalDocs); + assertDocCount(replica, totalDocs); + assertEquals(0, primary.translogStats().estimatedNumberOfOperations()); + assertEquals(0, replica.translogStats().estimatedNumberOfOperations()); + assertEquals(0, primary.translogStats().getUncommittedOperations()); + assertEquals(0, replica.translogStats().getUncommittedOperations()); + } + } + + /** + * Verifies that commits on replica engine resulting from engine or reader close does not cleanup the temporary + * replication files from ongoing round of segment replication + * @throws Exception + */ + public void testTemporaryFilesNotCleanup() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents, commit to create commit point on primary & replicate + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again to create a new commit on primary + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + + // Step 3. Copy segment files to replica shard but prevent commit + final CountDownLatch countDownLatch = new CountDownLatch(1); + Map primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getSegmentMetadataMap(primarySegmentInfos); + } + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.getShardOrNull(replica.shardId)).thenReturn(replica); + final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( + threadPool, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + mock(TransportService.class), + sourceFactory, + indicesService, + clusterService + ); + final Consumer runnablePostGetFiles = (indexShard) -> { + try { + Collection temporaryFiles = Stream.of(indexShard.store().directory().listAll()) + .filter(name -> name.startsWith(SegmentReplicationTarget.REPLICATION_PREFIX)) + .collect(Collectors.toList()); + + // Step 4. Perform a commit on replica shard. + NRTReplicationEngine engine = (NRTReplicationEngine) indexShard.getEngine(); + engine.updateSegments(engine.getSegmentInfosSnapshot().get()); + + // Step 5. Validate temporary files are not deleted from store. + Collection replicaStoreFiles = List.of(indexShard.store().directory().listAll()); + assertTrue(replicaStoreFiles.containsAll(temporaryFiles)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + SegmentReplicationSource segmentReplicationSource = getSegmentReplicationSource( + primaryShard, + (repId) -> targetService.get(repId), + runnablePostGetFiles + ); + when(sourceFactory.get(any())).thenReturn(segmentReplicationSource); + targetService.startReplication(replica, getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch)); + countDownLatch.await(30, TimeUnit.SECONDS); + assertEquals("Replication failed", 0, countDownLatch.getCount()); + shards.assertAllEqual(numDocs); + } + } + + // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store + public void testReplicaReceivesLowerGeneration() throws Exception { + // when a replica gets incoming segments that are lower than what it currently has on disk. + + // start 3 nodes Gens: P [2], R [2], R[2] + // index some docs and flush twice, push to only 1 replica. + // State Gens: P [4], R-1 [3], R-2 [2] + // Promote R-2 as the new primary and demote the old primary. + // State Gens: R[4], R-1 [3], P [4] - *commit on close of NRTEngine, xlog replayed and commit made. + // index docs on new primary and flush + // replicate to all. + // Expected result: State Gens: P[4], R-1 [4], R-2 [4] + try (ReplicationGroup shards = createGroup(2, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica_1 = shards.getReplicas().get(0); + final IndexShard replica_2 = shards.getReplicas().get(1); + int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + flushShard(primary, false); + replicateSegments(primary, List.of(replica_1)); + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + shards.indexDocs(numDocs); + flushShard(primary, false); + replicateSegments(primary, List.of(replica_1)); + + assertEqualCommittedSegments(primary, replica_1); + + shards.promoteReplicaToPrimary(replica_2).get(); + primary.close("demoted", false, false); + primary.store().close(); + IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); + shards.recoverReplica(oldPrimary); + + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + shards.indexDocs(numDocs); + flushShard(replica_2, false); + replicateSegments(replica_2, shards.getReplicas()); + assertEqualCommittedSegments(replica_2, oldPrimary, replica_1); + } + } + + // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store + public void testPrimaryRelocation() throws Exception { + final IndexShard primarySource = newStartedShard(true, getIndexSettings()); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "_doc", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); + final IndexShard primaryTarget = newShard( + primarySource.routingEntry().getTargetRelocatingShard(), + getIndexSettings(), + new NRTReplicationEngineFactory() + ); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); + + Function, List> replicatePrimaryFunction = (shardList) -> { + try { + assert shardList.size() >= 2; + final IndexShard primary = shardList.get(0); + return replicateSegments(primary, shardList.subList(1, shardList.size())); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }; + recoverReplica(primaryTarget, primarySource, true, replicatePrimaryFunction); + + // check that local checkpoint of new primary is properly tracked after primary relocation + assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat( + primaryTarget.getReplicationTracker() + .getTrackedLocalCheckpointForShard(primaryTarget.routingEntry().allocationId().getId()) + .getLocalCheckpoint(), + equalTo(totalOps - 1L) + ); + assertDocCount(primaryTarget, totalOps); + closeShards(primarySource, primaryTarget); + } + + // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store + public void testNRTReplicaPromotedAsPrimary() throws Exception { + try (ReplicationGroup shards = createGroup(2, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard oldPrimary = shards.getPrimary(); + final IndexShard nextPrimary = shards.getReplicas().get(0); + final IndexShard replica = shards.getReplicas().get(1); + + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + + // refresh and copy the segments over. + oldPrimary.refresh("Test"); + replicateSegments(oldPrimary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + assertDocCounts(oldPrimary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, numDocs); + } + assertEqualTranslogOperations(shards, oldPrimary); + + // 2. Create ops that are in the replica's xlog, not in the index. + // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs + // persisted. + final int additonalDocs = shards.indexDocs(randomInt(10)); + final int totalDocs = numDocs + additonalDocs; + + assertDocCounts(oldPrimary, totalDocs, totalDocs); + assertEqualTranslogOperations(shards, oldPrimary); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, totalDocs, numDocs); + } + assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, nextPrimary.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, replica.translogStats().estimatedNumberOfOperations()); + assertEquals(totalDocs, nextPrimary.translogStats().getUncommittedOperations()); + assertEquals(totalDocs, replica.translogStats().getUncommittedOperations()); + + // promote the replica + shards.syncGlobalCheckpoint(); + shards.promoteReplicaToPrimary(nextPrimary); + + // close and start the oldPrimary as a replica. + oldPrimary.close("demoted", false, false); + oldPrimary.store().close(); + oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); + shards.recoverReplica(oldPrimary); + + assertEquals(NRTReplicationEngine.class, oldPrimary.getEngine().getClass()); + assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); + assertDocCounts(nextPrimary, totalDocs, totalDocs); + assertEquals(0, nextPrimary.translogStats().estimatedNumberOfOperations()); + + // refresh and push segments to our other replica. + nextPrimary.refresh("test"); + replicateSegments(nextPrimary, asList(replica)); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + } + } + + // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store + public void testReplicaRestarts() throws Exception { + try (ReplicationGroup shards = createGroup(3, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + logger.info("--> Index {} documents on primary", numDocs); + + // refresh and copy the segments over. + if (randomBoolean()) { + flushShard(primary); + } + primary.refresh("Test"); + logger.info("--> Replicate segments"); + replicateSegments(primary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + logger.info("--> Verify doc count"); + assertDocCounts(primary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, numDocs); + } + + final int i1 = randomInt(5); + logger.info("--> Index {} more docs", i1); + for (int i = 0; i < i1; i++) { + shards.indexDocs(randomInt(10)); + + // randomly restart a replica + final IndexShard replicaToRestart = getRandomReplica(shards); + logger.info("--> Restarting replica {}", replicaToRestart.shardId); + replicaToRestart.close("restart", false, false); + replicaToRestart.store().close(); + shards.removeReplica(replicaToRestart); + final IndexShard newReplica = shards.addReplicaWithExistingPath( + replicaToRestart.shardPath(), + replicaToRestart.routingEntry().currentNodeId() + ); + logger.info("--> Recover newReplica {}", newReplica.shardId); + shards.recoverReplica(newReplica); + + // refresh and push segments to our other replicas. + if (randomBoolean()) { + failAndPromoteRandomReplica(shards); + } + flushShard(shards.getPrimary()); + replicateSegments(shards.getPrimary(), shards.getReplicas()); + } + primary = shards.getPrimary(); + + // refresh and push segments to our other replica. + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterReplication = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterReplication)); + } + } + } + + // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store + public void testSegmentReplication_Index_Update_Delete() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(2, getIndexSettings(), mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primaryShard = shards.getPrimary(); + + final int numDocs = randomIntBetween(100, 200); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + replicateSegments(primaryShard, shards.getReplicas()); + + shards.assertAllEqual(numDocs); + + for (int i = 0; i < numDocs; i++) { + // randomly update docs. + if (randomBoolean()) { + shards.index( + new IndexRequest(index.getName()).id(String.valueOf(i)).source("{ \"foo\" : \"baz\" }", XContentType.JSON) + ); + } + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + final List docs = getDocIdAndSeqNos(primaryShard); + for (IndexShard shard : shards.getReplicas()) { + assertEquals(getDocIdAndSeqNos(shard), docs); + } + for (int i = 0; i < numDocs; i++) { + // randomly delete. + if (randomBoolean()) { + shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i))); + } + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + replicateSegments(primaryShard, shards.getReplicas()); + final List docsAfterDelete = getDocIdAndSeqNos(primaryShard); + for (IndexShard shard : shards.getReplicas()) { + assertEquals(getDocIdAndSeqNos(shard), docsAfterDelete); + } + } + } + +} diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java index a67b60d6128d1..b15d8b66fca55 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java @@ -8,36 +8,131 @@ package org.opensearch.index.shard; +import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; -import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; +import java.util.List; -public class SegmentReplicationWithRemoteIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { +import static org.hamcrest.Matchers.equalTo; + +public class SegmentReplicationWithRemoteIndexShardTests extends SegmentReplicationIndexShardTests { + + private static final String REPOSITORY_NAME = "temp-fs"; private static final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs") + .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) .build(); - public void testReplicaSyncingFromRemoteStore() throws IOException { - ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir()); - final IndexShard primaryShard = shards.getPrimary(); - final IndexShard replicaShard = shards.getReplicas().get(0); - shards.startPrimary(); - shards.startAll(); - indexDoc(primaryShard, "_doc", "1"); - indexDoc(primaryShard, "_doc", "2"); - primaryShard.refresh("test"); - assertDocs(primaryShard, "1", "2"); - flushShard(primaryShard); - - replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false); - assertDocs(replicaShard, "1", "2"); - closeShards(primaryShard, replicaShard); + @Before + public void setup() { + // Todo: Remove feature flag once remote store integration with segrep goes GA + FeatureFlags.initializeFeatureFlags( + Settings.builder().put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING.getKey(), "true").build() + ); + } + + protected Settings getIndexSettings() { + return settings; + } + + protected ReplicationGroup getReplicationGroup(int numberOfReplicas) throws IOException { + return createGroup(numberOfReplicas, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir()); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshRefresh() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(false, false); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshCommit() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(false, true); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitRefresh() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(true, false); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception { + try ( + ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir()) + ) { + shards.startAll(); + IndexShard oldPrimary = shards.getPrimary(); + final IndexShard nextPrimary = shards.getReplicas().get(0); + + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + + // refresh but do not copy the segments over. + if (performFlushFirst) { + flushShard(oldPrimary, true); + } else { + oldPrimary.refresh("Test"); + } + // replicateSegments(primary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + assertDocCounts(oldPrimary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, 0); + } + + // 2. Create ops that are in the replica's xlog, not in the index. + // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs + // persisted. + final int additonalDocs = shards.indexDocs(randomInt(10)); + final int totalDocs = numDocs + additonalDocs; + + if (performFlushSecond) { + flushShard(oldPrimary, true); + } else { + oldPrimary.refresh("Test"); + } + assertDocCounts(oldPrimary, totalDocs, totalDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, totalDocs, 0); + } + assertTrue(nextPrimary.translogStats().estimatedNumberOfOperations() >= additonalDocs); + assertTrue(nextPrimary.translogStats().getUncommittedOperations() >= additonalDocs); + + int prevOperationCount = nextPrimary.translogStats().estimatedNumberOfOperations(); + + // promote the replica + shards.promoteReplicaToPrimary(nextPrimary).get(); + + // close oldPrimary. + oldPrimary.close("demoted", false, false); + oldPrimary.store().close(); + + assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); + assertDocCounts(nextPrimary, totalDocs, totalDocs); + + // As we are downloading segments from remote segment store on failover, there should not be + // any operations replayed from translog + assertEquals(prevOperationCount, nextPrimary.translogStats().estimatedNumberOfOperations()); + + // refresh and push segments to our other replica. + nextPrimary.refresh("test"); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + } } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 7c765cf5df0be..3b2e33388925a 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -43,6 +43,8 @@ import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -55,8 +57,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.HashMap; -import java.util.Collection; import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; @@ -101,7 +101,10 @@ public void setup() throws IOException { ); testUploadTracker = new TestUploadListener(); - Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); @@ -260,7 +263,7 @@ private Map getDummyMetadata(String prefix, int commitGeneration * @return ByteArrayIndexInput: metadata file bytes with header and footer * @throws IOException IOException */ - private ByteArrayIndexInput createMetadataFileBytes(Map segmentFilesMap, long generation, long primaryTerm) + private ByteArrayIndexInput createMetadataFileBytes(Map segmentFilesMap, ReplicationCheckpoint replicationCheckpoint) throws IOException { ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput(); segmentInfos.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "", "")); @@ -270,8 +273,7 @@ private ByteArrayIndexInput createMetadataFileBytes(Map segmentF OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION); indexOutput.writeMapOfStrings(segmentFilesMap); - indexOutput.writeLong(generation); - indexOutput.writeLong(primaryTerm); + RemoteSegmentMetadata.writeCheckpointToIndexOutput(replicationCheckpoint, indexOutput); indexOutput.writeLong(byteArray.length); indexOutput.writeBytes(byteArray, byteArray.length); CodecUtil.writeFooter(indexOutput); @@ -309,13 +311,13 @@ private Map> populateMetadata() throws IOException { ); when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenAnswer( - I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), 23, 12) + I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), indexShard.getLatestReplicationCheckpoint()) ); when(remoteMetadataDirectory.openInput(metadataFilename2, IOContext.DEFAULT)).thenAnswer( - I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), 13, 12) + I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), indexShard.getLatestReplicationCheckpoint()) ); when(remoteMetadataDirectory.openInput(metadataFilename3, IOContext.DEFAULT)).thenAnswer( - I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), 38, 10) + I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), indexShard.getLatestReplicationCheckpoint()) ); return metadataFilenameContentMapping; @@ -651,7 +653,9 @@ public void testContainsFile() throws IOException { metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major); metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major); - when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata, 1, 5)); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint()) + ); remoteSegmentStoreDirectory.init(); @@ -676,12 +680,19 @@ public void testContainsFile() throws IOException { public void testUploadMetadataEmpty() throws IOException { Directory storeDirectory = mock(Directory.class); IndexOutput indexOutput = mock(IndexOutput.class); - when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + final long primaryTerm = indexShard.getOperationPrimaryTerm(); + when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); Collection segmentFiles = List.of("_s1.si", "_s1.cfe", "_s3.cfs"); assertThrows( NoSuchFileException.class, - () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L) + () -> remoteSegmentStoreDirectory.uploadMetadata( + segmentFiles, + segmentInfos, + storeDirectory, + 34L, + indexShard.getLatestReplicationCheckpoint() + ) ); } @@ -689,7 +700,7 @@ public void testUploadMetadataNonEmpty() throws IOException { indexDocs(142364, 5); flushShard(indexShard, true); SegmentInfos segInfos = indexShard.store().readLastCommittedSegmentsInfo(); - long primaryTerm = 12; + long primaryTerm = indexShard.getLatestReplicationCheckpoint().getPrimaryTerm(); String primaryTermLong = RemoteStoreUtils.invertLong(primaryTerm); long generation = segInfos.getGeneration(); String generationLong = RemoteStoreUtils.invertLong(generation); @@ -706,7 +717,7 @@ public void testUploadMetadataNonEmpty() throws IOException { getDummyMetadata("_0", (int) generation) ); when(remoteMetadataDirectory.openInput(latestMetadataFileName, IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get(latestMetadataFileName), generation, primaryTerm) + createMetadataFileBytes(metadataFilenameContentMapping.get(latestMetadataFileName), indexShard.getLatestReplicationCheckpoint()) ); remoteSegmentStoreDirectory.init(); @@ -717,7 +728,13 @@ public void testUploadMetadataNonEmpty() throws IOException { when(storeDirectory.createOutput(startsWith("metadata__" + primaryTermLong + "__" + generationLong), eq(IOContext.DEFAULT))) .thenReturn(indexOutput); - remoteSegmentStoreDirectory.uploadMetadata(segInfos.files(true), segInfos, storeDirectory, primaryTerm, generation); + remoteSegmentStoreDirectory.uploadMetadata( + segInfos.files(true), + segInfos, + storeDirectory, + generation, + indexShard.getLatestReplicationCheckpoint() + ); verify(remoteMetadataDirectory).copyFrom( eq(storeDirectory), diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index 2fee77ab563c0..d0136f04afd75 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -25,6 +25,8 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.HashMap; @@ -38,16 +40,23 @@ public class RemoteSegmentMetadataHandlerTests extends IndexShardTestCase { private IndexShard indexShard; private SegmentInfos segmentInfos; + private ReplicationCheckpoint replicationCheckpoint; + @Before public void setup() throws IOException { remoteSegmentMetadataHandler = new RemoteSegmentMetadataHandler(); - Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(); indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } + replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); } @After @@ -61,8 +70,7 @@ public void testReadContentNoSegmentInfos() throws IOException { OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); Map expectedOutput = getDummyData(); indexOutput.writeMapOfStrings(expectedOutput); - indexOutput.writeLong(1234); - indexOutput.writeLong(1234); + RemoteSegmentMetadata.writeCheckpointToIndexOutput(replicationCheckpoint, indexOutput); indexOutput.writeLong(0); indexOutput.writeBytes(new byte[0], 0); indexOutput.close(); @@ -70,7 +78,7 @@ public void testReadContentNoSegmentInfos() throws IOException { new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())) ); assertEquals(expectedOutput, metadata.toMapOfStrings()); - assertEquals(1234, metadata.getGeneration()); + assertEquals(replicationCheckpoint.getSegmentsGen(), metadata.getGeneration()); } public void testReadContentWithSegmentInfos() throws IOException { @@ -78,8 +86,7 @@ public void testReadContentWithSegmentInfos() throws IOException { OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); Map expectedOutput = getDummyData(); indexOutput.writeMapOfStrings(expectedOutput); - indexOutput.writeLong(1234); - indexOutput.writeLong(1234); + RemoteSegmentMetadata.writeCheckpointToIndexOutput(replicationCheckpoint, indexOutput); ByteBuffersIndexOutput segmentInfosOutput = new ByteBuffersIndexOutput(new ByteBuffersDataOutput(), "test", "resource"); segmentInfos.write(segmentInfosOutput); byte[] segmentInfosBytes = segmentInfosOutput.toArrayCopy(); @@ -90,7 +97,7 @@ public void testReadContentWithSegmentInfos() throws IOException { new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())) ); assertEquals(expectedOutput, metadata.toMapOfStrings()); - assertEquals(1234, metadata.getGeneration()); + assertEquals(replicationCheckpoint.getSegmentsGen(), metadata.getGeneration()); assertArrayEquals(segmentInfosBytes, metadata.getSegmentInfosBytes()); } @@ -106,8 +113,7 @@ public void testWriteContent() throws IOException { RemoteSegmentMetadata remoteSegmentMetadata = new RemoteSegmentMetadata( RemoteSegmentMetadata.fromMapOfStrings(expectedOutput), segmentInfosBytes, - 1234, - 1234 + indexShard.getLatestReplicationCheckpoint() ); remoteSegmentMetadataHandler.writeContent(indexOutput, remoteSegmentMetadata); indexOutput.close(); @@ -116,8 +122,8 @@ public void testWriteContent() throws IOException { new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())) ); assertEquals(expectedOutput, metadata.toMapOfStrings()); - assertEquals(1234, metadata.getGeneration()); - assertEquals(1234, metadata.getPrimaryTerm()); + assertEquals(replicationCheckpoint.getSegmentsGen(), metadata.getGeneration()); + assertEquals(replicationCheckpoint.getPrimaryTerm(), metadata.getPrimaryTerm()); assertArrayEquals(segmentInfosBytes, metadata.getSegmentInfosBytes()); } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 8135d9cd3718e..40182a85608ea 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -8,15 +8,20 @@ package org.opensearch.indices.recovery; +import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.common.ReplicationType; +import java.nio.file.Path; + public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() @@ -26,23 +31,36 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "100ms") .build(); + @Before + public void setup() { + // Todo: Remove feature flag once remote store integration with segrep goes GA + FeatureFlags.initializeFeatureFlags( + Settings.builder().put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING.getKey(), "true").build() + ); + } + public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + final Path remoteDir = createTempDir(); + final String indexMapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }"; + try (ReplicationGroup shards = createGroup(0, settings, indexMapping, new NRTReplicationEngineFactory(), remoteDir)) { // Step1 - Start primary, index docs and flush shards.startPrimary(); final IndexShard primary = shards.getPrimary(); - int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + int numDocs = shards.indexDocs(randomIntBetween(10, 20)); + logger.info("--> Index numDocs {} and flush", numDocs); shards.flush(); // Step 2 - Start replica for recovery to happen, check both has same number of docs - final IndexShard replica1 = shards.addReplica(); + final IndexShard replica1 = shards.addReplica(remoteDir); + logger.info("--> Added and started replica {}", replica1.routingEntry()); shards.startAll(); assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); // Step 3 - Index more docs, run segment replication, check both have same number of docs - int moreDocs = shards.indexDocs(randomIntBetween(10, 100)); + int moreDocs = shards.indexDocs(randomIntBetween(10, 20)); primary.refresh("test"); + logger.info("--> Index more docs {} and replicate segments", moreDocs); replicateSegments(primary, shards.getReplicas()); assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); @@ -55,7 +73,8 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica1.routingEntry()))); // Step 6 - Start new replica, recovery happens, and check that new replica has all docs - final IndexShard replica2 = shards.addReplica(); + final IndexShard replica2 = shards.addReplica(remoteDir); + logger.info("--> Added and started replica {}", replica2.routingEntry()); shards.startAll(); shards.assertAllEqual(numDocs + moreDocs); diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index 04b5aa58ea485..9204f48ba5bdd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -8,43 +8,38 @@ package org.opensearch.indices.replication; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.store.FilterDirectory; -import org.mockito.Mockito; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.RemoteStoreRefreshListenerTests; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelReplicationTestCase { - - private static final long PRIMARY_TERM = 1L; - private static final long SEGMENTS_GEN = 2L; - private static final long VERSION = 4L; private static final long REPLICATION_ID = 123L; private RemoteStoreReplicationSource replicationSource; - private IndexShard indexShard; - - private IndexShard mockShard; - - private Store remoteStore; + private IndexShard primaryShard; + private IndexShard replicaShard; private final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "my-repo") @@ -55,146 +50,110 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli @Override public void setUp() throws Exception { super.setUp(); - - indexShard = newStartedShard(true, settings, new InternalEngineFactory()); - - indexDoc(indexShard, "_doc", "1"); - indexDoc(indexShard, "_doc", "2"); - indexShard.refresh("test"); - - // mock shard - mockShard = mock(IndexShard.class); - Store store = mock(Store.class); - when(mockShard.store()).thenReturn(store); - when(store.directory()).thenReturn(indexShard.store().directory()); - remoteStore = mock(Store.class); - when(mockShard.remoteStore()).thenReturn(remoteStore); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) - .getDelegate(); - FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( - new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) - ); - when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); - replicationSource = new RemoteStoreReplicationSource(mockShard); + primaryShard = newStartedShard(true, settings, new InternalEngineFactory()); + indexDoc(primaryShard, "_doc", "1"); + indexDoc(primaryShard, "_doc", "2"); + primaryShard.refresh("test"); + replicaShard = newStartedShard(false, settings, new NRTReplicationEngineFactory()); } @Override public void tearDown() throws Exception { - closeShards(indexShard); + closeShards(primaryShard, replicaShard); super.tearDown(); } public void testGetCheckpointMetadata() throws ExecutionException, InterruptedException { - when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot()); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - + final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint(); final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource = new RemoteStoreReplicationSource(primaryShard); replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); CheckpointInfoResponse response = res.get(); assert (response.getCheckpoint().equals(checkpoint)); - assert (!response.getMetadataMap().isEmpty()); + assert (response.getMetadataMap().isEmpty() == false); } public void testGetCheckpointMetadataFailure() { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - + IndexShard mockShard = mock(IndexShard.class); + final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint(); when(mockShard.getSegmentInfosSnapshot()).thenThrow(new RuntimeException("test")); - assertThrows(RuntimeException.class, () -> { + replicationSource = new RemoteStoreReplicationSource(mockShard); final PlainActionFuture res = PlainActionFuture.newFuture(); replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); res.get(); }); } - public void testGetCheckpointMetadataEmpty() throws ExecutionException, InterruptedException, IOException { - when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot()); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - IndexShard emptyIndexShard = null; - try { - emptyIndexShard = newStartedShard( - true, - settings, - new InternalEngineFactory() - ); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) emptyIndexShard.remoteStore().directory()).getDelegate()) - .getDelegate(); - FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( - new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) - ); - when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + public void testGetSegmentFiles() throws ExecutionException, InterruptedException, IOException { + final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint(); + List filesToFetch = primaryShard.getSegmentMetadataMap().values().stream().collect(Collectors.toList()); + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource = new RemoteStoreReplicationSource(primaryShard); + replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, filesToFetch, replicaShard, res); + GetSegmentFilesResponse response = res.get(); + assertEquals(response.files.size(), filesToFetch.size()); + assertTrue(response.files.containsAll(filesToFetch)); + closeShards(replicaShard); + } - final PlainActionFuture res = PlainActionFuture.newFuture(); - when(mockShard.state()).thenReturn(IndexShardState.RECOVERING); - // Recovering shard should just do a noop and return empty metadata map. - replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); - CheckpointInfoResponse response = res.get(); - assert (response.getCheckpoint().equals(checkpoint)); - assert (response.getMetadataMap().isEmpty()); - - when(mockShard.state()).thenReturn(IndexShardState.STARTED); - // Started shard should fail with assertion error. - expectThrows(AssertionError.class, () -> { - final PlainActionFuture res2 = PlainActionFuture.newFuture(); - replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res2); - }); - } finally { - closeShards(emptyIndexShard); + public void testGetSegmentFilesAlreadyExists() throws IOException, InterruptedException { + final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint(); + List filesToFetch = primaryShard.getSegmentMetadataMap().values().stream().collect(Collectors.toList()); + CountDownLatch latch = new CountDownLatch(1); + try { + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource = new RemoteStoreReplicationSource(primaryShard); + replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, filesToFetch, primaryShard, res); + res.get(); + } catch (AssertionError | ExecutionException ex) { + latch.countDown(); + assertTrue(ex instanceof AssertionError); + assertTrue(ex.getMessage().startsWith("Local store already contains the file")); } + latch.await(); } - public void testGetSegmentFiles() throws ExecutionException, InterruptedException { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - + public void testGetSegmentFilesReturnEmptyResponse() throws ExecutionException, InterruptedException { + final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint(); final PlainActionFuture res = PlainActionFuture.newFuture(); - replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), indexShard, res); + replicationSource = new RemoteStoreReplicationSource(primaryShard); + replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), primaryShard, res); GetSegmentFilesResponse response = res.get(); assert (response.files.isEmpty()); - assertEquals("remote store", replicationSource.getDescription()); - } - public void testGetSegmentFilesFailure() throws IOException { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - Mockito.doThrow(new RuntimeException("testing")) - .when(mockShard) - .syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean()); - assertThrows(ExecutionException.class, () -> { - final PlainActionFuture res = PlainActionFuture.newFuture(); - replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res); - res.get(10, TimeUnit.SECONDS); + public void testGetCheckpointMetadataEmpty() throws ExecutionException, InterruptedException, IOException { + IndexShard mockShard = mock(IndexShard.class); + // Build mockShard to return replicaShard directory so that empty metadata file is returned. + buildIndexShardBehavior(mockShard, replicaShard); + replicationSource = new RemoteStoreReplicationSource(mockShard); + + // Mock replica shard state to RECOVERING so that getCheckpointInfo return empty map + final ReplicationCheckpoint checkpoint = replicaShard.getLatestReplicationCheckpoint(); + final PlainActionFuture res = PlainActionFuture.newFuture(); + when(mockShard.state()).thenReturn(IndexShardState.RECOVERING); + replicationSource = new RemoteStoreReplicationSource(mockShard); + // Recovering shard should just do a noop and return empty metadata map. + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); + CheckpointInfoResponse response = res.get(); + assert (response.getCheckpoint().equals(checkpoint)); + assert (response.getMetadataMap().isEmpty()); + + // Started shard should fail with assertion error. + when(mockShard.state()).thenReturn(IndexShardState.STARTED); + expectThrows(AssertionError.class, () -> { + final PlainActionFuture res2 = PlainActionFuture.newFuture(); + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res2); }); } + + private void buildIndexShardBehavior(IndexShard mockShard, IndexShard indexShard) { + when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot()); + Store remoteStore = mock(Store.class); + when(mockShard.remoteStore()).thenReturn(remoteStore); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()).getDelegate(); + FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory(new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory)); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + } } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index f3c98ce4f9f03..278847e56e65f 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -149,7 +149,11 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory) throws IOException { - return createGroup(replicas, settings, mappings, engineFactory, null); + Path remotePath = null; + if ("true".equals(settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED))) { + remotePath = createTempDir(); + } + return createGroup(replicas, settings, mappings, engineFactory, remotePath); } protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7a492dbebd836..66e5459cfea3b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1326,9 +1326,27 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { }; } + private SegmentReplicationTargetService getSegmentReplicationTargetService( + TransportService transportService, + IndicesService indicesService, + ClusterService clusterService, + SegmentReplicationSourceFactory sourceFactory + ) { + return new SegmentReplicationTargetService( + threadPool, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + transportService, + sourceFactory, + indicesService, + clusterService + ); + } + /** * Segment Replication specific test method - Creates a {@link SegmentReplicationTargetService} to perform replications that has - * been configured to return the given primaryShard's current segments. + * been configured to return the given primaryShard's current segments. In order to do so, it mimics the replication + * source (to avoid transport calls) and simply copies over the segment files from primary store to replica's as part of + * get_files calls. * * @param primaryShard {@link IndexShard} - The target replica shard in segment replication. * @param target {@link IndexShard} - The source primary shard in segment replication. @@ -1339,7 +1357,7 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { * which are desired right after files are copied. e.g. To work with temp files * @return Returns SegmentReplicationTargetService */ - public final SegmentReplicationTargetService prepareForReplication( + private SegmentReplicationTargetService prepareForReplication( IndexShard primaryShard, IndexShard target, TransportService transportService, @@ -1347,22 +1365,28 @@ public final SegmentReplicationTargetService prepareForReplication( ClusterService clusterService, Consumer postGetFilesRunnable ) { - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( - threadPool, - new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - transportService, - sourceFactory, - indicesService, - clusterService - ); - final SegmentReplicationSource replicationSource = getSegmentReplicationSource( - primaryShard, - (repId) -> targetService.get(repId), - postGetFilesRunnable - ); - when(sourceFactory.get(any())).thenReturn(replicationSource); - when(indicesService.getShardOrNull(any())).thenReturn(target); + + SegmentReplicationSourceFactory sourceFactory = null; + SegmentReplicationTargetService targetService; + if (primaryShard.indexSettings.isRemoteStoreEnabled()) { + RecoverySettings recoverySettings = new RecoverySettings( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + sourceFactory = new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService); + targetService = getSegmentReplicationTargetService(transportService, indicesService, clusterService, sourceFactory); + } else { + sourceFactory = mock(SegmentReplicationSourceFactory.class); + targetService = getSegmentReplicationTargetService(transportService, indicesService, clusterService, sourceFactory); + final SegmentReplicationSource replicationSource = getSegmentReplicationSource( + primaryShard, + (repId) -> targetService.get(repId), + postGetFilesRunnable + ); + when(sourceFactory.get(any())).thenReturn(replicationSource); + // This is needed for force segment sync call. Remote store uses a different recovery mechanism + when(indicesService.getShardOrNull(any())).thenReturn(target); + } return targetService; } @@ -1502,9 +1526,11 @@ public void getSegmentFiles( * @param replicaShards - Replicas that will be updated. * @return {@link List} List of target components orchestrating replication. */ - public final List replicateSegments(IndexShard primaryShard, List replicaShards) + protected final List replicateSegments(IndexShard primaryShard, List replicaShards) throws IOException, InterruptedException { + // Latch to block test execution until replica catches up final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); + // Get primary metadata to verify with replica's, used to ensure replica catches up Map primaryMetadata; try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get();