SegRep with Remote: Add Remote store as a segment replication source #7653

Merged · 3 commits · Jun 7, 2023
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

### Dependencies
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
@@ -106,7 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924

### Security

@@ -47,6 +47,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
@@ -580,6 +581,7 @@ public void testDeleteOperations() throws Exception {
* from xlog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
@@ -785,6 +787,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
assumeFalse(
"Skipping the test as pressure service is not compatible with SegRep and Remote store yet.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
@@ -874,6 +880,7 @@ public void testPressureServiceStats() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
@@ -963,6 +970,11 @@ public void testScrollCreatedOnReplica() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollWithOngoingSegmentReplication() throws Exception {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store yet.",
segmentReplicationWithRemoteEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
prepareCreate(
@@ -1249,4 +1261,8 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
waitForSearchableDocs(2, nodes);
}

private boolean segmentReplicationWithRemoteEnabled() {
return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue()
&& "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL));
}
}
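The gating helper above is what lets the remote-store subclass (added below) reuse this entire suite: tests that are not yet compatible skip themselves instead of failing. Here is a minimal, self-contained sketch of the same JUnit pattern, with a hypothetical system-property check standing in for segmentReplicationWithRemoteEnabled():

```java
import org.junit.Assume;
import org.junit.Test;

public class AssumeGatingExample {
    // Hypothetical stand-in for segmentReplicationWithRemoteEnabled().
    private boolean remoteStoreEnabled() {
        return Boolean.parseBoolean(System.getProperty("tests.remote_store", "false"));
    }

    @Test
    public void testNotYetCompatibleWithRemoteStore() {
        // When the condition is true, Assume aborts the test as "skipped" rather than failed.
        Assume.assumeFalse("Skipping: not yet compatible with remote store.", remoteStoreEnabled());
        // ... test body runs only when the assumption holds ...
    }
}
```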
@@ -30,7 +30,7 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * This class runs the Segment Replication integration test suite with remote store enabled.
 * Setup is similar to SegmentReplicationRemoteStoreIT, but this suite also enables segment replication
 * using the remote store, which is behind the SEGMENT_REPLICATION_EXPERIMENTAL flag. Once this moves out
 * of experimental, we can combine the two and keep a single test suite for segment replication and remote store integration tests.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
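Because this class extends SegmentReplicationIT and overrides only the settings hooks, the whole parent suite is re-executed against a remote-store-backed index, with the gated tests above skipping themselves. Assuming the standard OpenSearch Gradle layout (the module and task names below are an assumption, not confirmed by this diff), the suite can be run in isolation with something like `./gradlew ':server:internalClusterTest' --tests "org.opensearch.remotestore.SegmentReplicationUsingRemoteStoreIT"`.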
@@ -2760,6 +2760,13 @@ public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

/**
 * Returns the local checkpoint captured for the refresh that is currently in progress.
*/
public final long currentOngoingRefreshCheckpoint() {
return lastRefreshedCheckpointListener.pendingCheckpoint;
}

private final Object refreshIfNeededMutex = new Object();

/**
@@ -2777,10 +2784,11 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;
volatile long pendingCheckpoint;

LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
this.pendingCheckpoint = initialLocalCheckpoint;
}

@Override
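Promoting pendingCheckpoint to volatile matters because the field is now read off the refresh thread via the new currentOngoingRefreshCheckpoint() accessor (used by the remote-store upload path later in this PR). A minimal sketch of the pattern, not the actual OpenSearch listener:

```java
import java.util.concurrent.atomic.AtomicLong;

final class CheckpointListener {
    final AtomicLong refreshedCheckpoint; // checkpoint covered by the last completed refresh
    volatile long pendingCheckpoint;      // checkpoint captured for the refresh in flight

    CheckpointListener(long initialLocalCheckpoint) {
        this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
        this.pendingCheckpoint = initialLocalCheckpoint;
    }

    // Called on the refresh thread before segments become visible.
    void beforeRefresh(long currentProcessedCheckpoint) {
        pendingCheckpoint = currentProcessedCheckpoint;
    }

    // Called on the refresh thread after the refresh completes.
    void afterRefresh(boolean didRefresh) {
        if (didRefresh) {
            refreshedCheckpoint.set(pendingCheckpoint);
        }
    }

    // Safe to call from another thread (e.g. a metadata-upload listener)
    // precisely because pendingCheckpoint is volatile.
    long currentOngoingRefreshCheckpoint() {
        return pendingCheckpoint;
    }
}
```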
65 changes: 31 additions & 34 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2231,7 +2231,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4404,7 +4404,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4454,23 +4454,15 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
 * @param shouldCommit whether the shard should commit the changes locally after syncing from the remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
@@ -4529,29 +4521,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
if (shouldCommit) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
finalizeReplication(infosSnapshot);
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
} catch (IOException e) {
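With the new shouldCommit flag, the method now serves several distinct callers. A hedged summary of the call shapes: the first three appear elsewhere in this diff, while the shouldCommit=false call belongs to the segment-replication source path outside this section, so treat it as illustrative rather than a confirmed call site:

```java
import java.io.IOException;

import org.opensearch.index.shard.IndexShard;

final class SyncModes {
    // Flag order: (overrideLocal, refreshLevelSegmentSync, shouldCommit).
    static void illustrate(IndexShard shard) throws IOException {
        // Engine (re)open with remote store enabled: refresh-level sync, then commit locally.
        shard.syncSegmentsFromRemoteSegmentStore(false, true, true);
        // Recovery from remote store: also override local files.
        shard.syncSegmentsFromRemoteSegmentStore(true, true, true);
        // Peer-recovery pre-step: commit-level sync, then commit locally.
        shard.syncSegmentsFromRemoteSegmentStore(false, false, true);
        // Segment-replication read path (outside this diff): no local commit;
        // the SegmentInfos is handed to finalizeReplication() instead.
        shard.syncSegmentsFromRemoteSegmentStore(false, true, false);
    }
}
```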
@@ -334,12 +334,11 @@ private boolean isRefreshAfterCommit() throws IOException {
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos);

final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos));
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNo));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
segmentInfosSnapshot.setUserData(userData, false);

remoteDirectory.uploadMetadata(
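Stamping the metadata with currentOngoingRefreshCheckpoint() rather than a max seqno recomputed from the SegmentInfos snapshot ties LOCAL_CHECKPOINT_KEY and MAX_SEQ_NO to the checkpoint captured for the refresh that produced these segments; presumably this keeps the uploaded user data consistent with what replicas will commit or finalize from, now that they can consume this metadata directly.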
@@ -453,7 +453,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
@@ -18,6 +18,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
@@ -100,7 +101,15 @@ public IndexOutput createOutput(String name, IOContext context) {
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name));
InputStream inputStream = null;
try {
inputStream = blobContainer.readBlob(name);
return new RemoteIndexInput(name, inputStream, fileLength(name));
} catch (Exception e) {
// In case the RemoteIndexInput creation fails, close the input stream to avoid a file handle leak.
if (inputStream != null) inputStream.close();
throw e;
}
}
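The fix follows the standard close-on-failure idiom: the stream must stay open on success because the returned RemoteIndexInput owns it, so try-with-resources is not an option. A generic, self-contained sketch of the idiom (names here are illustrative, not from the OpenSearch codebase):

```java
import java.io.IOException;
import java.io.InputStream;

final class StreamOwner implements AutoCloseable {
    interface StreamSource {
        InputStream open(String name) throws IOException;
    }

    private final InputStream in;

    private StreamOwner(InputStream in) {
        this.in = in;
    }

    static StreamOwner open(StreamSource source, String name) throws IOException {
        InputStream in = source.open(name);
        try {
            // If construction fails, ownership never transferred: close here.
            return new StreamOwner(in);
        } catch (Exception e) {
            in.close();
            throw e;
        }
    }

    @Override
    public void close() throws IOException {
        in.close(); // on success, the owner closes the stream
    }
}
```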

/**
@@ -137,8 +137,7 @@ public RemoteSegmentMetadata init() throws IOException {
* @return Map of segment filename to uploaded filename with checksum
* @throws IOException if there were any failures in reading the metadata file
*/
private RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();
public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
RemoteSegmentMetadata remoteSegmentMetadata = null;

Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
@@ -199,6 +198,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
}

public String getOriginalFilename() {
return originalFilename;
}
}

/**
@@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata>
missing.add(value);
} else {
final StoreFileMetadata fileMetadata = target.get(value.name());
if (fileMetadata.isSame(value)) {
// match segments using checksum
if (fileMetadata.checksum().equals(value.checksum())) {
identical.add(value);
} else {
different.add(value);
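StoreFileMetadata.isSame() also compares file length and content hash, not just the checksum; restricting the match to checksums is presumably needed because metadata reconstructed from the remote store may not populate those extra fields identically. A minimal sketch of the classification this method performs, with a stand-in metadata type instead of org.opensearch.index.store.StoreFileMetadata:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SegmentDiffSketch {
    record FileMeta(String name, String checksum) {}

    record Diff(List<FileMeta> identical, List<FileMeta> different, List<FileMeta> missing) {}

    // Bucket every source (primary) file as missing on the target,
    // identical (checksum match), or different (checksum mismatch).
    static Diff diff(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        List<FileMeta> identical = new ArrayList<>();
        List<FileMeta> different = new ArrayList<>();
        List<FileMeta> missing = new ArrayList<>();
        for (FileMeta value : source.values()) {
            FileMeta local = target.get(value.name());
            if (local == null) {
                missing.add(value);
            } else if (local.checksum().equals(value.checksum())) {
                identical.add(value);
            } else {
                different.add(value);
            }
        }
        return new Diff(identical, different, missing);
    }
}
```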
@@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();