fix: Makes response codes rate limited as well as prints a message when it is hit #6701

Merged
Expand Up @@ -612,18 +612,27 @@ property has the value `KSQL_PROCESSING_LOG`.
Toggles whether or not the processing log should include rows in log
messages. By default, this property has the value `false`.

### ksql.logging.server.skipped.response.codes
### ksql.logging.server.rate.limited.response.codes

A comma-separated list of HTTP response codes to skip during server
request logging. This is useful for ignoring certain 4XX errors that you
might not want to show up in the logs.
A comma-separated list of `code:qps` pairs that limits the rate of server
request logging per HTTP response code. For example, "400:10" limits logs
for 400 errors to 10 per second. This is useful for 4XX errors that
might otherwise flood the logs.
With this setting, logs are kept while the request rate is low
and dropped once the rate goes over the threshold.
Contributor

Mention that a message will be logged (at most once every five seconds) if the threshold is hit? (And same for the other config below.)

Member Author

Done

While the rate limit is being hit, a message is logged at most once every
5 seconds, so the absence of this message means the logs are complete.

### ksql.logging.server.rate.limited.request.paths
Contributor

We're currently logging for internal endpoints (used in server-to-server communication for multi-node clusters), right? Would it make sense to disable logging for those by default? (I don't feel strongly one way or the other -- you have more context than I do about when/how often those endpoints are hit.)

Member Author

Yes, currently we log all requests, regardless of internal/external status. I think this could be useful if we're trying to trace pull queries falling back on standbys after failures. We can see if this ends up being useful and potentially remove internal if not.


A list of `path:rate_limit` pairs, to limit the rate of server request
logging. This is useful for requests that are coming in at a high rate,
such as for pull queries. This setting enables seeing the logs when the request rate is low
A comma-separated list of `path:qps` pairs that limits the rate of server
request logging per request path. For example, "/query:10" limits pull query
logs to 10 per second. This is useful for requests that arrive
at a high rate, such as pull queries.
With this setting, logs are kept while the request rate is low
and dropped once the rate goes over the threshold.
While the rate limit is being hit, a message is logged at most once every
5 seconds, so the absence of this message means the logs are complete.
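
As a rough illustration of the two settings documented in this hunk, the sketch below sets both properties on a plain `java.util.Properties` object. The property names and the example values `400:10` and `/query:10` come from the docs text in this diff; the class name `LoggingRateLimitPropsSketch` is hypothetical, and how the properties actually reach the server is assumed rather than shown.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of this PR.
final class LoggingRateLimitPropsSketch {

  static Properties exampleProps() {
    final Properties props = new Properties();
    // Limit request logs for HTTP 400 responses to 10 per second.
    props.setProperty("ksql.logging.server.rate.limited.response.codes", "400:10");
    // Limit request logs for the /query path to 10 per second.
    props.setProperty("ksql.logging.server.rate.limited.request.paths", "/query:10");
    return props;
  }
}
```

Both values are plain strings, which matches the `Type.STRING` config definition used for these settings in this PR.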

ksqlDB-Connect Settings
-----------------------
Expand Down
Expand Up @@ -173,6 +173,29 @@ public static Validator intList() {
};
}

public static Validator mapWithIntKeyDoubleValue() {
return (name, val) -> {
if (!(val instanceof String)) {
throw new ConfigException(name, val, "Must be a string");
}

final String str = (String) val;
final Map<String, String> map = KsqlConfig.parseStringAsMap(name, str);
map.forEach((keyStr, valueStr) -> {
try {
Integer.parseInt(keyStr);
} catch (NumberFormatException e) {
throw new ConfigException(name, keyStr, "Not an int");
}
try {
Double.parseDouble(valueStr);
} catch (NumberFormatException e) {
throw new ConfigException(name, valueStr, "Not a double");
}
});
};
}

public static Validator mapWithDoubleValue() {
return (name, val) -> {
if (!(val instanceof String)) {
Expand Down
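
To make the shape of a valid `code:qps` value concrete, here is a minimal, self-contained sketch of the kind of parsing that `mapWithIntKeyDoubleValue()` guards. It is not the PR's implementation: the real validator delegates to `KsqlConfig.parseStringAsMap` and throws `ConfigException`, whereas the hypothetical `CodeRateParserSketch.parseCodeRates` below splits the string itself and throws `IllegalArgumentException`. Treating an empty string as "no limits" is also an assumption, based on the config's default of `""`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not the validator added in this PR.
final class CodeRateParserSketch {

  // Parses e.g. "400:10,401:0.5" into {400=10.0, 401=0.5}.
  static Map<Integer, Double> parseCodeRates(final String value) {
    final Map<Integer, Double> result = new LinkedHashMap<>();
    if (value.trim().isEmpty()) {
      return result; // assumption: an empty value means no rate limits
    }
    for (final String pair : value.split(",")) {
      final String[] parts = pair.trim().split(":", 2);
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected code:qps but got: " + pair);
      }
      final int code;
      try {
        code = Integer.parseInt(parts[0].trim());
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Not an int: " + parts[0], e);
      }
      final double qps;
      try {
        qps = Double.parseDouble(parts[1].trim());
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Not a double: " + parts[1], e);
      }
      result.put(code, qps);
    }
    return result;
  }
}
```

The key and the value are checked separately, mirroring the two `try` blocks in the validator, so a bad key and a bad rate produce different error messages.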
Expand Up @@ -270,6 +270,58 @@ public void shouldThrowOnNoStringRegexList() {
assertThat(e.getMessage(), containsString("validator should only be used with LIST of STRING defs"));
}

@Test
public void shouldParseDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithDoubleValue();
validator.ensureValid("propName", "foo:1.2,bar:3");
}

@Test
public void shouldParseIntKeyDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue();
validator.ensureValid("propName", "123:1.2,345:9.0");
}

@Test
public void shouldThrowOnBadDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithDoubleValue();

// When:
final Exception e = assertThrows(
ConfigException.class,
() -> validator.ensureValid("propName", "foo:abc")
);

// Then:
assertThat(e.getMessage(),
containsString("Invalid value abc for configuration propName: Not a double"));
}

@Test
public void shouldThrowOnBadIntDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue();

// When:
final Exception e = assertThrows(
ConfigException.class,
() -> validator.ensureValid("propName", "1:abc")
);
final Exception e2 = assertThrows(
ConfigException.class,
() -> validator.ensureValid("propName", "abc:1.2")
);

// Then:
assertThat(e.getMessage(),
containsString("Invalid value abc for configuration propName: Not a double"));
assertThat(e2.getMessage(),
containsString("Invalid value abc for configuration propName: Not an int"));
}

private enum TestEnum {
FOO, BAR
}
Expand Down
Expand Up @@ -15,24 +15,17 @@

package io.confluent.ksql.api.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.api.auth.ApiUser;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.impl.Utils;
import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,13 +34,10 @@ public class LoggingHandler implements Handler<RoutingContext> {
private static final Logger LOG = LoggerFactory.getLogger(LoggingHandler.class);
static final String HTTP_HEADER_USER_AGENT = "User-Agent";

private final Set<Integer> skipResponseCodes;
private final Logger logger;
private final Clock clock;
private final LoggingRateLimiter loggingRateLimiter;

private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();

public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateLimiter) {
this(server, loggingRateLimiter, LOG, Clock.systemUTC());
}
Expand All @@ -60,7 +50,6 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL
final Clock clock) {
requireNonNull(server);
this.loggingRateLimiter = requireNonNull(loggingRateLimiter);
this.skipResponseCodes = getSkipResponseCodes(server.getConfig());
this.logger = logger;
this.clock = clock;
}
Expand All @@ -69,17 +58,14 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL
public void handle(final RoutingContext routingContext) {
routingContext.addEndHandler(ar -> {
// After the response is complete, log results here.
if (skipResponseCodes.contains(routingContext.response().getStatusCode())) {
return;
}
if (!loggingRateLimiter.shouldLog(routingContext.request().path())) {
final int status = routingContext.request().response().getStatusCode();
if (!loggingRateLimiter.shouldLog(logger, routingContext.request().path(), status)) {
return;
}
final long contentLength = routingContext.request().response().bytesWritten();
final HttpVersion version = routingContext.request().version();
final HttpMethod method = routingContext.request().method();
final String uri = routingContext.request().uri();
final int status = routingContext.request().response().getStatusCode();
final long requestBodyLength = routingContext.request().bytesRead();
final String versionFormatted;
switch (version) {
Expand Down Expand Up @@ -118,13 +104,6 @@ public void handle(final RoutingContext routingContext) {
routingContext.next();
}

private static Set<Integer> getSkipResponseCodes(final KsqlRestConfig config) {
// Already validated as all ints
return config.getList(KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG)
.stream()
.map(Integer::parseInt).collect(ImmutableSet.toImmutableSet());
}

private void doLog(final int status, final String message) {
if (status >= 500) {
logger.error(message);
Expand Down
Expand Up @@ -16,23 +16,28 @@
package io.confluent.ksql.api.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG;
import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.Pair;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;

class LoggingRateLimiter {
// Print "You hit a rate limit" every 5 seconds
private static final double LIMIT_HIT_LOG_RATE = 0.2;

private final Map<String, Double> rateLimitedPaths;
private final Function<Double, RateLimiter> rateLimiterFactory;
private final Map<String, RateLimiter> rateLimitersByPath;
private final Map<Integer, RateLimiter> rateLimitersByResponseCode;

private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
// Rate limiters for printing the "You hit a rate limit" message
private final RateLimiter pathLimitHit;
private final RateLimiter responseCodeLimitHit;

LoggingRateLimiter(final KsqlRestConfig ksqlRestConfig) {
this(ksqlRestConfig, RateLimiter::create);
Expand All @@ -41,27 +46,66 @@ class LoggingRateLimiter {
@VisibleForTesting
LoggingRateLimiter(
final KsqlRestConfig ksqlRestConfig,
final Function<Double, RateLimiter> rateLimiterFactory) {
final Function<Double, RateLimiter> rateLimiterFactory
) {
requireNonNull(ksqlRestConfig);
this.rateLimiterFactory = requireNonNull(rateLimiterFactory);
this.rateLimitedPaths = getRateLimitedRequestPaths(ksqlRestConfig);
requireNonNull(rateLimiterFactory);
this.pathLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE);
this.responseCodeLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE);
this.rateLimitersByPath = getRateLimitedRequestPaths(ksqlRestConfig, rateLimiterFactory);
this.rateLimitersByResponseCode
= getRateLimitedResponseCodes(ksqlRestConfig, rateLimiterFactory);
}

public boolean shouldLog(final String path) {
if (rateLimitedPaths.containsKey(path)) {
final double rateLimit = rateLimitedPaths.get(path);
rateLimiters.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit));
return rateLimiters.get(path).tryAcquire();
public boolean shouldLog(final Logger logger, final String path, final int responseCode) {
if (rateLimitersByPath.containsKey(path)) {
final RateLimiter rateLimiter = rateLimitersByPath.get(path);
if (!rateLimiter.tryAcquire()) {
if (pathLimitHit.tryAcquire()) {
logger.info("Hit rate limit for path " + path + " with limit " + rateLimiter.getRate());
}
return false;
}
}
if (rateLimitersByResponseCode.containsKey(responseCode)) {
final RateLimiter rateLimiter = rateLimitersByResponseCode.get(responseCode);
if (!rateLimiter.tryAcquire()) {
if (responseCodeLimitHit.tryAcquire()) {
logger.info("Hit rate limit for response code " + responseCode + " with limit "
+ rateLimiter.getRate());
}
return false;
}
}
return true;
}

private static Map<String, Double> getRateLimitedRequestPaths(final KsqlRestConfig config) {
private static Map<String, RateLimiter> getRateLimitedRequestPaths(
final KsqlRestConfig config,
final Function<Double, RateLimiter> rateLimiterFactory
) {
// Already validated as having double values
return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG)
.entrySet().stream()
.collect(ImmutableMap.toImmutableMap(Entry::getKey,
entry -> Double.parseDouble(entry.getValue())));
.map(entry -> {
final double rateLimit = Double.parseDouble(entry.getValue());
return Pair.of(entry.getKey(), rateLimiterFactory.apply(rateLimit));
})
.collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight));
}

private static Map<Integer, RateLimiter> getRateLimitedResponseCodes(
final KsqlRestConfig config,
final Function<Double, RateLimiter> rateLimiterFactory
) {
// Already validated as all ints
return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG)
.entrySet().stream()
.map(entry -> {
final int statusCode = Integer.parseInt(entry.getKey());
final double rateLimit = Double.parseDouble(entry.getValue());
return Pair.of(statusCode, rateLimiterFactory.apply(rateLimit));
})
.collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight));
}
}
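
The decision logic in `shouldLog` can be sketched without Guava as follows. This is a simplified illustration under stated assumptions, not the PR's code: `SpacingLimiter` is a hypothetical stand-in for Guava's `RateLimiter` that simply enforces a minimum spacing between permits (it does not reproduce Guava's smoothing or burst behavior), the clock is injected so the behavior is deterministic, notices are collected in a list instead of being written to a logger, and only response codes are handled, since paths follow the same pattern.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a rate limiter: one permit per (1 / permitsPerSecond) seconds.
final class SpacingLimiter {
  private final long intervalNanos;
  private final LongSupplier nanoClock;
  private long nextFreeNanos;

  SpacingLimiter(final double permitsPerSecond, final LongSupplier nanoClock) {
    this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    this.nanoClock = nanoClock;
    this.nextFreeNanos = nanoClock.getAsLong(); // first permit is available immediately
  }

  synchronized boolean tryAcquire() {
    final long now = nanoClock.getAsLong();
    if (now - nextFreeNanos < 0) {
      return false; // too soon since the last permit
    }
    nextFreeNanos = now + intervalNanos;
    return true;
  }
}

// Hypothetical sketch of the two-level decision: log, drop, or drop with a notice.
final class LoggingDecisionSketch {
  // 0.2 permits per second, i.e. at most one "limit hit" notice every 5 seconds.
  private static final double LIMIT_HIT_LOG_RATE = 0.2;

  private final Map<Integer, SpacingLimiter> limitersByCode = new HashMap<>();
  private final SpacingLimiter limitHitNotice;
  final List<String> notices = new ArrayList<>();

  LoggingDecisionSketch(final Map<Integer, Double> codeRates, final LongSupplier nanoClock) {
    codeRates.forEach((code, qps) -> limitersByCode.put(code, new SpacingLimiter(qps, nanoClock)));
    this.limitHitNotice = new SpacingLimiter(LIMIT_HIT_LOG_RATE, nanoClock);
  }

  boolean shouldLog(final int responseCode) {
    final SpacingLimiter limiter = limitersByCode.get(responseCode);
    if (limiter != null && !limiter.tryAcquire()) {
      // Over the limit: drop the log line, but record a notice at most once per 5 seconds.
      if (limitHitNotice.tryAcquire()) {
        notices.add("Hit rate limit for response code " + responseCode);
      }
      return false;
    }
    return true; // unlimited code, or still under the limit
  }
}
```

As in the PR, one notice limiter is shared by all response codes, so a burst across several codes still yields at most one notice per 5 seconds for that group.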
Expand Up @@ -15,8 +15,8 @@

package io.confluent.ksql.rest.server;

import static io.confluent.ksql.configdef.ConfigValidators.intList;
import static io.confluent.ksql.configdef.ConfigValidators.mapWithDoubleValue;
import static io.confluent.ksql.configdef.ConfigValidators.mapWithIntKeyDoubleValue;
import static io.confluent.ksql.configdef.ConfigValidators.oneOrMore;
import static io.confluent.ksql.configdef.ConfigValidators.zeroOrPositive;

Expand Down Expand Up @@ -333,10 +333,10 @@ public class KsqlRestConfig extends AbstractConfig {
"The key store certificate alias to be used for internal client requests. If not set, "
+ "the system will fall back on the Vert.x default choice";

public static final String KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG =
KSQL_CONFIG_PREFIX + "logging.server.skipped.response.codes";
private static final String KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_DOC =
"A list of HTTP response codes to skip during server request logging";
public static final String KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG =
KSQL_CONFIG_PREFIX + "logging.server.rate.limited.response.codes";
private static final String KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_DOC =
"A list of code:rate_limit pairs, to rate limit the server request logging";

public static final String KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG =
KSQL_CONFIG_PREFIX + "logging.server.rate.limited.request.paths";
Expand Down Expand Up @@ -640,12 +640,12 @@ public class KsqlRestConfig extends AbstractConfig {
ConfigDef.Importance.LOW,
KSQL_AUTHENTICATION_PLUGIN_DOC
).define(
KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG,
Type.LIST,
KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG,
Type.STRING,
"",
intList(),
mapWithIntKeyDoubleValue(),
ConfigDef.Importance.LOW,
KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_DOC
KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_DOC
).define(
KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG,
Type.STRING,
Expand Down
Expand Up @@ -66,7 +66,8 @@ public void setUp() {
when(request.response()).thenReturn(response);
when(request.remoteAddress()).thenReturn(socketAddress);
when(ksqlRestConfig.getList(any())).thenReturn(ImmutableList.of("401"));
when(loggingRateLimiter.shouldLog("/query")).thenReturn(true);
when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true);
when(loggingRateLimiter.shouldLog(logger, "/query", 405)).thenReturn(true);
when(clock.millis()).thenReturn(1699813434333L);
when(response.bytesWritten()).thenReturn(5678L);
when(request.path()).thenReturn("/query");
Expand Down Expand Up @@ -117,6 +118,7 @@ public void shouldProduceLog_warn() {
public void shouldSkipLog() {
// Given:
when(response.getStatusCode()).thenReturn(401);
when(loggingRateLimiter.shouldLog(logger, "/query", 401)).thenReturn(false);

// When:
loggingHandler.handle(routingContext);
Expand All @@ -133,7 +135,7 @@ public void shouldSkipLog() {
public void shouldSkipRateLimited() {
// Given:
when(response.getStatusCode()).thenReturn(200);
when(loggingRateLimiter.shouldLog("/query")).thenReturn(true, true, false, false);
when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true, true, false, false);

// When:
loggingHandler.handle(routingContext);
Expand Down