Skip to content

Commit

Permalink
feat: Adds max concurrent pull queries to limit effects of table scans (
Browse files Browse the repository at this point in the history
#7188)

* feat: Adds max concurrent pull queries to limit effects of table scans
  • Loading branch information
AlanConfluent authored Mar 10, 2021
1 parent 4151bae commit 55f5403
Show file tree
Hide file tree
Showing 16 changed files with 386 additions and 94 deletions.
12 changes: 12 additions & 0 deletions docs/reference/server-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,18 @@ for persistent and push queries, which are naturally longer-lived than pull quer
interpreter gives significant performance gains. This can be disabled per query if the code compiler
is preferred.

## `ksql.query.pull.max.qps`

Sets a rate limit for pull queries, in queries per second. This limit is enforced per host, not per cluster.
After hitting the limit, the host will fail pull query requests until it determines that it's no longer
at the limit.

## `ksql.query.pull.max.concurrent.requests`

Sets the maximum number of concurrent pull queries. This limit is enforced per host, not per cluster.
After hitting the limit, the host will fail pull query requests until it determines that it's no longer
at the limit.

## `ksql.variable.substitution.enable`

Enables variable substitution through [`DEFINE`](../../../../developer-guide/ksqldb-reference/define) statements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,14 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_QUERY_PULL_MAX_QPS_CONFIG = "ksql.query.pull.max.qps";
public static final Integer KSQL_QUERY_PULL_MAX_QPS_DEFAULT = Integer.MAX_VALUE;
public static final String KSQL_QUERY_PULL_MAX_QPS_DOC = "The maximum qps allowed for pull "
+ "queries. Once the limit is hit, queries will fail immediately";
+ "queries on this host. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG
= "ksql.query.pull.max.concurrent.requests";
public static final Integer KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DEFAULT = Integer.MAX_VALUE;
public static final String KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DOC =
"The maximum number of concurrent requests allowed for pull "
+ "queries on this host. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.thread.pool.size";
Expand Down Expand Up @@ -788,6 +795,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_MAX_QPS_DOC
)
.define(
KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG,
Type.INT,
KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_DOC
)
.define(
KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class PullQueryResult {
Expand Down Expand Up @@ -90,4 +91,11 @@ public void onException(final Consumer<Throwable> consumer) {
public void onCompletion(final Consumer<Void> consumer) {
future.thenAccept(consumer::accept);
}

public void onCompletionOrException(final BiConsumer<Void, Throwable> biConsumer) {
future.handle((v, t) -> {
biConsumer.accept(v, t);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigPlannerOptions;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.ConcurrencyLimiter.Decrementer;
import io.confluent.ksql.rest.util.QueryCapacityUtil;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
Expand Down Expand Up @@ -66,6 +68,7 @@ public class QueryEndpoint {
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final HARouting routing;
private final Optional<LocalCommands> localCommands;

Expand All @@ -76,6 +79,7 @@ public QueryEndpoint(
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final HARouting routing,
final Optional<LocalCommands> localCommands
) {
Expand All @@ -85,6 +89,7 @@ public QueryEndpoint(
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
this.pullConcurrencyLimiter = pullConcurrencyLimiter;
this.routing = routing;
this.localCommands = localCommands;
}
Expand Down Expand Up @@ -157,26 +162,35 @@ private QueryPublisher createPullQueryPublisher(
);

PullQueryExecutionUtil.checkRateLimit(rateLimiter);
final Decrementer decrementer = pullConcurrencyLimiter.increment();

try {
final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
statement,
routing,
routingOptions,
plannerOptions,
pullQueryMetrics,
false
);

final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
statement,
routing,
routingOptions,
plannerOptions,
pullQueryMetrics,
false
);
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
if (throwable == null) {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
}
});

result.onCompletion(v -> {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
});
final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);

final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
publisher.setQueryHandle(new KsqlPullQueryHandle(result, pullQueryMetrics), true);

publisher.setQueryHandle(new KsqlPullQueryHandle(result, pullQueryMetrics), true);

return publisher;
return publisher;
} catch (Throwable t) {
decrementer.decrementAtMostOnce();
throw t;
}
}

private ConfiguredStatement<Query> createStatement(final String queryString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
Expand Down Expand Up @@ -188,6 +189,7 @@ public final class KsqlRestApplication implements Executable {
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter pullQueryRateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final HARouting pullQueryRouting;
private final Optional<LocalCommands> localCommands;

Expand Down Expand Up @@ -228,6 +230,7 @@ public static SourceName getCommandsStreamName() {
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RoutingFilterFactory routingFilterFactory,
final RateLimiter pullQueryRateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final HARouting pullQueryRouting,
final Optional<LocalCommands> localCommands
) {
Expand Down Expand Up @@ -284,6 +287,7 @@ public static SourceName getCommandsStreamName() {
log.debug("ksqlDB API server instance created");
this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory");
this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter");
this.pullConcurrencyLimiter = requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter");
this.pullQueryRouting = requireNonNull(pullQueryRouting, "pullQueryRouting");
this.localCommands = requireNonNull(localCommands, "localCommands");
}
Expand Down Expand Up @@ -327,6 +331,7 @@ public void startAsync() {
pullQueryMetrics,
routingFilterFactory,
pullQueryRateLimiter,
pullConcurrencyLimiter,
pullQueryRouting,
localCommands
);
Expand All @@ -351,6 +356,7 @@ public void startAsync() {
wsQueryEndpoint,
pullQueryMetrics,
pullQueryRateLimiter,
pullConcurrencyLimiter,
pullQueryRouting,
localCommands
);
Expand Down Expand Up @@ -729,6 +735,9 @@ static KsqlRestApplication buildApplication(
heartbeatAgent, lagReportingAgent);
final RateLimiter pullQueryRateLimiter = RateLimiter.create(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_QPS_CONFIG));
final ConcurrencyLimiter pullQueryConcurrencyLimiter = new ConcurrencyLimiter(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_CONCURRENT_REQUESTS_CONFIG),
"pull queries");

final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));
Expand Down Expand Up @@ -761,6 +770,7 @@ static KsqlRestApplication buildApplication(
pullQueryMetrics,
routingFilterFactory,
pullQueryRateLimiter,
pullQueryConcurrencyLimiter,
pullQueryRouting,
localCommands
);
Expand Down Expand Up @@ -837,6 +847,7 @@ static KsqlRestApplication buildApplication(
pullQueryMetrics,
routingFilterFactory,
pullQueryRateLimiter,
pullQueryConcurrencyLimiter,
pullQueryRouting,
localCommands
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class KsqlServerEndpoints implements Endpoints {
private final WSQueryEndpoint wsQueryEndpoint;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter pullConcurrencyLimiter;
private final HARouting routing;
private final Optional<LocalCommands> localCommands;

Expand All @@ -106,6 +108,7 @@ public KsqlServerEndpoints(
final WSQueryEndpoint wsQueryEndpoint,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter,
final ConcurrencyLimiter pullConcurrencyLimiter,
final HARouting routing,
final Optional<LocalCommands> localCommands
) {
Expand All @@ -129,6 +132,7 @@ public KsqlServerEndpoints(
this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics);
this.rateLimiter = Objects.requireNonNull(rateLimiter);
this.pullConcurrencyLimiter = pullConcurrencyLimiter;
this.routing = Objects.requireNonNull(routing);
this.localCommands = Objects.requireNonNull(localCommands);
}
Expand All @@ -145,7 +149,7 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
try {
return new QueryEndpoint(
ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics,
rateLimiter, routing, localCommands)
rateLimiter, pullConcurrencyLimiter, routing, localCommands)
.createQueryPublisher(
sql,
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.confluent.ksql.planner.PullPlannerOptions;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;

public class PullQueryConfigPlannerOptions implements PullPlannerOptions {
Expand All @@ -33,17 +32,10 @@ public PullQueryConfigPlannerOptions(final KsqlConfig ksqlConfig,

@Override
public boolean getTableScansEnabled() {
final boolean configured = ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED);
if (configOverrides.containsKey(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED)) {
final boolean override
= (Boolean) configOverrides.get(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED);
if (override && !configured) {
throw new KsqlException("You can only disable table scans with an override, "
+ "not enable them.");
}
return override;
return (Boolean) configOverrides.get(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED);
}
return configured;
return ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.ConcurrencyLimiter.Decrementer;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KeyValue;
Expand All @@ -51,6 +53,7 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
private final long startTimeNanos;
private final RoutingFilterFactory routingFilterFactory;
private final RateLimiter rateLimiter;
private final ConcurrencyLimiter concurrencyLimiter;
private final HARouting routing;

@VisibleForTesting
Expand All @@ -63,6 +66,7 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
final long startTimeNanos,
final RoutingFilterFactory routingFilterFactory,
final RateLimiter rateLimiter,
final ConcurrencyLimiter concurrencyLimiter,
final HARouting routing
) {
this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine");
Expand All @@ -73,6 +77,7 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
this.startTimeNanos = startTimeNanos;
this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory");
this.rateLimiter = requireNonNull(rateLimiter, "rateLimiter");
this.concurrencyLimiter = concurrencyLimiter;
this.routing = requireNonNull(routing, "routing");
}

Expand All @@ -90,25 +95,38 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub
);

PullQueryExecutionUtil.checkRateLimit(rateLimiter);

final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
query,
routing,
routingOptions,
plannerOptions,
pullQueryMetrics,
true
);

result.onCompletion(v -> {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
});

final PullQuerySubscription subscription = new PullQuerySubscription(
exec, subscriber, result);

subscriber.onSubscribe(subscription);
final Decrementer decrementer = concurrencyLimiter.increment();

try {
final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
query,
routing,
routingOptions,
plannerOptions,
pullQueryMetrics,
true
);

result.onCompletionOrException((v, t) -> decrementer.decrementAtMostOnce());
result.onCompletion(v -> {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
});
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
if (throwable == null) {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
}
});

final PullQuerySubscription subscription = new PullQuerySubscription(
exec, subscriber, result);

subscriber.onSubscribe(subscription);
} catch (Throwable t) {
decrementer.decrementAtMostOnce();
throw t;
}
}

private static final class PullQuerySubscription
Expand Down
Loading

0 comments on commit 55f5403

Please sign in to comment.