-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into ppl-projection-command
- Loading branch information
Showing
15 changed files
with
412 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
83 changes: 83 additions & 0 deletions
83
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import com.google.common.util.concurrent.RateLimiter; | ||
import java.util.logging.Logger; | ||
import org.opensearch.flint.core.FlintOptions; | ||
import org.opensearch.flint.core.metrics.MetricConstants; | ||
import org.opensearch.flint.core.metrics.MetricsUtil; | ||
|
||
public class BulkRequestRateLimiterImpl implements BulkRequestRateLimiter { | ||
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterImpl.class.getName()); | ||
private RateLimiter rateLimiter; | ||
|
||
private final long minRate; | ||
private final long maxRate; | ||
private final long increaseStep; | ||
private final double decreaseRatio; | ||
|
||
public BulkRequestRateLimiterImpl(FlintOptions flintOptions) { | ||
minRate = flintOptions.getBulkRequestMinRateLimitPerNode(); | ||
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode(); | ||
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep(); | ||
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio(); | ||
|
||
LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec"); | ||
this.rateLimiter = RateLimiter.create(minRate); | ||
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate); | ||
} | ||
|
||
// Wait so it won't exceed rate limit. Does nothing if rate limit is not set. | ||
@Override | ||
public void acquirePermit() { | ||
this.rateLimiter.acquire(); | ||
LOG.info("Acquired 1 permit"); | ||
} | ||
|
||
@Override | ||
public void acquirePermit(int permits) { | ||
this.rateLimiter.acquire(permits); | ||
LOG.info("Acquired " + permits + " permits"); | ||
} | ||
|
||
/** | ||
* Increase rate limit additively. | ||
*/ | ||
@Override | ||
public void increaseRate() { | ||
setRate(getRate() + increaseStep); | ||
} | ||
|
||
/** | ||
* Decrease rate limit multiplicatively. | ||
*/ | ||
@Override | ||
public void decreaseRate() { | ||
setRate((long) (getRate() * decreaseRatio)); | ||
} | ||
|
||
@Override | ||
public long getRate() { | ||
return (long) this.rateLimiter.getRate(); | ||
} | ||
|
||
/** | ||
* Set rate limit to the given value, clamped by minRate and maxRate. Non-positive maxRate means | ||
* there's no maximum rate restriction, and the rate can be set to any value greater than | ||
* minRate. | ||
*/ | ||
@Override | ||
public void setRate(long permitsPerSecond) { | ||
if (maxRate > 0) { | ||
permitsPerSecond = Math.min(permitsPerSecond, maxRate); | ||
} | ||
permitsPerSecond = Math.max(minRate, permitsPerSecond); | ||
LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec"); | ||
this.rateLimiter.setRate(permitsPerSecond); | ||
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond); | ||
} | ||
} |
37 changes: 37 additions & 0 deletions
37
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterNoop.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import java.util.logging.Logger; | ||
|
||
public class BulkRequestRateLimiterNoop implements BulkRequestRateLimiter { | ||
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterNoop.class.getName()); | ||
|
||
public BulkRequestRateLimiterNoop() { | ||
LOG.info("Rate limit for bulk request was not set."); | ||
} | ||
|
||
@Override | ||
public void acquirePermit() {} | ||
|
||
@Override | ||
public void acquirePermit(int permits) {} | ||
|
||
@Override | ||
public void increaseRate() {} | ||
|
||
@Override | ||
public void decreaseRate() {} | ||
|
||
@Override | ||
public long getRate() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public void setRate(long permitsPerSecond) {} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.