Skip to content

Commit

Permalink
Make remote translog upload buffer interval setting dynamic (opensear…
Browse files Browse the repository at this point in the history
…ch-project#8175)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored and sudarshan-baliga committed Jun 29, 2023
1 parent a3abcbc commit 0e30972
Show file tree
Hide file tree
Showing 14 changed files with 216 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -46,7 +47,15 @@ public void testRemoteStoreTranslogDisabledByUser() throws Exception {
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, "true", "my-segment-repo-1", "false", null, ReplicationType.SEGMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
"true",
"my-segment-repo-1",
"false",
null,
ReplicationType.SEGMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

@Override
Expand Down Expand Up @@ -84,7 +93,7 @@ public void testDefaultRemoteStoreNoUserOverrideExceptReplicationTypeSegment() t
"true",
"my-translog-repo-1",
ReplicationType.SEGMENT.toString(),
null
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.hamcrest.Matchers.containsString;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
Expand Down Expand Up @@ -101,7 +103,7 @@ public void testDefaultRemoteStoreNoUserOverride() throws Exception {
"true",
"my-translog-repo-1",
ReplicationType.SEGMENT.toString(),
null
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

Expand All @@ -124,7 +126,7 @@ public void testRemoteStoreDisabledByUser() throws Exception {
null,
null,
client().settings().get(CLUSTER_SETTING_REPLICATION_TYPE),
null
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

Expand Down Expand Up @@ -175,7 +177,15 @@ public void testReplicationTypeDocumentByUser() throws Exception {
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, null, null, null, null, ReplicationType.DOCUMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
null,
null,
null,
null,
ReplicationType.DOCUMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

public void testRemoteStoreSegmentRepoWithoutRemoteEnabledAndSegmentReplicationIllegalArgumentException() throws Exception {
Expand Down Expand Up @@ -216,7 +226,7 @@ public void testRemoteStoreEnabledByUserWithRemoteRepo() throws Exception {
"true",
"my-translog-repo-1",
ReplicationType.SEGMENT.toString(),
null
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

Expand All @@ -232,7 +242,15 @@ public void testRemoteStoreTranslogDisabledByUser() throws Exception {
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, "true", "my-segment-repo-1", "false", null, ReplicationType.SEGMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
"true",
"my-segment-repo-1",
"false",
null,
ReplicationType.SEGMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

public void testRemoteStoreOverrideOnlyTranslogRepoIllegalArgumentException() throws Exception {
Expand Down Expand Up @@ -307,7 +325,15 @@ public void testRemoteStoreOverrideTranslogDisabledCorrectly() throws Exception
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, "true", "my-custom-repo", "false", null, ReplicationType.SEGMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
"true",
"my-custom-repo",
"false",
null,
ReplicationType.SEGMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

public void testRemoteStoreOverrideTranslogDisabledWithTranslogRepoIllegalArgumentException() throws Exception {
Expand Down Expand Up @@ -376,7 +402,7 @@ public void testRemoteStoreOverrideTranslogRepoCorrectly() throws Exception {
"true",
"my-custom-repo",
ReplicationType.SEGMENT.toString(),
null
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

Expand All @@ -392,7 +418,15 @@ public void testRemoteStoreOverrideReplicationTypeIndexSettings() throws Excepti
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, null, null, null, null, ReplicationType.DOCUMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
null,
null,
null,
null,
ReplicationType.DOCUMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

protected void verifyRemoteStoreIndexSettings(
Expand All @@ -402,14 +436,14 @@ protected void verifyRemoteStoreIndexSettings(
String isRemoteTranslogEnabled,
String remoteTranslogRepo,
String replicationType,
String translogBufferInterval
TimeValue translogBufferInterval
) {
assertEquals(replicationType, indexSettings.get(SETTING_REPLICATION_TYPE));
assertEquals(isRemoteSegmentEnabled, indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
assertEquals(remoteSegmentRepo, indexSettings.get(SETTING_REMOTE_STORE_REPOSITORY));
assertEquals(isRemoteTranslogEnabled, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_ENABLED));
assertEquals(remoteTranslogRepo, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY));
assertEquals(translogBufferInterval, indexSettings.get(SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL));
assertEquals(translogBufferInterval, INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -49,7 +50,15 @@ public void testRemoteStoreEnabledByUserWithRemoteRepo() throws Exception {
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, "true", "my-custom-repo", null, null, ReplicationType.SEGMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
"true",
"my-custom-repo",
null,
null,
ReplicationType.SEGMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

public void testDefaultRemoteStoreNoUserOverride() throws Exception {
Expand All @@ -63,7 +72,15 @@ public void testDefaultRemoteStoreNoUserOverride() throws Exception {
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, "true", "my-segment-repo-1", null, null, ReplicationType.SEGMENT.toString(), null);
verifyRemoteStoreIndexSettings(
indexSettings,
"true",
"my-segment-repo-1",
null,
null,
ReplicationType.SEGMENT.toString(),
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -299,8 +298,6 @@ public Iterator<Setting<?>> settings() {

public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";

public static final String SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL = "index.remote_store.translog.buffer_interval";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -449,45 +446,6 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<TimeValue> INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting(
SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
TimeValue.timeValueMillis(100),
TimeValue.timeValueMillis(50),
new Setting.Validator<>() {

@Override
public void validate(final TimeValue value) {}

@Override
public void validate(final TimeValue value, final Map<Setting<?>, Object> settings) {
if (value == null) {
throw new IllegalArgumentException(
"Setting " + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + " should be provided with a valid time value"
);
} else {
final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) {
throw new IllegalArgumentException(
"Setting "
+ SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL
+ " can only be set when "
+ SETTING_REMOTE_TRANSLOG_STORE_ENABLED
+ " is set to true"
);
}
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME,
IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID,

// Settings for remote translog
IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
Expand Down Expand Up @@ -233,8 +236,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
),
FeatureFlags.CONCURRENT_SEGMENT_SEARCH,
List.of(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* A variant of {@link AsyncIOProcessor} that allows to batch and buffer processing items at every
* {@link BufferedAsyncIOProcessor#bufferInterval} in a separate threadpool.
* {@link BufferedAsyncIOProcessor#getBufferInterval()} in a separate threadpool.
* <p>
* Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval.
* If more requests are enqueued between invocations of drainAndProcessAndRelease, another processor thread
Expand All @@ -32,18 +33,18 @@
public abstract class BufferedAsyncIOProcessor<Item> extends AsyncIOProcessor<Item> {

private final ThreadPool threadpool;
private final TimeValue bufferInterval;
private final Supplier<TimeValue> bufferIntervalSupplier;

protected BufferedAsyncIOProcessor(
Logger logger,
int queueSize,
ThreadContext threadContext,
ThreadPool threadpool,
TimeValue bufferInterval
Supplier<TimeValue> bufferIntervalSupplier
) {
super(logger, queueSize, threadContext);
this.threadpool = threadpool;
this.bufferInterval = bufferInterval;
this.bufferIntervalSupplier = bufferIntervalSupplier;
}

@Override
Expand Down Expand Up @@ -81,11 +82,12 @@ private void process() {
}

private TimeValue getBufferInterval() {
long bufferInterval = bufferIntervalSupplier.get().getNanos();
long timeSinceLastRunStartInNS = System.nanoTime() - getLastRunStartTimeInNs();
if (timeSinceLastRunStartInNS >= bufferInterval.getNanos()) {
if (timeSinceLastRunStartInNS >= bufferInterval) {
return TimeValue.ZERO;
}
return TimeValue.timeValueNanos(bufferInterval.getNanos() - timeSinceLastRunStartInNS);
return TimeValue.timeValueNanos(bufferInterval - timeSinceLastRunStartInNS);
}

protected abstract String getBufferProcessThreadPoolName();
Expand Down
Loading

0 comments on commit 0e30972

Please sign in to comment.