Skip to content

Commit

Permalink
Make number of segment metadata files in remote segment store configu…
Browse files Browse the repository at this point in the history
…rable (#11329)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Nov 27, 2023
1 parent 864e7f2 commit ec5a0f9
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024))
- Make number of segment metadata files in remote segment store configurable ([#11329](https://github.com/opensearch-project/OpenSearch/pull/11329))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.opensearch.test.transport.MockTransportService;
import org.hamcrest.MatcherAssert;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,10 +44,10 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand Down Expand Up @@ -167,7 +169,7 @@ public void testRemoteTranslogCleanup() throws Exception {
}

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startNode();
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
Expand All @@ -177,17 +179,20 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");

IndexShard indexShard = getIndexShard(dataNode);
int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
if (numberOfIterations <= lastNMetadataFilesToKeep) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async its possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
}
}, 30, TimeUnit.SECONDS);
Expand All @@ -209,6 +214,44 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}

public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
}

public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
}

/**
* Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster
* default.
Expand Down Expand Up @@ -532,4 +575,50 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte
indexShard = getIndexShard(replicaShardNode);
assertFalse(indexShard.isSearchIdleSupported());
}

public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
internalCluster().startClusterManagerOnlyNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);

// 1. Create index with 0 replica
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

// 2. Index docs
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

// 3. Delete data from remote segment store
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");

try (Stream<Path> files = Files.list(segmentDataPath)) {
files.forEach(p -> {
try {
Files.delete(p);
} catch (IOException e) {
// Ignore
}
});
}

// 4. Start recovery by changing number of replicas to 1
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// 5. Ensure green and verify number of docs
ensureGreen(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ Runnable getGlobalCheckpointSyncer() {

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
private final RemoteStoreFileDownloader fileDownloader;
private final RecoverySettings recoverySettings;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -469,6 +470,7 @@ public boolean shouldCache(Query query) {
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
this.recoverySettings = recoverySettings;
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
}

Expand Down Expand Up @@ -567,6 +569,10 @@ public String getNodeId() {
return translogConfig.getNodeId();
}

public RecoverySettings getRecoverySettings() {
return recoverySettings;
}

public RemoteStoreFileDownloader getFileDownloader() {
return fileDownloader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL
);

public static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
public static final int LAST_N_METADATA_FILES_TO_KEEP = 10;

private final IndexShard indexShard;
private final Directory storeDirectory;
Expand Down Expand Up @@ -205,7 +203,7 @@ private boolean syncSegments() {
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP);
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,12 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
* @throws IOException in case of I/O error while reading from / writing to remote segment store
*/
public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
if (lastNMetadataFilesToKeep == -1) {
logger.info(
"Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1"
);
return;
}
List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,18 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
// ToDo: This is a temporary mitigation to not fail the peer recovery flow in case there is
// an exception while downloading segments from remote store. For remote backed indexes, we
// plan to revamp this flow so that node-node segment copy will not happen.
// GitHub Issue to track the revamp: https://github.com/opensearch-project/OpenSearch/issues/11331
try {
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
} catch (Exception e) {
logger.error(
"Exception while downloading segment files from remote store, will continue with peer to peer segment copy",
e
);
}
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,25 @@ public class RecoverySettings {
Property.NodeScope
);

/**
* Controls minimum number of metadata files to keep in remote segment store.
* {@code value < 1} will disable deletion of stale segment metadata files.
*/
public static final Setting<Integer> CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING = Setting.intSetting(
"cluster.remote_store.index.segment_metadata.retention.max_count",
10,
-1,
v -> {
if (v == 0) {
throw new IllegalArgumentException(
"Value 0 is not allowed for this setting as it would delete all the data from remote segment store"
);
}
},
Property.NodeScope,
Property.Dynamic
);

// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

Expand All @@ -171,6 +190,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;
private volatile int minRemoteSegmentMetadataFiles;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

Expand Down Expand Up @@ -212,6 +232,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this::setInternalActionLongTimeout
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
minRemoteSegmentMetadataFiles = CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);
}

public RateLimiter rateLimiter() {
Expand Down Expand Up @@ -307,4 +332,12 @@ public int getMaxConcurrentRemoteStoreStreams() {
private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStreams) {
this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams;
}

private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles) {
this.minRemoteSegmentMetadataFiles = minRemoteSegmentMetadataFiles;
}

public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -237,7 +238,7 @@ public void testAfterMultipleCommits() throws IOException {
setup(true, 3);
assertDocs(indexShard, "1", "2", "3");

for (int i = 0; i < RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP + 3; i++) {
for (int i = 0; i < indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles() + 3; i++) {
indexDocs(4 * (i + 1), 4);
flushShard(indexShard);
}
Expand Down Expand Up @@ -550,6 +551,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = indexShard.getRemoteStoreStatsTrackerFactory();
when(shard.indexSettings()).thenReturn(indexShard.indexSettings());
when(shard.shardId()).thenReturn(indexShard.shardId());
RecoverySettings recoverySettings = mock(RecoverySettings.class);
when(recoverySettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
when(shard.getRecoverySettings()).thenReturn(recoverySettings);
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
refreshListener.afterRefresh(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,49 @@ public void testInternalLongActionTimeout() {
);
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testSegmentMetadataRetention() {
// Default value
assertEquals(10, recoverySettings.getMinRemoteSegmentMetadataFiles());

// Setting value < default (10)
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), 5).build()
);
assertEquals(5, recoverySettings.getMinRemoteSegmentMetadataFiles());

// Setting min value
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1).build()
);
assertEquals(-1, recoverySettings.getMinRemoteSegmentMetadataFiles());

// Setting value > default (10)
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), 15).build()
);
assertEquals(15, recoverySettings.getMinRemoteSegmentMetadataFiles());

// Setting value to 0 should fail and retain the existing value
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), 0)
.build()
)
);
assertEquals(15, recoverySettings.getMinRemoteSegmentMetadataFiles());

// Setting value < -1 should fail and retain the existing value
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -5)
.build()
)
);
assertEquals(15, recoverySettings.getMinRemoteSegmentMetadataFiles());
}
}

0 comments on commit ec5a0f9

Please sign in to comment.