From f054022cd533cafe660425f85638fde531491bab Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Mon, 13 Jan 2025 15:49:53 -0800 Subject: [PATCH] Adaptive rate limiting for OpenSearch bulk requests (#1011) * adaptive rate limiting - replace fail safe rate limiter for google guava's - move rate limiter from RestHighLevelClientWrapper to OpenSearchBulkRetryWrapper - add metrics for rate limit (now convert rate from double to int) - add spark conf for rate limit parameters - adjust rate limit based on retryable result percentage Signed-off-by: Sean Kao * metrics and test cases WIP Signed-off-by: Sean Kao * test case Signed-off-by: Sean Kao * shaded jar and configurable failure threshold Signed-off-by: Sean Kao * update default values; add doc Signed-off-by: Sean Kao * rename OpenSearchBulkRetryWrapper (remove Retry) Signed-off-by: Sean Kao * remove failure threshold Signed-off-by: Sean Kao * update metric name suffix to comply with setting Signed-off-by: Sean Kao * remove bulk failure percentage metric Signed-off-by: Sean Kao * change rate from double to long Signed-off-by: Sean Kao * fix spark conf name Signed-off-by: Sean Kao * change default value Signed-off-by: Sean Kao * address comments - swap parameter for test case asserts - remove excessive null check (create noop impl for rate limiter) Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- build.sbt | 2 + docs/index.md | 6 +- .../core/RestHighLevelClientWrapper.java | 19 +-- .../flint/core/metrics/MetricConstants.java | 5 + .../opensearch/flint/core/FlintOptions.java | 33 +++- .../core/storage/BulkRequestRateLimiter.java | 33 ++-- .../storage/BulkRequestRateLimiterHolder.java | 6 +- .../storage/BulkRequestRateLimiterImpl.java | 83 ++++++++++ .../storage/BulkRequestRateLimiterNoop.java | 37 +++++ ...rapper.java => OpenSearchBulkWrapper.java} | 34 ++-- .../core/storage/OpenSearchClientUtils.java | 4 +- .../storage/BulkRequestRateLimiterTest.java | 31 +++- ...st.java => OpenSearchBulkWrapperTest.java} | 145 
+++++++++++++++--- .../sql/flint/config/FlintSparkConf.scala | 45 +++++- .../flint/config/FlintSparkConfSuite.scala | 25 ++- 15 files changed, 412 insertions(+), 96 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterNoop.java rename flint-core/src/main/scala/org/opensearch/flint/core/storage/{OpenSearchBulkRetryWrapper.java => OpenSearchBulkWrapper.java} (77%) rename flint-core/src/test/scala/org/opensearch/flint/core/storage/{OpenSearchBulkRetryWrapperTest.java => OpenSearchBulkWrapperTest.java} (54%) diff --git a/build.sbt b/build.sbt index 154f3370c..4d585b933 100644 --- a/build.sbt +++ b/build.sbt @@ -58,6 +58,7 @@ val packagesToShade = Seq( "com.fasterxml.jackson.core.**", "com.fasterxml.jackson.dataformat.**", "com.fasterxml.jackson.databind.**", + "com.google.**", "com.sun.jna.**", "com.thoughtworks.paranamer.**", "javax.annotation.**", @@ -120,6 +121,7 @@ lazy val flintCore = (project in file("flint-core")) exclude ("org.apache.httpcomponents.client5", "httpclient5"), "org.opensearch" % "opensearch-job-scheduler-spi" % opensearchMavenVersion, "dev.failsafe" % "failsafe" % "3.3.2", + "com.google.guava" % "guava" % "33.3.1-jre", "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" exclude ("com.fasterxml.jackson.core", "jackson-databind"), "com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593" diff --git a/docs/index.md b/docs/index.md index 684ba7da6..ac131c6a1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -535,7 +535,11 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE. 
- `spark.datasource.flint.write.batch_bytes`: The approximately amount of data in bytes written to Flint in a single batch request. The actual data write to OpenSearch may more than it. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero. - `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)] -- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: [Experimental] Rate limit(request/sec) for bulk request per worker node. Only accept integer value. To reduce the traffic less than 1 req/sec, batch_bytes or batch_size should be reduced. Default value is 0, which disables rate limit. +- `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limit for bulk request per worker node. Default is false. +- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000. +- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000. +- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 500. 
+- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8. - `spark.datasource.flint.read.scroll_size`: default value is 100. - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration. - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry. diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index 31f012256..7eb607963 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -39,8 +39,7 @@ import org.opensearch.client.transport.rest_client.RestClientTransport; import java.io.IOException; -import org.opensearch.flint.core.storage.BulkRequestRateLimiter; -import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper; +import org.opensearch.flint.core.storage.OpenSearchBulkWrapper; import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX; import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX; @@ -54,8 +53,7 @@ */ public class RestHighLevelClientWrapper implements IRestHighLevelClient { private final RestHighLevelClient client; - private final BulkRequestRateLimiter rateLimiter; - private final OpenSearchBulkRetryWrapper bulkRetryWrapper; + private final OpenSearchBulkWrapper bulkRetryWrapper; private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper(); @@ -64,22 +62,15 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient { * * @param client the RestHighLevelClient instance to wrap */ - public 
RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLimiter rateLimiter, OpenSearchBulkRetryWrapper bulkRetryWrapper) { + public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkWrapper bulkRetryWrapper) { this.client = client; - this.rateLimiter = rateLimiter; this.bulkRetryWrapper = bulkRetryWrapper; } @Override public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException { - return execute(() -> { - try { - rateLimiter.acquirePermit(); - return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); - } catch (InterruptedException e) { - throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e); - } - }, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX); + return execute(() -> bulkRetryWrapper.bulk(client, bulkRequest, options), + OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX); } @Override diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 79e70b8c2..3a08047d8 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -40,6 +40,11 @@ public final class MetricConstants { public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count"; public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count"; + /** + * Metric name for opensearch bulk request rate limit + */ + public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.documentsPerSecond.count"; + /** * Metric name for counting the errors encountered with Amazon S3 operations. 
*/ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index f9d181b70..578468e06 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -111,8 +111,17 @@ public class FlintOptions implements Serializable { private static final String UNKNOWN = "UNKNOWN"; - public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode"; - public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "write.bulk.rate_limit_per_node.enabled"; + public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "false"; + public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "write.bulk.rate_limit_per_node.min"; + public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "5000"; + public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "write.bulk.rate_limit_per_node.max"; + public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "50000"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "write.bulk.rate_limit_per_node.increase_step"; + public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "500"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "write.bulk.rate_limit_per_node.decrease_ratio"; + public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8"; + public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes"; public FlintOptions(Map options) { @@ -234,8 +243,24 @@ public boolean supportShard() { DEFAULT_SUPPORT_SHARD); } - public long getBulkRequestRateLimitPerNode() { - return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE, 
DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE)); + public boolean getBulkRequestRateLimitPerNodeEnabled() { + return Boolean.parseBoolean(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED)); + } + + public long getBulkRequestMinRateLimitPerNode() { + return Long.parseLong(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE)); + } + + public long getBulkRequestMaxRateLimitPerNode() { + return Long.parseLong(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE)); + } + + public long getBulkRequestRateLimitPerNodeIncreaseStep() { + return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP)); + } + + public double getBulkRequestRateLimitPerNodeDecreaseRatio() { + return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO)); } public String getCustomAsyncQuerySchedulerClass() { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java index 797dc2d02..77a9ca6f8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java @@ -5,31 +5,16 @@ package org.opensearch.flint.core.storage; -import dev.failsafe.RateLimiter; -import java.time.Duration; -import java.util.logging.Logger; -import org.opensearch.flint.core.FlintOptions; +public interface BulkRequestRateLimiter { + void acquirePermit(); -public class BulkRequestRateLimiter { - private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName()); - private RateLimiter rateLimiter; + void 
acquirePermit(int permits); - public BulkRequestRateLimiter(FlintOptions flintOptions) { - long bulkRequestRateLimitPerNode = flintOptions.getBulkRequestRateLimitPerNode(); - if (bulkRequestRateLimitPerNode > 0) { - LOG.info("Setting rate limit for bulk request to " + bulkRequestRateLimitPerNode + "/sec"); - this.rateLimiter = RateLimiter.smoothBuilder( - flintOptions.getBulkRequestRateLimitPerNode(), - Duration.ofSeconds(1)).build(); - } else { - LOG.info("Rate limit for bulk request was not set."); - } - } + void increaseRate(); - // Wait so it won't exceed rate limit. Does nothing if rate limit is not set. - public void acquirePermit() throws InterruptedException { - if (rateLimiter != null) { - this.rateLimiter.acquirePermit(); - } - } + void decreaseRate(); + + long getRate(); + + void setRate(long permitsPerSecond); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterHolder.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterHolder.java index 73fdb8843..cd05c8710 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterHolder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterHolder.java @@ -20,7 +20,11 @@ private BulkRequestRateLimiterHolder() {} public synchronized static BulkRequestRateLimiter getBulkRequestRateLimiter( FlintOptions flintOptions) { if (instance == null) { - instance = new BulkRequestRateLimiter(flintOptions); + if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) { + instance = new BulkRequestRateLimiterImpl(flintOptions); + } else { + instance = new BulkRequestRateLimiterNoop(); + } } return instance; } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java new file mode 100644 index 000000000..3dec19558 --- /dev/null +++ 
b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import com.google.common.util.concurrent.RateLimiter; +import java.util.logging.Logger; +import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsUtil; + +public class BulkRequestRateLimiterImpl implements BulkRequestRateLimiter { + private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterImpl.class.getName()); + private RateLimiter rateLimiter; + + private final long minRate; + private final long maxRate; + private final long increaseStep; + private final double decreaseRatio; + + public BulkRequestRateLimiterImpl(FlintOptions flintOptions) { + minRate = flintOptions.getBulkRequestMinRateLimitPerNode(); + maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode(); + increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep(); + decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio(); + + LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec"); + this.rateLimiter = RateLimiter.create(minRate); + MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate); + } + + // Block until one permit is available so the configured rate is not exceeded. + @Override + public void acquirePermit() { + this.rateLimiter.acquire(); + LOG.info("Acquired 1 permit"); + } + + @Override + public void acquirePermit(int permits) { + this.rateLimiter.acquire(permits); + LOG.info("Acquired " + permits + " permits"); + } + + /** + * Increase rate limit additively by increaseStep; setRate clamps the result. + */ + @Override + public void increaseRate() { + setRate(getRate() + increaseStep); + } + + /** + * Decrease rate limit multiplicatively by decreaseRatio; setRate clamps the result. 
+ */ + @Override + public void decreaseRate() { + setRate((long) (getRate() * decreaseRatio)); + } + + @Override + public long getRate() { + return Math.round(this.rateLimiter.getRate()); // round, not truncate: the limiter reports a double that may sit a hair below the integer that was set + } + + /** + * Set rate limit to the given value, clamped by minRate and maxRate. Non-positive maxRate means + * there's no maximum rate restriction, and the rate can be set to any value greater than + * minRate. + */ + @Override + public void setRate(long permitsPerSecond) { + if (maxRate > 0) { + permitsPerSecond = Math.min(permitsPerSecond, maxRate); + } + permitsPerSecond = Math.max(minRate, permitsPerSecond); + LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec"); + this.rateLimiter.setRate(permitsPerSecond); + MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond); + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterNoop.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterNoop.java new file mode 100644 index 000000000..47abd6a46 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterNoop.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import java.util.logging.Logger; + +public class BulkRequestRateLimiterNoop implements BulkRequestRateLimiter { + private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterNoop.class.getName()); + + public BulkRequestRateLimiterNoop() { + LOG.info("Rate limit for bulk request was not set."); + } + + @Override + public void acquirePermit() {} + + @Override + public void acquirePermit(int permits) {} + + @Override + public void increaseRate() {} + + @Override + public void decreaseRate() {} + + @Override + public long getRate() { + return 0; + } + + @Override + public void setRate(long permitsPerSecond) {} +} + +diff --git
a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java similarity index 77% rename from flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java rename to flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java index 14e3b7099..935894a31 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java @@ -25,25 +25,36 @@ import org.opensearch.flint.core.metrics.MetricsUtil; import org.opensearch.rest.RestStatus; -public class OpenSearchBulkRetryWrapper { +/** + * Wrapper class for OpenSearch bulk API with retry and rate limiting capability. + */ +public class OpenSearchBulkWrapper { - private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName()); + private static final Logger LOG = Logger.getLogger(OpenSearchBulkWrapper.class.getName()); private final RetryPolicy retryPolicy; + private final BulkRequestRateLimiter rateLimiter; - public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) { + public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) { this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate); + this.rateLimiter = rateLimiter; } /** - * Delegate bulk request to the client, and retry the request if the response contains retryable - * failure. It won't retry when bulk call thrown exception. + * Bulk request with retry and rate limiting. Delegate bulk request to the client, and retry the + * request if the response contains retryable failure. It won't retry when bulk call thrown + * exception. In addition, adjust rate limit based on the responses. 
* @param client used to call bulk API * @param bulkRequest requests passed to bulk method * @param options options passed to bulk method * @return Last result */ - public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest, + public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) { + rateLimiter.acquirePermit(bulkRequest.requests().size()); + return bulkWithPartialRetry(client, bulkRequest, options); + } + + private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) { final AtomicInteger requestCount = new AtomicInteger(0); try { @@ -59,9 +70,14 @@ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest .get(() -> { requestCount.incrementAndGet(); BulkResponse response = client.bulk(nextRequest.get(), options); - if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test( - response)) { - nextRequest.set(getRetryableRequest(nextRequest.get(), response)); + + if (!bulkItemRetryableResultPredicate.test(response)) { + rateLimiter.increaseRate(); + } else { + rateLimiter.decreaseRate(); + if (retryPolicy.getConfig().allowsRetries()) { + nextRequest.set(getRetryableRequest(nextRequest.get(), response)); + } } return response; }); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index 8bf80e90e..841725e71 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -69,8 +69,8 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options public static IRestHighLevelClient createClient(FlintOptions options) { return new RestHighLevelClientWrapper(createRestHighLevelClient(options), - 
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options), - new OpenSearchBulkRetryWrapper(options.getRetryOptions())); + new OpenSearchBulkWrapper(options.getRetryOptions(), + BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options))); } /** diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterTest.java index b87c9f797..44fe95d48 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterTest.java +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterTest.java @@ -6,35 +6,52 @@ package org.opensearch.flint.core.storage; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import org.opensearch.flint.core.FlintOptions; +/** + * These tests are largely dependent on the choice of the underlying rate limiter. While conceptually + * they all distribute permits at some rate, the actual behavior varies based on implementation. + * To avoid flakiness and creating test cases for specific implementation, we measure the time required + * for acquiring several permits, and set lenient thresholds. 
+ */ class BulkRequestRateLimiterTest { - FlintOptions flintOptionsWithRateLimit = new FlintOptions(ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "1")); - FlintOptions flintOptionsWithoutRateLimit = new FlintOptions(ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "0")); @Test void acquirePermitWithRateConfig() throws Exception { - BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(flintOptionsWithRateLimit); + FlintOptions options = new FlintOptions(ImmutableMap.of( + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true", + FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "1")); + BulkRequestRateLimiter limiter = new BulkRequestRateLimiterImpl(options); assertTrue(timer(() -> { limiter.acquirePermit(); limiter.acquirePermit(); - }) >= 1000); + limiter.acquirePermit(); + limiter.acquirePermit(); + limiter.acquirePermit(); + limiter.acquirePermit(); + }) >= 4500); + assertTrue(timer(() -> { + limiter.acquirePermit(5); + limiter.acquirePermit(); + }) >= 4500); } @Test void acquirePermitWithoutRateConfig() throws Exception { - BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(flintOptionsWithoutRateLimit); + BulkRequestRateLimiter limiter = new BulkRequestRateLimiterNoop(); assertTrue(timer(() -> { limiter.acquirePermit(); limiter.acquirePermit(); + limiter.acquirePermit(); + limiter.acquirePermit(); + limiter.acquirePermit(); + limiter.acquirePermit(); }) < 100); } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java similarity index 54% rename from flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java rename to flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java index 43bd8d2b2..79ed51825 100644 --- 
a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java @@ -28,13 +28,14 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.http.FlintRetryOptions; import org.opensearch.flint.core.metrics.MetricConstants; import org.opensearch.flint.core.metrics.MetricsTestUtil; import org.opensearch.rest.RestStatus; @ExtendWith(MockitoExtension.class) -class OpenSearchBulkRetryWrapperTest { +class OpenSearchBulkWrapperTest { private static final long ESTIMATED_SIZE_IN_BYTES = 1000L; @Mock @@ -46,6 +47,8 @@ class OpenSearchBulkRetryWrapperTest { @Mock BulkResponse failureResponse; @Mock + BulkResponse retriedResponse; + @Mock BulkResponse conflictResponse; @Mock RestHighLevelClient client; @@ -66,18 +69,27 @@ class OpenSearchBulkRetryWrapperTest { FlintRetryOptions retryOptionsWithoutRetry = new FlintRetryOptions( Map.of("retry.max_retries", "0")); + FlintOptions optionsWithRateLimit = new FlintOptions(Map.of( + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true", + FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2", + FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20", + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1", + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5")); + @Test public void withRetryWhenCallSucceed() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterNoop(); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); when(client.bulk(bulkRequest, 
options)).thenReturn(successResponse); when(successResponse.hasFailures()).thenReturn(false); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); - assertEquals(response, successResponse); + assertEquals(successResponse, response); verify(client).bulk(bulkRequest, options); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); @@ -89,17 +101,19 @@ public void withRetryWhenCallSucceed() throws Exception { @Test public void withRetryWhenCallConflict() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterNoop(); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(conflictResponse); mockConflictResponse(); when(conflictResponse.hasFailures()).thenReturn(true); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); - assertEquals(response, conflictResponse); + assertEquals(conflictResponse, response); verify(client).bulk(bulkRequest, options); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); @@ -111,8 +125,9 @@ public void withRetryWhenCallConflict() throws Exception { @Test public void withRetryWhenCallFailOnce() throws Exception { 
MetricsTestUtil.withMetricEnv(verifier -> { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterNoop(); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse) .thenReturn(successResponse); @@ -121,9 +136,9 @@ public void withRetryWhenCallFailOnce() throws Exception { when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); - assertEquals(response, successResponse); + assertEquals(successResponse, response); verify(client, times(2)).bulk(any(), eq(options)); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 1); @@ -134,16 +149,18 @@ public void withRetryWhenCallFailOnce() throws Exception { @Test public void withRetryWhenAllCallFail() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterNoop(); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); when(client.bulk(any(), eq(options))) .thenReturn(failureResponse); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); mockFailureResponse(); - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + 
BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); - assertEquals(response, failureResponse); + assertEquals(failureResponse, response); verify(client, times(3)).bulk(any(), eq(options)); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 2); @@ -154,13 +171,15 @@ public void withRetryWhenAllCallFail() throws Exception { @Test public void withRetryWhenCallThrowsShouldNotRetry() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithRetry); + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterNoop(); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); when(client.bulk(bulkRequest, options)).thenThrow(new RuntimeException("test")); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); assertThrows(RuntimeException.class, - () -> bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options)); + () -> bulkWrapper.bulk(client, bulkRequest, options)); verify(client).bulk(bulkRequest, options); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); @@ -172,16 +191,18 @@ public void withRetryWhenCallThrowsShouldNotRetry() throws Exception { @Test public void withoutRetryWhenCallFail() throws Exception { MetricsTestUtil.withMetricEnv(verifier -> { - OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( - retryOptionsWithoutRetry); + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterNoop(); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithoutRetry, rateLimiter); when(client.bulk(bulkRequest, options)) 
.thenReturn(failureResponse); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); mockFailureResponse(); - BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); + BulkResponse response = bulkWrapper.bulk(client, bulkRequest, options); - assertEquals(response, failureResponse); + assertEquals(failureResponse, response); verify(client).bulk(bulkRequest, options); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_SIZE_METRIC, ESTIMATED_SIZE_IN_BYTES); verifier.assertHistoricGauge(MetricConstants.OPENSEARCH_BULK_RETRY_COUNT_METRIC, 0); @@ -189,6 +210,78 @@ public void withoutRetryWhenCallFail() throws Exception { }); } + @Test + public void increaseRateLimitWhenCallSucceed() throws Exception { + MetricsTestUtil.withMetricEnv(verifier -> { + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterImpl(optionsWithRateLimit); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); + when(client.bulk(bulkRequest, options)).thenReturn(successResponse); + when(successResponse.hasFailures()).thenReturn(false); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + + assertEquals(2, rateLimiter.getRate()); + + bulkWrapper.bulk(client, bulkRequest, options); + assertEquals(3, rateLimiter.getRate()); + + bulkWrapper.bulk(client, bulkRequest, options); + assertEquals(4, rateLimiter.getRate()); + + // Should not exceed max rate limit + rateLimiter.setRate(20); + bulkWrapper.bulk(client, bulkRequest, options); + assertEquals(20, rateLimiter.getRate()); + }); + } + + @Test + public void adjustRateLimitWithRetryWhenCallFailOnce() throws Exception { + MetricsTestUtil.withMetricEnv(verifier -> { + BulkRequestRateLimiter rateLimiter = new 
BulkRequestRateLimiterImpl(optionsWithRateLimit); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); + when(client.bulk(any(), eq(options))) + .thenReturn(failureResponse) + .thenReturn(successResponse); + mockFailureResponse(); + when(successResponse.hasFailures()).thenReturn(false); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + + rateLimiter.setRate(10); + + bulkWrapper.bulk(client, bulkRequest, options); + + // Should decrease once then increase once + assertEquals(6, rateLimiter.getRate()); + }); + } + + @Test + public void decreaseRateLimitWhenAllCallFail() throws Exception { + MetricsTestUtil.withMetricEnv(verifier -> { + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiterImpl(optionsWithRateLimit); + OpenSearchBulkWrapper bulkWrapper = new OpenSearchBulkWrapper( + retryOptionsWithRetry, rateLimiter); + when(client.bulk(any(), eq(options))) + .thenReturn(failureResponse) + .thenReturn(retriedResponse); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + mockFailureResponse(); + mockRetriedResponse(); + + rateLimiter.setRate(20); + + bulkWrapper.bulk(client, bulkRequest, options); + + // Should decrease three times: 20 -> 10 -> 5 -> 2 (the rate is a whole number, so it cannot be 2.5) + assertEquals(2, rateLimiter.getRate()); + }); + } + private void mockFailureResponse() { when(failureResponse.hasFailures()).thenReturn(true); when(failureResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, failureItem}); @@ -198,4 +291,8 @@ private void mockConflictResponse() { when(conflictResponse.hasFailures()).thenReturn(true); when(conflictResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, conflictItem}); } + private void mockRetriedResponse() { + when(retriedResponse.hasFailures()).thenReturn(true); 
+ when(retriedResponse.getItems()).thenReturn(new BulkItemResponse[]{failureItem}); + } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 364a8a1de..faba2135f 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -136,11 +136,42 @@ object FlintSparkConf { .doc("max retries on failed HTTP request, 0 means retry is disabled, default is 3") .createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_MAX_RETRIES)) - val BULK_REQUEST_RATE_LIMIT_PER_NODE = - FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE}") + val BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = + FlintConfig( + s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED}") + .datasourceOption() + .doc("[Experimental] Enable adaptive rate limit for bulk request per worker node") + .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED) + + val BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = + FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE}") + .datasourceOption() + .doc( + "[Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. " + + "The adaptive rate will not drop below this value. Must be greater than 0.") + .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE) + + val BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = + FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE}") + .datasourceOption() + .doc( + "[Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. 
" + + "The adaptive rate will not exceed this value. Set to -1 for no upper bound.") + .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE) + + val BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = + FlintConfig( + s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP}") + .datasourceOption() + .doc("[Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0.") + .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP) + + val BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = + FlintConfig( + s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO}") .datasourceOption() - .doc("[Experimental] Rate limit (requests/sec) for bulk request per worker node. Rate won't be limited by default") - .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE) + .doc("[Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. 
Must be between 0 and 1.") + .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO) val RETRYABLE_HTTP_STATUS_CODES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}") @@ -338,7 +369,11 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable AUTH, MAX_RETRIES, RETRYABLE_HTTP_STATUS_CODES, - BULK_REQUEST_RATE_LIMIT_PER_NODE, + BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, + BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, + BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, + BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, + BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, SERVICE_NAME, diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 594322bae..ac9346562 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -64,14 +64,29 @@ class FlintSparkConfSuite extends FlintSuite { retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException" } - test("test bulkRequestRateLimitPerNode default value") { + test("test bulk request rate limit options default value") { val options = FlintSparkConf().flintOptions() - options.getBulkRequestRateLimitPerNode shouldBe 0 + options.getBulkRequestRateLimitPerNodeEnabled shouldBe false + options.getBulkRequestMinRateLimitPerNode shouldBe 5000 + options.getBulkRequestMaxRateLimitPerNode shouldBe 50000 + options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 500 + options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8 } - test("test specified bulkRequestRateLimitPerNode") { - val options = FlintSparkConf(Map("bulkRequestRateLimitPerNode" -> 
"5").asJava).flintOptions() - options.getBulkRequestRateLimitPerNode shouldBe 5 + test("test specified bulk request rate limit options") { + val options = FlintSparkConf( + Map( + "write.bulk.rate_limit_per_node.enabled" -> "true", + "write.bulk.rate_limit_per_node.min" -> "20", + "write.bulk.rate_limit_per_node.max" -> "200", + "write.bulk.rate_limit_per_node.increase_step" -> "20", + "write.bulk.rate_limit_per_node.decrease_ratio" -> "0.5").asJava) + .flintOptions() + options.getBulkRequestRateLimitPerNodeEnabled shouldBe true + options.getBulkRequestMinRateLimitPerNode shouldBe 20 + options.getBulkRequestMaxRateLimitPerNode shouldBe 200 + options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 20 + options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.5 } test("test metadata access AWS credentials provider option") {