From 7f1edb14d0045ba9c0188c28698bbcb50fa124d1 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Tue, 1 Dec 2020 18:28:48 -0800 Subject: [PATCH 1/6] fix: Makes response codes rate limited as well as prints a message when it is hit --- .../server-config/config-reference.md | 10 ++-- .../ksql/configdef/ConfigValidators.java | 23 ++++++++ .../ksql/configdef/ConfigValidatorsTest.java | 52 +++++++++++++++++ .../ksql/api/server/LoggingHandler.java | 25 +-------- .../ksql/api/server/LoggingRateLimiter.java | 47 ++++++++++++++-- .../ksql/rest/server/KsqlRestConfig.java | 18 +++--- .../ksql/api/server/LoggingHandlerTest.java | 6 +- .../api/server/LoggingRateLimiterTest.java | 56 ++++++++++++++++--- 8 files changed, 186 insertions(+), 51 deletions(-) diff --git a/docs/operate-and-deploy/installation/server-config/config-reference.md b/docs/operate-and-deploy/installation/server-config/config-reference.md index 5e2048389c09..19a3f6912fa1 100644 --- a/docs/operate-and-deploy/installation/server-config/config-reference.md +++ b/docs/operate-and-deploy/installation/server-config/config-reference.md @@ -612,11 +612,13 @@ property has the value `KSQL_PROCESSING_LOG`. Toggles whether or not the processing log should include rows in log messages. By default, this property has the value `false`. -### ksql.logging.server.skipped.response.codes +### ksql.logging.server.rate.limited.response.codes -A comma-separated list of HTTP response codes to skip during server -request logging. This is useful for ignoring certain 4XX errors that you -might not want to show up in the logs. +A list of `path:rate_limit` pairs, to limit the rate of server request +logging. This is useful for limiting certain 4XX errors that you +might not want to blow up in the logs. +This setting enables seeing the logs when the request rate is low +and dropping them when they go over the threshold. ### ksql.logging.server.rate.limited.request.paths diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java b/ksqldb-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java index 4f8df043e6a3..1c53a232902c 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java @@ -173,6 +173,29 @@ public static Validator intList() { }; } + public static Validator mapWithIntKeyDoubleValue() { + return (name, val) -> { + if (!(val instanceof String)) { + throw new ConfigException(name, val, "Must be a string"); + } + + final String str = (String) val; + final Map map = KsqlConfig.parseStringAsMap(name, str); + map.forEach((keyStr, valueStr) -> { + try { + Integer.parseInt(keyStr); + } catch (NumberFormatException e) { + throw new ConfigException(name, keyStr, "Not an int"); + } + try { + Double.parseDouble(valueStr); + } catch (NumberFormatException e) { + throw new ConfigException(name, valueStr, "Not a double"); + } + }); + }; + } + public static Validator mapWithDoubleValue() { return (name, val) -> { if (!(val instanceof String)) { diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java index 863503d82577..aab2bfd1bf35 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java @@ -270,6 +270,58 @@ public void shouldThrowOnNoStringRegexList() { assertThat(e.getMessage(), containsString("validator should only be used with LIST of STRING defs")); } + @Test + public void shouldParseDoubleValueInMap_v1() { + // Given: + final Validator validator = ConfigValidators.mapWithDoubleValue(); + validator.ensureValid("propName", "foo:1.2"); + } + + @Test + public void shouldParseIntKeyDoubleValueInMap_v1() { + // Given: + final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue(); + validator.ensureValid("propName", "123:1.2"); + } + + @Test + public void shouldThrowOnBadDoubleValueInMap() { + // Given: + final Validator validator = ConfigValidators.mapWithDoubleValue(); + + // When: + final Exception e = assertThrows( + ConfigException.class, + () -> validator.ensureValid("propName", "foo:abc") + ); + + // Then: + assertThat(e.getMessage(), + containsString("Invalid value abc for configuration propName: Not a double")); + } + + @Test + public void shouldThrowOnBadIntDoubleValueInMap() { + // Given: + final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue(); + + // When: + final Exception e = assertThrows( + ConfigException.class, + () -> validator.ensureValid("propName", "1:abc") + ); + final Exception e2 = assertThrows( + ConfigException.class, + () -> validator.ensureValid("propName", "abc:1.2") + ); + + // Then: + assertThat(e.getMessage(), + containsString("Invalid value abc for configuration propName: Not a double")); + assertThat(e2.getMessage(), + containsString("Invalid value abc for configuration propName: Not an int")); + } + private enum TestEnum { FOO, BAR } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java index 1c6981178aa9..f9d3560a99c0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingHandler.java @@ -15,24 +15,17 @@ package io.confluent.ksql.api.server; -import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG; import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.api.auth.ApiUser; -import io.confluent.ksql.rest.server.KsqlRestConfig; import io.vertx.core.Handler; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpVersion; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.impl.Utils; import java.time.Clock; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +34,10 @@ public class LoggingHandler implements Handler { private static final Logger LOG = LoggerFactory.getLogger(LoggingHandler.class); static final String HTTP_HEADER_USER_AGENT = "User-Agent"; - private final Set skipResponseCodes; private final Logger logger; private final Clock clock; private final LoggingRateLimiter loggingRateLimiter; - private final Map rateLimiters = new ConcurrentHashMap<>(); - public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateLimiter) { this(server, loggingRateLimiter, LOG, Clock.systemUTC()); } @@ -60,7 +50,6 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL final Clock clock) { requireNonNull(server); this.loggingRateLimiter = requireNonNull(loggingRateLimiter); - this.skipResponseCodes = getSkipResponseCodes(server.getConfig()); this.logger = logger; this.clock = clock; } @@ -69,17 +58,14 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL public void handle(final RoutingContext routingContext) { routingContext.addEndHandler(ar -> { // After the response is complete, log results here. - if (skipResponseCodes.contains(routingContext.response().getStatusCode())) { - return; - } - if (!loggingRateLimiter.shouldLog(routingContext.request().path())) { + final int status = routingContext.request().response().getStatusCode(); + if (!loggingRateLimiter.shouldLog(logger, routingContext.request().path(), status)) { return; } final long contentLength = routingContext.request().response().bytesWritten(); final HttpVersion version = routingContext.request().version(); final HttpMethod method = routingContext.request().method(); final String uri = routingContext.request().uri(); - final int status = routingContext.request().response().getStatusCode(); final long requestBodyLength = routingContext.request().bytesRead(); final String versionFormatted; switch (version) { @@ -118,13 +104,6 @@ public void handle(final RoutingContext routingContext) { routingContext.next(); } - private static Set getSkipResponseCodes(final KsqlRestConfig config) { - // Already validated as all ints - return config.getList(KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG) - .stream() - .map(Integer::parseInt).collect(ImmutableSet.toImmutableSet()); - } - private void doLog(final int status, final String message) { if (status >= 500) { logger.error(message); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java index 4424443e2676..d2f40fc6d990 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java @@ -16,6 +16,7 @@ package io.confluent.ksql.api.server; import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG; +import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG; import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; @@ -26,13 +27,23 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import org.slf4j.Logger; class LoggingRateLimiter { + // Print "You hit a rate limit" every 5 seconds + private static final double LIMIT_HIT_LOG_RATE = 0.2; private final Map rateLimitedPaths; + private final Map rateLimitedResponseCodes; + private final Function rateLimiterFactory; - private final Map rateLimiters = new ConcurrentHashMap<>(); + private final Map rateLimitersByPath = new ConcurrentHashMap<>(); + private final Map rateLimitersByResponseCode = new ConcurrentHashMap<>(); + + // Rate limiters for printing the "You hit a rate limit" message + private final RateLimiter pathLimitHit; + private final RateLimiter responseCodeLimitHit; LoggingRateLimiter(final KsqlRestConfig ksqlRestConfig) { this(ksqlRestConfig, RateLimiter::create); @@ -45,13 +56,33 @@ class LoggingRateLimiter { requireNonNull(ksqlRestConfig); this.rateLimiterFactory = requireNonNull(rateLimiterFactory); this.rateLimitedPaths = getRateLimitedRequestPaths(ksqlRestConfig); + this.rateLimitedResponseCodes = getRateLimitedResponseCodes(ksqlRestConfig); + this.pathLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE); + this.responseCodeLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE); } - public boolean shouldLog(final String path) { + public boolean shouldLog(final Logger logger, final String path, final int responseCode) { if (rateLimitedPaths.containsKey(path)) { final double rateLimit = rateLimitedPaths.get(path); - rateLimiters.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit)); - return rateLimiters.get(path).tryAcquire(); + rateLimitersByPath.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit)); + if (!rateLimitersByPath.get(path).tryAcquire()) { + if (pathLimitHit.tryAcquire()) { + logger.info("Hit rate limit for path " + path + " with limit " + rateLimit); + } + return false; + } + } + if (rateLimitedResponseCodes.containsKey(responseCode)) { + final double rateLimit = rateLimitedResponseCodes.get(responseCode); + rateLimitersByResponseCode.computeIfAbsent( + responseCode, (k) -> rateLimiterFactory.apply(rateLimit)); + if (!rateLimitersByResponseCode.get(responseCode).tryAcquire()) { + if (responseCodeLimitHit.tryAcquire()) { + logger.info("Hit rate limit for response code " + responseCode + " with limit " + + rateLimit); + } + return false; + } } return true; } @@ -64,4 +95,12 @@ private static Map getRateLimitedRequestPaths(final KsqlRestConf entry -> Double.parseDouble(entry.getValue()))); } + private static Map getRateLimitedResponseCodes(final KsqlRestConfig config) { + // Already validated as all ints + return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG) + .entrySet().stream() + .collect(ImmutableMap.toImmutableMap( + entry -> Integer.parseInt(entry.getKey()), + entry -> Double.parseDouble(entry.getValue()))); + } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index baaacd0fccd0..efcc8e20f126 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -15,8 +15,8 @@ package io.confluent.ksql.rest.server; -import static io.confluent.ksql.configdef.ConfigValidators.intList; import static io.confluent.ksql.configdef.ConfigValidators.mapWithDoubleValue; +import static io.confluent.ksql.configdef.ConfigValidators.mapWithIntKeyDoubleValue; import static io.confluent.ksql.configdef.ConfigValidators.oneOrMore; import static io.confluent.ksql.configdef.ConfigValidators.zeroOrPositive; @@ -333,10 +333,10 @@ public class KsqlRestConfig extends AbstractConfig { "The key store certificate alias to be used for internal client requests. If not set, " + "the system will fall back on the Vert.x default choice"; - public static final String KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG = - KSQL_CONFIG_PREFIX + "logging.server.skipped.response.codes"; - private static final String KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_DOC = - "A list of HTTP response codes to skip during server request logging"; + public static final String KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG = + KSQL_CONFIG_PREFIX + "logging.server.rate.limited.response.codes"; + private static final String KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_DOC = + "A list of code:rate_limit pairs, to rate limit the server request logging"; public static final String KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG = KSQL_CONFIG_PREFIX + "logging.server.rate.limited.request.paths"; @@ -640,12 +640,12 @@ public class KsqlRestConfig extends AbstractConfig { ConfigDef.Importance.LOW, KSQL_AUTHENTICATION_PLUGIN_DOC ).define( - KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG, - Type.LIST, + KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG, + Type.STRING, "", - intList(), + mapWithIntKeyDoubleValue(), ConfigDef.Importance.LOW, - KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_DOC + KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_DOC ).define( KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG, Type.STRING, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java index 1f10e2c976cb..843d41804392 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java @@ -66,7 +66,8 @@ public void setUp() { when(request.response()).thenReturn(response); when(request.remoteAddress()).thenReturn(socketAddress); when(ksqlRestConfig.getList(any())).thenReturn(ImmutableList.of("401")); - when(loggingRateLimiter.shouldLog("/query")).thenReturn(true); + when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true); + when(loggingRateLimiter.shouldLog(logger, "/query", 405)).thenReturn(true); when(clock.millis()).thenReturn(1699813434333L); when(response.bytesWritten()).thenReturn(5678L); when(request.path()).thenReturn("/query"); @@ -117,6 +118,7 @@ public void shouldProduceLog_warn() { public void shouldSkipLog() { // Given: when(response.getStatusCode()).thenReturn(401); + when(loggingRateLimiter.shouldLog(logger, "/query", 401)).thenReturn(false); // When: loggingHandler.handle(routingContext); @@ -133,7 +135,7 @@ public void shouldSkipLog() { public void shouldSkipRateLimited() { // Given: when(response.getStatusCode()).thenReturn(200); - when(loggingRateLimiter.shouldLog("/query")).thenReturn(true, true, false, false); + when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true, true, false, false); // When: loggingHandler.handle(routingContext); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java index 18d38f7ba901..6eb273540930 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java @@ -1,6 +1,8 @@ package io.confluent.ksql.api.server; +import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG; +import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; @@ -12,62 +14,98 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.rest.server.KsqlRestConfig; +import java.util.function.Function; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; @RunWith(MockitoJUnitRunner.class) public class LoggingRateLimiterTest { private static final String PATH = "/query"; + private static final int RESPONSE_CODE = 401; @Mock private RateLimiter rateLimiter; @Mock + private RateLimiter pathLimiter; + @Mock + private RateLimiter responseCodeLimiter; + @Mock private KsqlRestConfig ksqlRestConfig; + @Mock + private Logger logger; + @Mock + Function factory; private LoggingRateLimiter loggingRateLimiter; @Before public void setUp() { - when(ksqlRestConfig.getStringAsMap(any())).thenReturn(ImmutableMap.of(PATH, "2")); + when(ksqlRestConfig.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG)) + .thenReturn(ImmutableMap.of(PATH, "2")); + when(ksqlRestConfig.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG)) + .thenReturn(ImmutableMap.of(Integer.toString(RESPONSE_CODE), "1")); when(rateLimiter.tryAcquire()).thenReturn(true); - loggingRateLimiter = new LoggingRateLimiter(ksqlRestConfig, (rateLimit) -> rateLimiter); + when(factory.apply(any())).thenReturn(pathLimiter, responseCodeLimiter, rateLimiter); + when(pathLimiter.tryAcquire()).thenReturn(true); + when(responseCodeLimiter.tryAcquire()).thenReturn(true); + loggingRateLimiter = new LoggingRateLimiter(ksqlRestConfig, factory); } @Test public void shouldLog() { // When: - assertThat(loggingRateLimiter.shouldLog(PATH), is(true)); + assertThat(loggingRateLimiter.shouldLog(logger, PATH, 200), is(true)); // Then: verify(rateLimiter).tryAcquire(); + verify(logger, never()).info(any()); } @Test - public void shouldSkipRateLimited() { + public void shouldSkipRateLimited_path() { // Given: when(rateLimiter.tryAcquire()).thenReturn(true, true, false, false); // When: - assertThat(loggingRateLimiter.shouldLog(PATH), is(true)); - assertThat(loggingRateLimiter.shouldLog(PATH), is(true)); - assertThat(loggingRateLimiter.shouldLog(PATH), is(false)); - assertThat(loggingRateLimiter.shouldLog(PATH), is(false)); + assertThat(loggingRateLimiter.shouldLog(logger, PATH, 200), is(true)); + assertThat(loggingRateLimiter.shouldLog(logger, PATH, 200), is(true)); + assertThat(loggingRateLimiter.shouldLog(logger, PATH, 200), is(false)); + assertThat(loggingRateLimiter.shouldLog(logger, PATH, 200), is(false)); + + // Then: + verify(rateLimiter, times(4)).tryAcquire(); + verify(logger, times(2)).info("Hit rate limit for path /query with limit 2.0"); + } + + @Test + public void shouldSkipRateLimited_responseCode() { + // Given: + when(rateLimiter.tryAcquire()).thenReturn(true, false, false, false); + + // When: + assertThat(loggingRateLimiter.shouldLog(logger, "/foo", RESPONSE_CODE), is(true)); + assertThat(loggingRateLimiter.shouldLog(logger, "/foo", RESPONSE_CODE), is(false)); + assertThat(loggingRateLimiter.shouldLog(logger, "/foo", RESPONSE_CODE), is(false)); + assertThat(loggingRateLimiter.shouldLog(logger, "/foo", RESPONSE_CODE), is(false)); // Then: verify(rateLimiter, times(4)).tryAcquire(); + verify(logger, times(3)).info("Hit rate limit for response code 401 with limit 1.0"); } @Test public void shouldLog_notRateLimited() { // When: - assertThat(loggingRateLimiter.shouldLog("/foo"), is(true)); + assertThat(loggingRateLimiter.shouldLog(logger, "/foo", 200), is(true)); // Then: verify(rateLimiter, never()).tryAcquire(); + verify(logger, never()).info(any()); } } From 8673feccf6c3eac95958ed110e4bb16fbee184d3 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Tue, 1 Dec 2020 18:30:35 -0800 Subject: [PATCH 2/6] Fix test name --- .../io/confluent/ksql/configdef/ConfigValidatorsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java index aab2bfd1bf35..c12f58f87b70 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java @@ -271,14 +271,14 @@ public void shouldThrowOnNoStringRegexList() { } @Test - public void shouldParseDoubleValueInMap_v1() { + public void shouldParseDoubleValueInMap() { // Given: final Validator validator = ConfigValidators.mapWithDoubleValue(); validator.ensureValid("propName", "foo:1.2"); } @Test - public void shouldParseIntKeyDoubleValueInMap_v1() { + public void shouldParseIntKeyDoubleValueInMap() { // Given: final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue(); validator.ensureValid("propName", "123:1.2"); From 48282a47e0339128a381871336b39004f9a27e9f Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Wed, 2 Dec 2020 09:16:45 -0800 Subject: [PATCH 3/6] Adds some more pairs to map test case --- .../io/confluent/ksql/configdef/ConfigValidatorsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java index c12f58f87b70..51b73206021d 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java @@ -274,14 +274,14 @@ public void shouldThrowOnNoStringRegexList() { public void shouldParseDoubleValueInMap() { // Given: final Validator validator = ConfigValidators.mapWithDoubleValue(); - validator.ensureValid("propName", "foo:1.2"); + validator.ensureValid("propName", "foo:1.2,bar:3"); } @Test public void shouldParseIntKeyDoubleValueInMap() { // Given: final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue(); - validator.ensureValid("propName", "123:1.2"); + validator.ensureValid("propName", "123:1.2,345:9.0"); } @Test From 6836df9edc6a3a5be388f5adeaba02bc1187970c Mon Sep 17 00:00:00 2001 From: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> Date: Thu, 3 Dec 2020 09:51:54 -0800 Subject: [PATCH 4/6] Apply suggestions from code review Co-authored-by: Jim Galasyn --- .../installation/server-config/config-reference.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/operate-and-deploy/installation/server-config/config-reference.md b/docs/operate-and-deploy/installation/server-config/config-reference.md index 19a3f6912fa1..dcd1f8faaac9 100644 --- a/docs/operate-and-deploy/installation/server-config/config-reference.md +++ b/docs/operate-and-deploy/installation/server-config/config-reference.md @@ -615,7 +615,7 @@ messages. By default, this property has the value `false`. ### ksql.logging.server.rate.limited.response.codes A list of `path:rate_limit` pairs, to limit the rate of server request -logging. This is useful for limiting certain 4XX errors that you +logging. This is useful for limiting certain 4XX errors that you might not want to blow up in the logs. This setting enables seeing the logs when the request rate is low and dropping them when they go over the threshold. From 1c0ec7a1831f58107c61b7d0fc886bdae67985e2 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Fri, 4 Dec 2020 19:25:24 -0800 Subject: [PATCH 5/6] Feedback --- .../server-config/config-reference.md | 19 ++++-- .../ksql/api/server/LoggingRateLimiter.java | 65 ++++++++++--------- .../api/server/LoggingRateLimiterTest.java | 2 + 3 files changed, 51 insertions(+), 35 deletions(-) diff --git a/docs/operate-and-deploy/installation/server-config/config-reference.md b/docs/operate-and-deploy/installation/server-config/config-reference.md index dcd1f8faaac9..ff166de45151 100644 --- a/docs/operate-and-deploy/installation/server-config/config-reference.md +++ b/docs/operate-and-deploy/installation/server-config/config-reference.md @@ -614,18 +614,25 @@ messages. By default, this property has the value `false`. ### ksql.logging.server.rate.limited.response.codes -A list of `path:rate_limit` pairs, to limit the rate of server request -logging. This is useful for limiting certain 4XX errors that you -might not want to blow up in the logs. +A list of `code:qps` pairs, to limit the rate of server request +logging. An example would be "400:10" which would limit 400 error +logs to 10 per second. This is useful for limiting certain 4XX errors that you +might not want to blow up in the logs. This setting enables seeing the logs when the request rate is low and dropping them when they go over the threshold. +A message will be logged every 5 seconds indicating if the rate limit +is being hit, so an absence of this message means a complete set of logs. ### ksql.logging.server.rate.limited.request.paths -A list of `path:rate_limit` pairs, to limit the rate of server request -logging. This is useful for requests that are coming in at a high rate, -such as for pull queries. This setting enables seeing the logs when the request rate is low +A list of `path:qps` pairs, to limit the rate of server request +logging. An example would be "/query:10" which would limit pull query +logs to 10 per second. This is useful for requests that are coming in +at a high rate, such as for pull queries. +This setting enables seeing the logs when the request rate is low and dropping them when they go over the threshold. +A message will be logged every 5 seconds indicating if the rate limit +is being hit, so an absence of this message means a complete set of logs. ksqlDB-Connect Settings ----------------------- diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java index d2f40fc6d990..e7ae63f1d936 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.util.Pair; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -33,13 +34,8 @@ class LoggingRateLimiter { // Print "You hit a rate limit" every 5 seconds private static final double LIMIT_HIT_LOG_RATE = 0.2; - private final Map rateLimitedPaths; - private final Map rateLimitedResponseCodes; - - private final Function rateLimiterFactory; - - private final Map rateLimitersByPath = new ConcurrentHashMap<>(); - private final Map rateLimitersByResponseCode = new ConcurrentHashMap<>(); + private final Map rateLimitersByPath; + private final Map rateLimitersByResponseCode; // Rate limiters for printing the "You hit a rate limit" message private final RateLimiter pathLimitHit; @@ -52,34 +48,33 @@ class LoggingRateLimiter { @VisibleForTesting LoggingRateLimiter( final KsqlRestConfig ksqlRestConfig, - final Function rateLimiterFactory) { + final Function rateLimiterFactory + ) { requireNonNull(ksqlRestConfig); - this.rateLimiterFactory = requireNonNull(rateLimiterFactory); - this.rateLimitedPaths = getRateLimitedRequestPaths(ksqlRestConfig); - this.rateLimitedResponseCodes = getRateLimitedResponseCodes(ksqlRestConfig); + requireNonNull(rateLimiterFactory); this.pathLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE); this.responseCodeLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE); + this.rateLimitersByPath = getRateLimitedRequestPaths(ksqlRestConfig, rateLimiterFactory); + this.rateLimitersByResponseCode + = getRateLimitedResponseCodes(ksqlRestConfig, rateLimiterFactory); } public boolean shouldLog(final Logger logger, final String path, final int responseCode) { - if (rateLimitedPaths.containsKey(path)) { - final double rateLimit = rateLimitedPaths.get(path); - rateLimitersByPath.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit)); - if (!rateLimitersByPath.get(path).tryAcquire()) { + if (rateLimitersByPath.containsKey(path)) { + final RateLimiter rateLimiter = rateLimitersByPath.get(path); + if (!rateLimiter.tryAcquire()) { if (pathLimitHit.tryAcquire()) { - logger.info("Hit rate limit for path " + path + " with limit " + rateLimit); + logger.info("Hit rate limit for path " + path + " with limit " + rateLimiter.getRate()); } return false; } } - if (rateLimitedResponseCodes.containsKey(responseCode)) { - final double rateLimit = rateLimitedResponseCodes.get(responseCode); - rateLimitersByResponseCode.computeIfAbsent( - responseCode, (k) -> rateLimiterFactory.apply(rateLimit)); - if (!rateLimitersByResponseCode.get(responseCode).tryAcquire()) { + if (rateLimitersByResponseCode.containsKey(responseCode)) { + final RateLimiter rateLimiter = rateLimitersByResponseCode.get(responseCode); + if (!rateLimiter.tryAcquire()) { if (responseCodeLimitHit.tryAcquire()) { logger.info("Hit rate limit for response code " + responseCode + " with limit " - + rateLimit); + + rateLimiter.getRate()); } return false; } @@ -87,20 +82,32 @@ public boolean shouldLog(final Logger logger, final String path, final int respo return true; } - private static Map getRateLimitedRequestPaths(final KsqlRestConfig config) { + private static Map getRateLimitedRequestPaths( + final KsqlRestConfig config, + final Function rateLimiterFactory + ) { // Already validated as having double values return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG) .entrySet().stream() - .collect(ImmutableMap.toImmutableMap(Entry::getKey, - entry -> Double.parseDouble(entry.getValue()))); + .map(entry -> { + double rateLimit = Double.parseDouble(entry.getValue()); + return Pair.of(entry.getKey(), rateLimiterFactory.apply(rateLimit)); + }) + .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); } - private static Map getRateLimitedResponseCodes(final KsqlRestConfig config) { + private static Map getRateLimitedResponseCodes( + final KsqlRestConfig config, + final Function rateLimiterFactory + ) { // Already validated as all ints return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG) .entrySet().stream() - .collect(ImmutableMap.toImmutableMap( - entry -> Integer.parseInt(entry.getKey()), - entry -> Double.parseDouble(entry.getValue()))); + .map(entry -> { + int statusCode = Integer.parseInt(entry.getKey()); + double rateLimit = Double.parseDouble(entry.getValue()); + return Pair.of(statusCode, rateLimiterFactory.apply(rateLimit)); + }) + .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java index 6eb273540930..a62213bb313f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingRateLimiterTest.java @@ -71,6 +71,7 @@ public void shouldLog() { public void shouldSkipRateLimited_path() { // Given: when(rateLimiter.tryAcquire()).thenReturn(true, true, false, false); + when(rateLimiter.getRate()).thenReturn(2d); // When: assertThat(loggingRateLimiter.shouldLog(logger, PATH, 200), is(true)); @@ -87,6 +88,7 @@ public void shouldSkipRateLimited_path() { public void shouldSkipRateLimited_responseCode() { // Given: when(rateLimiter.tryAcquire()).thenReturn(true, false, false, false); + when(rateLimiter.getRate()).thenReturn(1d); // When: assertThat(loggingRateLimiter.shouldLog(logger, "/foo", RESPONSE_CODE), is(true)); From be9466426eb287001617948fdbace88172900443 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Fri, 4 Dec 2020 19:45:18 -0800 Subject: [PATCH 6/6] Lint --- .../io/confluent/ksql/api/server/LoggingRateLimiter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java index e7ae63f1d936..35c5ad138d88 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/LoggingRateLimiter.java @@ -25,8 +25,6 @@ import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.util.Pair; import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.slf4j.Logger; @@ -90,7 +88,7 @@ private static Map getRateLimitedRequestPaths( return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG) .entrySet().stream() .map(entry -> { - double rateLimit = Double.parseDouble(entry.getValue()); + final double rateLimit = Double.parseDouble(entry.getValue()); return Pair.of(entry.getKey(), rateLimiterFactory.apply(rateLimit)); }) .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); @@ -104,8 +102,8 @@ private static Map getRateLimitedResponseCodes( return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG) .entrySet().stream() .map(entry -> { - int statusCode = Integer.parseInt(entry.getKey()); - double rateLimit = Double.parseDouble(entry.getValue()); + final int statusCode = Integer.parseInt(entry.getKey()); + final double rateLimit = Double.parseDouble(entry.getValue()); return Pair.of(statusCode, rateLimiterFactory.apply(rateLimit)); }) .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight));