Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Make translog transfer timeout configurable using dynamic setting #12704

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4969,7 +4969,7 @@
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy());
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings);

Check warning on line 4972 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4972

Added line #L4972 was not covered by tests
}

/*
Expand All @@ -4992,6 +4992,7 @@
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
logger
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -34,11 +35,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private final RemoteStoreSettings remoteStoreSettings;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
Repository repository;
try {
Expand All @@ -49,6 +53,7 @@ public RemoteBlobStoreInternalTranslogFactory(
this.repository = repository;
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand All @@ -74,7 +79,8 @@ public Translog newTranslog(
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker
remoteTranslogTransferTracker,
remoteStoreSettings
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -100,7 +101,8 @@ public RemoteFsTranslog(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
Expand All @@ -113,7 +115,8 @@ public RemoteFsTranslog(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathStrategy()
indexSettings().getRemoteStorePathStrategy(),
remoteStoreSettings
);
try {
download(translogTransferManager, location, logger);
Expand Down Expand Up @@ -163,6 +166,7 @@ public static void download(
ThreadPool threadPool,
Path location,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
Logger logger
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Expand All @@ -181,7 +185,8 @@ public static void download(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathStrategy
pathStrategy,
remoteStoreSettings
);
RemoteFsTranslog.download(translogTransferManager, location, logger);
logger.trace(remoteTranslogTransferTracker.toString());
Expand Down Expand Up @@ -259,7 +264,8 @@ public static TranslogTransferManager buildTranslogTransferManager(
ShardId shardId,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand All @@ -281,7 +287,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
}

@Override
Expand Down Expand Up @@ -553,8 +559,13 @@ private void deleteStaleRemotePrimaryTerms() {
}
}

public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathStrategy pathStrategy)
throws IOException {
public static void cleanup(
Repository repository,
ShardId shardId,
ThreadPool threadPool,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
// We use a dummy stats tracker to ensure the flow doesn't break.
Expand All @@ -567,7 +578,8 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathStrategy
pathStrategy,
remoteStoreSettings
);
// clean up all remote translog files
translogTransferManager.deleteTranslogFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -60,9 +61,7 @@ public class TranslogTransferManager {
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

private final RemoteStoreSettings remoteStoreSettings;
private static final int METADATA_FILES_TO_FETCH = 10;

private final Logger logger;
Expand All @@ -79,7 +78,8 @@ public TranslogTransferManager(
BlobPath remoteDataTransferPath,
BlobPath remoteMetadataTransferPath,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
this.shardId = shardId;
this.transferService = transferService;
Expand All @@ -88,6 +88,7 @@ public TranslogTransferManager(
this.fileTransferTracker = fileTransferTracker;
this.logger = Loggers.getLogger(getClass(), shardId);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() {
Expand Down Expand Up @@ -151,7 +152,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH);

try {
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Exception ex = new TranslogUploadFailedException(
"Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@
repositoriesServiceSupplier,
threadPool,
remoteStoreStatsTrackerFactory,
settings
settings,
remoteStoreSettings
);
this.searchRequestStats = searchRequestStats;
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
Expand Down Expand Up @@ -528,22 +529,25 @@
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
Settings settings
Settings settings,
RemoteStoreSettings remoteStoreSettings
) {
return (indexSettings, shardRouting) -> {
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),

Check warning on line 549 in server/src/main/java/org/opensearch/indices/IndicesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/IndicesService.java#L549

Added line #L549 was not covered by tests
remoteStoreSettings
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,20 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls timeout value while uploading translog and checkpoint files to remote translog
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.translog.transfer_timeout",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30),
Property.NodeScope,
Property.Dynamic
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -69,9 +81,14 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);

this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);
}

// Exclusively for testing, please do not use it elsewhere.
public TimeValue getClusterRemoteTranslogBufferInterval() {
return clusterRemoteTranslogBufferInterval;
}
Expand All @@ -87,4 +104,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles)
public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}

public TimeValue getClusterRemoteTranslogTransferTimeout() {
return clusterRemoteTranslogTransferTimeout;
}

private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
repositoriesServiceReference::get,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10)
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10),
DefaultRemoteStoreSettings.INSTANCE
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -188,7 +189,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
repository,
threadPool,
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
);
}

Expand Down Expand Up @@ -459,7 +461,8 @@ public void testExtraGenToKeep() throws Exception {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1508,7 +1511,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1616,7 +1620,8 @@ public void force(boolean metaData) throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down
Loading
Loading