From 4dba86ff04e3a8a794b5b68f60546b89e7082bfa Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Sun, 30 Aug 2020 12:21:56 -0700 Subject: [PATCH 1/7] adding metrics for request/response size --- ksqldb-rest-app/pom.xml | 6 +++++ .../ksql/rest/server/KsqlRestApplication.java | 26 ++++++++++++++----- .../server/execution/PullQueryExecutor.java | 18 +++---------- .../execution/PullQueryExecutorMetrics.java | 13 +++------- .../streaming/PullQueryPublisher.java | 20 +++++++++++--- .../streaming/StreamedQueryResource.java | 26 ++++++++++++++----- .../resources/streaming/WSQueryEndpoint.java | 18 +++++++++---- 7 files changed, 80 insertions(+), 47 deletions(-) diff --git a/ksqldb-rest-app/pom.xml b/ksqldb-rest-app/pom.xml index f84bd051afac..33a82813391d 100644 --- a/ksqldb-rest-app/pom.xml +++ b/ksqldb-rest-app/pom.xml @@ -134,6 +134,12 @@ ${reactive-streams.version} + + org.openjdk.jol + jol-core + 0.13 + + diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index fa98f24871c1..1b1dba9db288 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -67,6 +67,7 @@ import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor; import io.confluent.ksql.rest.server.computation.InternalTopicSerdes; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.resources.ClusterStatusResource; import io.confluent.ksql.rest.server.resources.HealthCheckResource; import io.confluent.ksql.rest.server.resources.HeartbeatResource; @@ -140,7 +141,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.log4j.LogManager; import org.slf4j.Logger; @@ -189,6 +189,7 @@ public final class KsqlRestApplication implements Executable { private final CompletableFuture terminatedFuture = new CompletableFuture<>(); private final QueryMonitor queryMonitor; private final DenyListPropertyValidator denyListPropertyValidator; + private final Optional pullQueryMetrics; // The startup thread that can be interrupted if necessary during shutdown. This should only // happen if startup hangs. @@ -225,7 +226,8 @@ public static SourceName getCommandsStreamName() { final Optional lagReportingAgent, final Vertx vertx, final QueryMonitor ksqlQueryMonitor, - final DenyListPropertyValidator denyListPropertyValidator + final DenyListPropertyValidator denyListPropertyValidator, + final Optional pullQueryMetrics ) { log.debug("Creating instance of ksqlDB API server"); this.serviceContext = requireNonNull(serviceContext, "serviceContext"); @@ -277,6 +279,7 @@ public static SourceName getCommandsStreamName() { this.ksqlConfigNoPort); this.queryMonitor = requireNonNull(ksqlQueryMonitor, "ksqlQueryMonitor"); MetricCollectors.addConfigurableReporter(ksqlConfigNoPort); + this.pullQueryMetrics = requireNonNull(pullQueryMetrics, "pullQueryMetrics"); log.debug("ksqlDB API server instance created"); } @@ -471,7 +474,7 @@ public void notifyTerminated() { public void shutdown() { log.info("ksqlDB shutdown called"); try { - pullQueryExecutor.closeMetrics(); + pullQueryMetrics.ifPresent(PullQueryExecutorMetrics::close); } catch (final Exception e) { log.error("Exception while waiting for pull query metrics to close", e); } @@ -704,12 +707,19 @@ static KsqlRestApplication buildApplication( heartbeatAgent, lagReportingAgent); final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( - ksqlEngine, routingFilterFactory, ksqlConfig, ksqlEngine.getServiceId(), - Time.SYSTEM); + ksqlEngine, routingFilterFactory, ksqlConfig, ksqlEngine.getServiceId()); final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator( ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); + final Optional pullQueryMetrics = ksqlConfig.getBoolean( + KsqlConfig.KSQL_QUERY_PULL_METRICS_ENABLED) + ? Optional.of(new PullQueryExecutorMetrics( + ksqlEngine.getServiceId(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS))) + : Optional.empty(); + + final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, @@ -720,7 +730,8 @@ static KsqlRestApplication buildApplication( authorizationValidator, errorHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + pullQueryMetrics ); final List managedTopics = new LinkedList<>(); @@ -795,7 +806,8 @@ static KsqlRestApplication buildApplication( lagReportingAgent, vertx, queryMonitor, - denyListPropertyValidator + denyListPropertyValidator, + pullQueryMetrics ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 8d6a4fc2f245..680acb184ef3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -111,7 +111,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; @@ -142,24 +141,19 @@ public final class PullQueryExecutor { private final KsqlExecutionContext executionContext; private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; - private final Optional pullQueryMetrics; public PullQueryExecutor( final KsqlExecutionContext executionContext, final RoutingFilterFactory routingFilterFactory, final KsqlConfig ksqlConfig, - final String serviceId, - final Time time + final String serviceId ) { this.executionContext = Objects.requireNonNull(executionContext, "executionContext"); this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory, "routingFilterFactory"); this.rateLimiter = RateLimiter.create(ksqlConfig.getInt( KsqlConfig.KSQL_QUERY_PULL_MAX_QPS_CONFIG)); - this.pullQueryMetrics = ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_METRICS_ENABLED) - ? Optional.of(new PullQueryExecutorMetrics(serviceId, - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS), time)) - : Optional.empty(); + } @SuppressWarnings("unused") // Needs to match validator API. @@ -177,7 +171,7 @@ public PullQueryResult execute( final Map requestProperties, final ServiceContext serviceContext, final Optional isInternalRequest, - final long startTimeNanos + final Optional pullQueryMetrics ) { if (!statement.getStatement().isPullQuery()) { throw new IllegalArgumentException("Executor can only handle pull queries"); @@ -249,8 +243,6 @@ public PullQueryResult execute( routingOptions ); - pullQueryMetrics.ifPresent(metrics -> - metrics.recordLatency(startTimeNanos)); return result; } catch (final Exception e) { pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1)); @@ -270,10 +262,6 @@ void checkRateLimit() { } } - public void closeMetrics() { - pullQueryMetrics.ifPresent(PullQueryExecutorMetrics::close); - } - private PullQueryResult handlePullQuery( final ConfiguredStatement statement, final KsqlExecutionContext executionContext, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java index 6518381e403b..e0c849bdab69 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -33,7 +32,6 @@ import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedCount; -import org.apache.kafka.common.utils.Time; public class PullQueryExecutorMetrics implements Closeable { @@ -48,17 +46,14 @@ public class PullQueryExecutorMetrics implements Closeable { private final Sensor errorRateSensor; private final Metrics metrics; private final Map customMetricsTags; - private final Time time; private final String ksqlServiceId; public PullQueryExecutorMetrics( final String ksqlServiceId, - final Map customMetricsTags, - final Time time + final Map customMetricsTags ) { this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags"); - this.time = Objects.requireNonNull(time, "time"); this.metrics = MetricCollectors.getMetrics(); this.ksqlServiceId = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId; @@ -83,11 +78,9 @@ public void recordRemoteRequests(final double value) { this.remoteRequestsSensor.record(value); } - public void recordLatency(final long startTimeNanos) { + public void recordLatency(final double value) { // Record latency at microsecond scale - final long nowNanos = time.nanoseconds(); - final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); - this.latencySensor.record(latency); + this.latencySensor.record(value); this.requestRateSensor.record(1); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index c4fbfc117610..8f0f3b5185c3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -19,7 +19,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; import io.confluent.ksql.parser.tree.Query; @@ -27,6 +26,7 @@ import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.entity.TableRows; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.execution.PullQueryResult; import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber; import io.confluent.ksql.services.ServiceContext; @@ -35,13 +35,16 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.kafka.common.utils.Time; class PullQueryPublisher implements Flow.Publisher> { private final ServiceContext serviceContext; private final ConfiguredStatement query; private final PullQueryExecutor pullQueryExecutor; + private final Optional pullQueryMetrics; private final long startTimeNanos; @VisibleForTesting @@ -49,11 +52,13 @@ class PullQueryPublisher implements Flow.Publisher> { final ServiceContext serviceContext, final ConfiguredStatement query, final PullQueryExecutor pullQueryExecutor, + final Optional pullQueryMetrics, final long startTimeNanos ) { this.serviceContext = requireNonNull(serviceContext, "serviceContext"); this.query = requireNonNull(query, "query"); this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor"); + this.pullQueryMetrics = pullQueryMetrics; this.startTimeNanos = startTimeNanos; } @@ -61,8 +66,17 @@ class PullQueryPublisher implements Flow.Publisher> { public synchronized void subscribe(final Subscriber> subscriber) { final PullQuerySubscription subscription = new PullQuerySubscription( subscriber, - () -> pullQueryExecutor - .execute(query, ImmutableMap.of(), serviceContext, Optional.of(false), startTimeNanos) + () -> { + PullQueryResult result = pullQueryExecutor.execute( + query, serviceContext, Optional.of(false), pullQueryMetrics); + if (pullQueryMetrics.isPresent()) { + //Record latency at microsecond scale + final long nowNanos = Time.SYSTEM.nanoseconds(); + final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); + pullQueryMetrics.get().recordLatency(latency); + } + return result; + } ); subscriber.onSubscribe(subscription); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index c943b19d5a04..171ce9cc14f3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -36,6 +36,7 @@ import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.execution.PullQueryResult; import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlRestException; @@ -57,6 +58,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.utils.Time; @@ -81,6 +83,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private KsqlConfig ksqlConfig; private final PullQueryExecutor pullQueryExecutor; private final DenyListPropertyValidator denyListPropertyValidator; + private final Optional pullQueryMetrics; public StreamedQueryResource( final KsqlEngine ksqlEngine, @@ -91,7 +94,8 @@ public StreamedQueryResource( final Optional authorizationValidator, final Errors errorHandler, final PullQueryExecutor pullQueryExecutor, - final DenyListPropertyValidator denyListPropertyValidator + final DenyListPropertyValidator denyListPropertyValidator, + final Optional pullQueryMetrics ) { this( ksqlEngine, @@ -103,7 +107,8 @@ public StreamedQueryResource( authorizationValidator, errorHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + pullQueryMetrics ); } @@ -120,7 +125,8 @@ public StreamedQueryResource( final Optional authorizationValidator, final Errors errorHandler, final PullQueryExecutor pullQueryExecutor, - final DenyListPropertyValidator denyListPropertyValidator + final DenyListPropertyValidator denyListPropertyValidator, + final Optional pullQueryMetrics ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -136,6 +142,7 @@ public StreamedQueryResource( this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor"); this.denyListPropertyValidator = Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); + this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics"); } @Override @@ -216,8 +223,14 @@ private EndpointResponse handleStatement( configProperties, request.getRequestProperties(), isInternalRequest, - startTimeNanos + pullQueryMetrics ); + if (pullQueryMetrics.isPresent()) { + //Record latency at microsecond scale + final long nowNanos = Time.SYSTEM.nanoseconds(); + final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); + pullQueryMetrics.get().recordLatency(latency); + } return response; } @@ -255,14 +268,13 @@ private EndpointResponse handlePullQuery( final Map configOverrides, final Map requestProperties, final Optional isInternalRequest, - final long startTimeNanos + final Optional pullQueryMetrics ) { final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); final PullQueryResult result = pullQueryExecutor - .execute(configured, requestProperties, serviceContext, isInternalRequest, startTimeNanos); - + .execute(configured, serviceContext, isInternalRequest, pullQueryMetrics); final TableRows tableRows = result.getTableRows(); final Optional host = result.getSourceNode() .map(KsqlNode::location) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index f07eebdc4204..c928095832d8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -35,6 +35,7 @@ import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.security.KsqlSecurityContext; @@ -72,6 +73,7 @@ public class WSQueryEndpoint { private final Errors errorHandler; private final PullQueryExecutor pullQueryExecutor; private final DenyListPropertyValidator denyListPropertyValidator; + private final Optional pullQueryMetrics; private WebSocketSubscriber subscriber; @@ -88,7 +90,8 @@ public WSQueryEndpoint( final Optional authorizationValidator, final Errors errorHandler, final PullQueryExecutor pullQueryExecutor, - final DenyListPropertyValidator denyListPropertyValidator + final DenyListPropertyValidator denyListPropertyValidator, + final Optional pullQueryMetrics ) { this(ksqlConfig, statementParser, @@ -103,7 +106,8 @@ public WSQueryEndpoint( authorizationValidator, errorHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + pullQueryMetrics ); } @@ -123,7 +127,8 @@ public WSQueryEndpoint( final Optional authorizationValidator, final Errors errorHandler, final PullQueryExecutor pullQueryExecutor, - final DenyListPropertyValidator denyListPropertyValidator + final DenyListPropertyValidator denyListPropertyValidator, + final Optional pullQueryMetrics ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -144,6 +149,7 @@ public WSQueryEndpoint( this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor"); this.denyListPropertyValidator = Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); + this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics"); } public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap requestParams, @@ -275,7 +281,7 @@ private void handleQuery(final RequestContext info, final Query query, configured, streamSubscriber, pullQueryExecutor, - startTimeNanos + pullQueryMetrics ); } else { pushQueryPublisher.start( @@ -337,12 +343,14 @@ private static void startPullQueryPublisher( final ConfiguredStatement query, final WebSocketSubscriber streamSubscriber, final PullQueryExecutor pullQueryExecutor, + final Optional pullQueryMetrics, final long startTimeNanos ) { new PullQueryPublisher( serviceContext, query, pullQueryExecutor, + pullQueryMetrics, startTimeNanos ).subscribe(streamSubscriber); } @@ -378,7 +386,7 @@ void start( ConfiguredStatement query, WebSocketSubscriber subscriber, PullQueryExecutor pullQueryExecutor, - long startTimeNanos); + Optional pullQueryMetrics); } From 7fc84367683bc837113db73b5ca4149183382efe Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 2 Sep 2020 18:31:12 -0700 Subject: [PATCH 2/7] measure size via vertx methods --- .../ksql/api/impl/QueryEndpoint.java | 19 ++++--- .../ksql/api/server/OldApiUtils.java | 19 +++++-- .../io/confluent/ksql/api/server/Server.java | 11 ++-- .../ksql/api/server/ServerVerticle.java | 39 +++++++++------ .../io/confluent/ksql/api/spi/Endpoints.java | 8 +-- .../ksql/rest/server/KsqlRestApplication.java | 8 +-- .../ksql/rest/server/KsqlServerEndpoints.java | 20 ++++++-- .../execution/PullQueryExecutorMetrics.java | 48 ++++++++++++++++++ .../streaming/PullQueryPublisher.java | 2 +- .../streaming/StreamedQueryResource.java | 17 +++++-- .../resources/streaming/WSQueryEndpoint.java | 11 +++- .../java/io/confluent/ksql/api/AuthTest.java | 3 +- .../io/confluent/ksql/api/BaseApiTest.java | 2 +- .../io/confluent/ksql/api/TestEndpoints.java | 7 ++- .../ksql/api/perf/BasePerfRunner.java | 2 +- .../ksql/api/perf/InsertsStreamRunner.java | 7 ++- .../ksql/api/perf/PullQueryRunner.java | 7 ++- .../ksql/api/perf/QueryStreamRunner.java | 8 ++- .../rest/server/KsqlRestApplicationTest.java | 3 +- .../PullQueryExecutorMetricsTest.java | 2 +- .../execution/PullQueryExecutorTest.java | 7 +-- .../server/resources/WSQueryEndpointTest.java | 18 +++---- .../streaming/PullQueryPublisherTest.java | 10 ++-- .../streaming/StreamedQueryResourceTest.java | 50 ++++++++++++------- 24 files changed, 232 insertions(+), 96 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index e72611337062..6dcd51e6474a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -28,6 +28,7 @@ import io.confluent.ksql.query.BlockingRowQueue; import io.confluent.ksql.rest.entity.TableRows; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.execution.PullQueryResult; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.utils.FormatOptions; @@ -45,19 +46,24 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.kafka.common.utils.Time; public class QueryEndpoint { private final KsqlEngine ksqlEngine; private final KsqlConfig ksqlConfig; private final PullQueryExecutor pullQueryExecutor; + private final Optional pullQueryMetrics; - public QueryEndpoint(final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig, - final PullQueryExecutor pullQueryExecutor) { + public QueryEndpoint( + final KsqlEngine ksqlEngine, + final KsqlConfig ksqlConfig, + final PullQueryExecutor pullQueryExecutor, + final Optional pullQueryMetrics + ) { this.ksqlEngine = ksqlEngine; this.ksqlConfig = ksqlConfig; this.pullQueryExecutor = pullQueryExecutor; + this.pullQueryMetrics = pullQueryMetrics; } public QueryPublisher createQueryPublisher( @@ -65,14 +71,13 @@ public QueryPublisher createQueryPublisher( final Context context, final WorkerExecutor workerExecutor, final ServiceContext serviceContext) { - final long startTimeNanos = Time.SYSTEM.nanoseconds(); // Must be run on worker as all this stuff is slow VertxUtils.checkIsWorker(); final ConfiguredStatement statement = createStatement(sql, properties.getMap()); if (statement.getStatement().isPullQuery()) { - return createPullQueryPublisher(context, serviceContext, statement, startTimeNanos); + return createPullQueryPublisher(context, serviceContext, statement, pullQueryMetrics); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); } @@ -97,10 +102,10 @@ private QueryPublisher createPullQueryPublisher( final Context context, final ServiceContext serviceContext, final ConfiguredStatement statement, - final long startTimeNanos + final Optional pullQueryMetrics ) { final PullQueryResult result = pullQueryExecutor.execute( - statement, ImmutableMap.of(), serviceContext, Optional.of(false), startTimeNanos); + statement, serviceContext, Optional.of(false), pullQueryMetrics); final TableRows tableRows = result.getTableRows(); return new PullQueryPublisher( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index f12b5be5e488..5f233eff1ab5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -24,6 +24,7 @@ import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.util.VertxCompletableFuture; import io.vertx.core.WorkerExecutor; @@ -49,9 +50,11 @@ private OldApiUtils() { private static final String JSON_CONTENT_TYPE = "application/json"; private static final String CHUNKED_ENCODING = "chunked"; - static void handleOldApiRequest(final Server server, + static void handleOldApiRequest( + final Server server, final RoutingContext routingContext, final Class requestClass, + final Optional pullQueryMetrics, final BiFunction> requestor) { final T requestObject; if (requestClass != null) { @@ -68,18 +71,21 @@ static void handleOldApiRequest(final Server server, final CompletableFuture completableFuture = requestor .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { - handleOldApiResponse(server, routingContext, endpointResponse); + handleOldApiResponse(server, routingContext, endpointResponse, pullQueryMetrics); }).exceptionally(t -> { if (t instanceof CompletionException) { t = t.getCause(); } - handleOldApiResponse(server, routingContext, mapException(t)); + handleOldApiResponse(server, routingContext, mapException(t), pullQueryMetrics); return null; }); } - static void handleOldApiResponse(final Server server, final RoutingContext routingContext, - final EndpointResponse endpointResponse) { + static void handleOldApiResponse( + final Server server, final RoutingContext routingContext, + final EndpointResponse endpointResponse, + final Optional pullQueryMetrics + ) { final HttpServerResponse response = routingContext.response(); response.putHeader(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE); @@ -112,6 +118,9 @@ static void handleOldApiResponse(final Server server, final RoutingContext routi response.end(responseBody); } } + pullQueryMetrics + .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize( + routingContext.response().bytesWritten())); } private static void streamEndpointResponse(final Server server, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java index 335627c4cd06..8788cd108e48 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/Server.java @@ -23,6 +23,7 @@ import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.rest.entity.PushQueryId; import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.KeystoreUtil; import io.confluent.ksql.security.KsqlSecurityExtension; @@ -78,14 +79,17 @@ public class Server { private final Optional authenticationPlugin; private final ServerState serverState; private final List listeners = new ArrayList<>(); + private final Optional pullQueryMetrics; private URI internalListener; private WorkerExecutor workerExecutor; private FileWatcher fileWatcher; - public Server(final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints, + public Server( + final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints, final KsqlSecurityExtension securityExtension, final Optional authenticationPlugin, - final ServerState serverState) { + final ServerState serverState, + final Optional pullQueryMetrics) { this.vertx = Objects.requireNonNull(vertx); this.config = Objects.requireNonNull(config); this.endpoints = Objects.requireNonNull(endpoints); @@ -93,6 +97,7 @@ public Server(final Vertx vertx, final KsqlRestConfig config, final Endpoints en this.authenticationPlugin = Objects.requireNonNull(authenticationPlugin); this.serverState = Objects.requireNonNull(serverState); this.maxPushQueryCount = config.getInt(KsqlRestConfig.MAX_PUSH_QUERIES); + this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics"); if (!OpenSsl.isAvailable()) { log.warn("OpenSSL does not appear to be installed. ksqlDB will fall back to using the JDK " + "TLS implementation. OpenSSL is recommended for better performance."); @@ -126,7 +131,7 @@ public synchronized void start() { final ServerVerticle serverVerticle = new ServerVerticle(endpoints, createHttpServerOptions(config, listener.getHost(), listener.getPort(), listener.getScheme().equalsIgnoreCase("https"), isInternalListener.orElse(false)), - this, isInternalListener); + this, isInternalListener, pullQueryMetrics); vertx.deployVerticle(serverVerticle, vcf); final int index = i; final CompletableFuture deployFuture = vcf.thenApply(s -> { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 4bb721bda6c6..4f8df8689d27 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -27,6 +27,7 @@ import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.LagReportingMessage; import io.confluent.ksql.rest.entity.Versions; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.http.HttpHeaders; @@ -65,16 +66,19 @@ public class ServerVerticle extends AbstractVerticle { private ConnectionQueryManager connectionQueryManager; private HttpServer httpServer; private final Optional isInternalListener; + private final Optional pullQueryMetrics; public ServerVerticle( final Endpoints endpoints, final HttpServerOptions httpServerOptions, final Server server, - final Optional isInternalListener) { + final Optional isInternalListener, + final Optional pullQueryMetrics) { this.endpoints = Objects.requireNonNull(endpoints); this.httpServerOptions = Objects.requireNonNull(httpServerOptions); this.server = Objects.requireNonNull(server); this.isInternalListener = Objects.requireNonNull(isInternalListener); + this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics); } @Override @@ -206,7 +210,7 @@ private Router setupRouter() { } private void handleKsqlRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, KsqlRequest.class, + handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.empty(), (ksqlRequest, apiSecurityContext) -> endpoints .executeKsqlRequest(ksqlRequest, server.getWorkerExecutor(), @@ -215,7 +219,7 @@ private void handleKsqlRequest(final RoutingContext routingContext) { } private void handleTerminateRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, ClusterTerminateRequest.class, + handleOldApiRequest(server, routingContext, ClusterTerminateRequest.class, Optional.empty(), (request, apiSecurityContext) -> endpoints .executeTerminate(request, server.getWorkerExecutor(), @@ -227,32 +231,35 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); - - handleOldApiRequest(server, routingContext, KsqlRequest.class, + routingContext.request().bytesRead(); + handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics, (request, apiSecurityContext) -> endpoints - .executeQueryRequest(request, server.getWorkerExecutor(), connectionClosedFuture, + .executeQueryRequest( + request, server.getWorkerExecutor(), connectionClosedFuture, DefaultApiSecurityContext.create(routingContext), - isInternalRequest(routingContext)) + isInternalRequest(routingContext), + routingContext) + ); } private void handleInfoRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (request, apiSecurityContext) -> endpoints.executeInfo(DefaultApiSecurityContext.create(routingContext)) ); } private void handleClusterStatusRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (request, apiSecurityContext) -> endpoints.executeClusterStatus(DefaultApiSecurityContext.create(routingContext)) ); } private void handleHeartbeatRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, HeartbeatMessage.class, + handleOldApiRequest(server, routingContext, HeartbeatMessage.class, Optional.empty(), (request, apiSecurityContext) -> endpoints.executeHeartbeat(request, DefaultApiSecurityContext.create(routingContext)) ); @@ -263,7 +270,7 @@ private void handleStatusRequest(final RoutingContext routingContext) { final String type = request.getParam("type"); final String entity = request.getParam("entity"); final String action = request.getParam("action"); - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (r, apiSecurityContext) -> endpoints.executeStatus(type, entity, action, DefaultApiSecurityContext.create(routingContext)) @@ -271,35 +278,35 @@ private void handleStatusRequest(final RoutingContext routingContext) { } private void handleAllStatusesRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (r, apiSecurityContext) -> endpoints.executeAllStatuses(DefaultApiSecurityContext.create(routingContext)) ); } private void handleLagReportRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, LagReportingMessage.class, + handleOldApiRequest(server, routingContext, LagReportingMessage.class, Optional.empty(), (request, apiSecurityContext) -> endpoints.executeLagReport(request, DefaultApiSecurityContext.create(routingContext)) ); } private void handleHealthcheckRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (request, apiSecurityContext) -> endpoints.executeCheckHealth(DefaultApiSecurityContext.create(routingContext)) ); } private void handleServerMetadataRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (request, apiSecurityContext) -> endpoints.executeServerMetadata(DefaultApiSecurityContext.create(routingContext)) ); } private void handleServerMetadataClusterIdRequest(final RoutingContext routingContext) { - handleOldApiRequest(server, routingContext, null, + handleOldApiRequest(server, routingContext, null, Optional.empty(), (request, apiSecurityContext) -> endpoints .executeServerMetadataClusterId(DefaultApiSecurityContext.create(routingContext)) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java index 90dffb62a0b1..759c8e9b1e81 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java @@ -28,6 +28,7 @@ import io.vertx.core.WorkerExecutor; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.reactivestreams.Subscriber; @@ -78,9 +79,10 @@ CompletableFuture executeKsqlRequest(KsqlRequest request, CompletableFuture executeTerminate(ClusterTerminateRequest request, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext); - CompletableFuture executeQueryRequest(KsqlRequest request, - WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest); + CompletableFuture executeQueryRequest( + KsqlRequest request, WorkerExecutor workerExecutor, + CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, + Optional isInternalRequest, RoutingContext routingContext); CompletableFuture executeInfo(ApiSecurityContext apiSecurityContext); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 1b1dba9db288..ced3e0aa8056 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -319,7 +319,8 @@ public void startAsync() { authorizationValidator, errorHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + pullQueryMetrics ); startAsyncThreadRef.set(Thread.currentThread()); @@ -338,10 +339,11 @@ public void startAsync() { lagReportingResource, healthCheckResource, serverMetadataResource, - wsQueryEndpoint + wsQueryEndpoint, + pullQueryMetrics ); apiServer = new Server(vertx, ksqlRestConfig, endpoints, securityExtension, - authenticationPlugin, serverState); + authenticationPlugin, serverState, pullQueryMetrics); apiServer.start(); final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index 7468ac9c0a98..31e668fc3de8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -32,6 +32,7 @@ import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.LagReportingMessage; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.resources.ClusterStatusResource; import io.confluent.ksql.rest.server.resources.HealthCheckResource; import io.confluent.ksql.rest.server.resources.HeartbeatResource; @@ -51,6 +52,7 @@ import io.vertx.core.WorkerExecutor; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -77,6 +79,7 @@ public class KsqlServerEndpoints implements Endpoints { private final HealthCheckResource healthCheckResource; private final ServerMetadataResource serverMetadataResource; private final WSQueryEndpoint wsQueryEndpoint; + private final Optional pullQueryMetrics; // CHECKSTYLE_RULES.OFF: ParameterNumber public KsqlServerEndpoints( @@ -93,7 +96,9 @@ public KsqlServerEndpoints( final Optional lagReportingResource, final HealthCheckResource healthCheckResource, final ServerMetadataResource serverMetadataResource, - final WSQueryEndpoint wsQueryEndpoint) { + final WSQueryEndpoint wsQueryEndpoint, + final Optional pullQueryMetrics + ) { // CHECKSTYLE_RULES.ON: ParameterNumber this.ksqlEngine = Objects.requireNonNull(ksqlEngine); @@ -111,6 +116,7 @@ public KsqlServerEndpoints( this.healthCheckResource = Objects.requireNonNull(healthCheckResource); this.serverMetadataResource = Objects.requireNonNull(serverMetadataResource); this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint); + this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics); } @Override @@ -120,7 +126,7 @@ public CompletableFuture createQueryPublisher(final String sql, final WorkerExecutor workerExecutor, final ApiSecurityContext apiSecurityContext) { return executeOnWorker( - () -> new QueryEndpoint(ksqlEngine, ksqlConfig, pullQueryExecutor) + () -> new QueryEndpoint(ksqlEngine, ksqlConfig, pullQueryExecutor, pullQueryMetrics) .createQueryPublisher(sql, properties, context, workerExecutor, ksqlSecurityContextProvider.provide(apiSecurityContext).getServiceContext()), workerExecutor); @@ -151,17 +157,21 @@ public CompletableFuture executeKsqlRequest(final KsqlRequest } @Override - public CompletableFuture executeQueryRequest(final KsqlRequest request, + public CompletableFuture executeQueryRequest( + final KsqlRequest request, final WorkerExecutor workerExecutor, final CompletableFuture connectionClosedFuture, final ApiSecurityContext apiSecurityContext, - final Optional isInternalRequest) { + final Optional isInternalRequest, + final RoutingContext routingContext + ) { return executeOldApiEndpointOnWorker(apiSecurityContext, ksqlSecurityContext -> streamedQueryResource.streamQuery( ksqlSecurityContext, request, connectionClosedFuture, - isInternalRequest), workerExecutor); + isInternalRequest, + routingContext), workerExecutor); } @Override diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java index e0c849bdab69..d9659d606b7f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Min; import org.apache.kafka.common.metrics.stats.Percentile; @@ -33,6 +34,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedCount; +@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") public class PullQueryExecutorMetrics implements Closeable { private static final String PULL_QUERY_METRIC_GROUP = "pull-query"; @@ -44,6 +46,8 @@ public class PullQueryExecutorMetrics implements Closeable { private final Sensor latencySensor; private final Sensor requestRateSensor; private final Sensor errorRateSensor; + private final Sensor requestSizeSensor; + private final Sensor responseSizeSensor; private final Metrics metrics; private final Map customMetricsTags; private final String ksqlServiceId; @@ -63,6 +67,8 @@ public PullQueryExecutorMetrics( this.latencySensor = configureRequestSensor(); this.requestRateSensor = configureRateSensor(); this.errorRateSensor = configureErrorRateSensor(); + this.requestSizeSensor = configureRequestSizeSensor(); + this.responseSizeSensor = configureResponseSizeSensor(); } @Override @@ -88,6 +94,14 @@ public void recordErrorRate(final double value) { this.errorRateSensor.record(value); } + public void recordRequestSize(final double value) { + this.requestSizeSensor.record(value); + } + + public void recordResponseSize(final double value) { + this.responseSizeSensor.record(value); + } + List getSensors() { return sensors; } @@ -261,4 +275,38 @@ private Sensor configureRequestSensor() { sensors.add(sensor); return sensor; } + + private Sensor configureRequestSizeSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-request-size"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-request-size", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Size in bytes of pull query request", + customMetricsTags + ), + new CumulativeSum() + ); + + sensors.add(sensor); + return sensor; + } + + private Sensor configureResponseSizeSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-response-size"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-response-size", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Size in bytes of pull query response", + customMetricsTags + ), + new CumulativeSum() + ); + + sensors.add(sensor); + return sensor; + } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index 8f0f3b5185c3..bbb5233bcb14 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -67,7 +67,7 @@ public synchronized void subscribe(final Subscriber> sub final PullQuerySubscription subscription = new PullQuerySubscription( subscriber, () -> { - PullQueryResult result = pullQueryExecutor.execute( + final PullQueryResult result = pullQueryExecutor.execute( query, serviceContext, Optional.of(false), pullQueryMetrics); if (pullQueryMetrics.isPresent()) { //Record latency at microsecond scale diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 171ce9cc14f3..d735fef1a8cf 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -50,6 +50,7 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; +import io.vertx.ext.web.RoutingContext; import java.time.Duration; import java.util.Collection; import java.util.HashMap; @@ -63,6 +64,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; +import org.openjdk.jol.info.ClassLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,11 +160,11 @@ public EndpointResponse streamQuery( final KsqlSecurityContext securityContext, final KsqlRequest request, final CompletableFuture connectionClosedFuture, - final Optional isInternalRequest + final Optional isInternalRequest, + final RoutingContext routingContext ) { final long startTimeNanos = Time.SYSTEM.nanoseconds(); throwIfNotConfigured(); - activenessRegistrar.updateLastRequestTime(); final PreparedStatement statement = parseStatement(request); @@ -171,7 +173,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, startTimeNanos); + isInternalRequest, routingContext, startTimeNanos); } private void throwIfNotConfigured() { @@ -200,6 +202,7 @@ private EndpointResponse handleStatement( final PreparedStatement statement, final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, + final RoutingContext routingContext, final long startTimeNanos ) { try { @@ -215,8 +218,11 @@ private EndpointResponse handleStatement( if (statement.getStatement() instanceof Query) { final PreparedStatement queryStmt = (PreparedStatement) statement; - + pullQueryMetrics + .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( + routingContext.request().bytesRead())); if (queryStmt.getStatement().isPullQuery()) { + final EndpointResponse response = handlePullQuery( securityContext.getServiceContext(), queryStmt, @@ -275,6 +281,9 @@ private EndpointResponse handlePullQuery( final PullQueryResult result = pullQueryExecutor .execute(configured, serviceContext, isInternalRequest, pullQueryMetrics); + pullQueryMetrics + .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( + ClassLayout.parseInstance(result).instanceSize())); final TableRows tableRows = result.getTableRows(); final Optional host = result.getSourceNode() .map(KsqlNode::location) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index c928095832d8..d58364a37b66 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.utils.Time; +import org.openjdk.jol.info.ClassLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,6 +194,10 @@ public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap r ksqlSecurityContext); if (statement instanceof Query) { + if (((Query) statement).isPullQuery() && pullQueryMetrics.isPresent()) { + pullQueryMetrics.get().recordRequestSize( + ClassLayout.parseInstance(request).instanceSize()); + } handleQuery(requestContext, (Query) statement, startTimeNanos); } else if (statement instanceof PrintTopic) { handlePrintTopic(requestContext, (PrintTopic) statement); @@ -281,7 +286,8 @@ private void handleQuery(final RequestContext info, final Query query, configured, streamSubscriber, pullQueryExecutor, - pullQueryMetrics + pullQueryMetrics, + startTimeNanos ); } else { pushQueryPublisher.start( @@ -386,7 +392,8 @@ void start( ConfiguredStatement query, WebSocketSubscriber subscriber, PullQueryExecutor pullQueryExecutor, - Optional pullQueryMetrics); + Optional pullQueryMetrics, + long startTimeNanos); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java index 399a24099f87..b479bd89104d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/AuthTest.java @@ -133,7 +133,8 @@ public void close() { } }, Optional.ofNullable(securityHandlerPlugin), - serverState); + serverState, + Optional.empty()); server.start(); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java index e9df1ff0f914..9f4b717bb713 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java @@ -135,7 +135,7 @@ protected void stopClient() { protected void createServer(KsqlRestConfig serverConfig) { server = new Server(vertx, serverConfig, testEndpoints, - new KsqlDefaultSecurityExtension(), Optional.empty(), serverState); + new KsqlDefaultSecurityExtension(), Optional.empty(), serverState, Optional.empty()); try { server.start(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index 8dc576ae9a6d..c780200bcc8f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -34,6 +34,7 @@ import io.vertx.core.WorkerExecutor; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -136,9 +137,11 @@ public CompletableFuture executeTerminate(final ClusterTermina } @Override - public CompletableFuture executeQueryRequest(KsqlRequest request, + public CompletableFuture executeQueryRequest( + KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest) { + ApiSecurityContext apiSecurityContext, Optional isInternalRequest, + RoutingContext routingContext) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/BasePerfRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/BasePerfRunner.java index 674a600311f6..b57971f19c7e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/BasePerfRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/BasePerfRunner.java @@ -171,7 +171,7 @@ private void setUp() { final ServerState serverState = new ServerState(); serverState.setReady(); server = new Server(vertx, serverConfig, endpoints, new KsqlDefaultSecurityExtension(), - Optional.empty(), serverState); + Optional.empty(), serverState, Optional.empty()); server.start(); client = createClient(); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java index b274e36cb6a2..d95033245177 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java @@ -38,6 +38,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.parsetools.RecordParser; import io.vertx.core.streams.ReadStream; +import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.codec.BodyCodec; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -188,9 +189,11 @@ public CompletableFuture executeTerminate( } @Override - public CompletableFuture executeQueryRequest(KsqlRequest request, + public CompletableFuture executeQueryRequest( + KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest) { + ApiSecurityContext apiSecurityContext, Optional isInternalRequest, + RoutingContext routingContext) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java index 7cc2d26d40ca..8236df4ce808 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java @@ -38,6 +38,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.HttpResponse; import java.util.ArrayList; import java.util.HashSet; @@ -145,9 +146,11 @@ public CompletableFuture executeTerminate( } @Override - public CompletableFuture executeQueryRequest(KsqlRequest request, + public CompletableFuture executeQueryRequest( + KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest) { + ApiSecurityContext apiSecurityContext, Optional isInternalRequest, + RoutingContext routingContext) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java index 73aef92042a3..633e544afc85 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java @@ -39,6 +39,7 @@ import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; import io.vertx.core.parsetools.RecordParser; +import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.codec.BodyCodec; import java.util.HashSet; import java.util.List; @@ -129,11 +130,14 @@ public CompletableFuture executeTerminate( } @Override - public CompletableFuture executeQueryRequest(KsqlRequest request, + public CompletableFuture executeQueryRequest( + KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, - Optional isInternalRequest) { + Optional isInternalRequest, + RoutingContext routingContext + ) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 02e28594e071..21aab6367b76 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -499,7 +499,8 @@ private void givenAppWithRestConfig(final Map restConfigMap) { Optional.of(lagReportingAgent), vertx, queryMonitor, - denyListPropertyValidator + denyListPropertyValidator, + Optional.empty() ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java index e53c970dcd7f..83c4533b77cf 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java @@ -56,7 +56,7 @@ public void setUp() { when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID); when(time.nanoseconds()).thenReturn(6000L); - pullMetrics = new PullQueryExecutorMetrics(ksqlEngine.getServiceId(), CUSTOM_TAGS, time); + pullMetrics = new PullQueryExecutorMetrics(ksqlEngine.getServiceId(), CUSTOM_TAGS); } @After diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index 201f27e11d97..2c348471597f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -75,12 +75,13 @@ public void shouldThrowExceptionIfConfigDisabled() { SessionConfig.of(engine.getKsqlConfig(), ImmutableMap.of())); PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( engine.getEngine(), ROUTING_FILTER_FACTORY, engine.getKsqlConfig(), - engine.getEngine().getServiceId(), time); + engine.getEngine().getServiceId()); // When: final Exception e = assertThrows( KsqlStatementException.class, - () -> pullQueryExecutor.execute(query, ImmutableMap.of(), engine.getServiceContext(), Optional.empty(), 0L) + () -> pullQueryExecutor.execute( + query, engine.getServiceContext(), Optional.empty(), Optional.empty()) ); // Then: @@ -137,7 +138,7 @@ public static class RateLimit { public void shouldRateLimit() { PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( engine.getEngine(), ROUTING_FILTER_FACTORY, engine.getKsqlConfig(), - engine.getEngine().getServiceId(), time); + engine.getEngine().getServiceId()); // When: pullQueryExecutor.checkRateLimit(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java index 506d21db8147..e3a2a3876de1 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java @@ -15,6 +15,9 @@ package io.confluent.ksql.rest.server.resources; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; @@ -33,6 +36,10 @@ import io.confluent.ksql.version.metrics.ActivenessRegistrar; import io.vertx.core.MultiMap; import io.vertx.core.http.ServerWebSocket; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; import org.apache.kafka.streams.StreamsConfig; import org.junit.Before; import org.junit.Test; @@ -40,14 +47,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.time.Duration; -import java.util.Collections; -import java.util.Map; -import java.util.Optional; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - @RunWith(MockitoJUnitRunner.class) public class WSQueryEndpointTest { private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get(); @@ -74,7 +73,8 @@ public void setUp() { Optional.empty(), mock(Errors.class), mock(PullQueryExecutor.class), - denyListPropertyValidator + denyListPropertyValidator, + Optional.empty() ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index 3d4d51392b44..53389092ece9 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -17,7 +17,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -83,10 +82,11 @@ public void setUp() { serviceContext, statement, pullQueryExecutor, + Optional.empty(), TIME_NANOS); PullQueryResult result = new PullQueryResult(entity, Optional.empty()); - when(pullQueryExecutor.execute(any(), any(), any(), any(), eq(TIME_NANOS))).thenReturn(result); + when(pullQueryExecutor.execute(any(), any(), any(), any())).thenReturn(result); when(entity.getSchema()).thenReturn(SCHEMA); doAnswer(callRequestAgain()).when(subscriber).onNext(any()); @@ -110,7 +110,7 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), TIME_NANOS); + verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty()); } @Test @@ -123,7 +123,7 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), TIME_NANOS); + verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty()); } @Test @@ -158,7 +158,7 @@ public void shouldCallOnErrorOnFailure() { // Given: givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(pullQueryExecutor.execute(any(), any(), any(), any(), eq(TIME_NANOS))).thenThrow(e); + when(pullQueryExecutor.execute(any(), any(), any(), any())).thenThrow(e); // When: subscription.request(1); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 339af1d414e7..86c3d65f112e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -192,7 +192,7 @@ public void setup() { securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext); pullQueryExecutor = new PullQueryExecutor( - mockKsqlEngine, ROUTING_FILTER_FACTORY, VALID_CONFIG, SERVICE_ID, time); + mockKsqlEngine, ROUTING_FILTER_FACTORY, VALID_CONFIG, SERVICE_ID); testResource = new StreamedQueryResource( mockKsqlEngine, mockStatementParser, @@ -203,7 +203,8 @@ public void setup() { Optional.of(authorizationValidator), errorsHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + Optional.empty() ); testResource.configure(VALID_CONFIG); @@ -231,7 +232,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { Optional.of(authorizationValidator), errorsHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + Optional.empty() ); // When: @@ -241,7 +243,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { securityContext, new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ) ); @@ -263,7 +266,8 @@ public void shouldReturn400OnBadStatement() { securityContext, new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ) ); @@ -280,7 +284,8 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); // Then: @@ -294,7 +299,8 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); // Then: @@ -315,7 +321,8 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ) ); @@ -338,7 +345,8 @@ public void shouldNotCreateExternalClientsForPullQuery() { securityContext, new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); // Then: @@ -362,7 +370,8 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { securityContext, new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -385,7 +394,8 @@ public void shouldThrowOnDenyListedStreamProperty() { Optional.of(authorizationValidator), errorsHandler, pullQueryExecutor, - denyListPropertyValidator + denyListPropertyValidator, + Optional.empty() ); final Map props = new HashMap<>(ImmutableMap.of( StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1" @@ -411,7 +421,8 @@ public void shouldThrowOnDenyListedStreamProperty() { null ), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); // Then: @@ -486,7 +497,8 @@ public void shouldStreamRowsCorrectly() throws Throwable { securityContext, new KsqlRequest(queryString, requestStreamsProperties, Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -628,7 +640,8 @@ public void shouldUpdateTheLastRequestTime() { securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); // Then: @@ -649,7 +662,8 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -674,7 +688,8 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx securityContext, new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -704,7 +719,8 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { securityContext, new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty() + Optional.empty(), + any() ) ); From 339ffe22267898b14600735fdcb4167afcefce71 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Wed, 2 Sep 2020 18:36:29 -0700 Subject: [PATCH 3/7] remove jol dependency --- ksqldb-rest-app/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ksqldb-rest-app/pom.xml b/ksqldb-rest-app/pom.xml index 33a82813391d..f84bd051afac 100644 --- a/ksqldb-rest-app/pom.xml +++ b/ksqldb-rest-app/pom.xml @@ -134,12 +134,6 @@ ${reactive-streams.version} - - org.openjdk.jol - jol-core - 0.13 - - From ea24f23838991e3f210ba0af6569c91b542b5604 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Thu, 3 Sep 2020 13:09:25 -0700 Subject: [PATCH 4/7] removed routingContext from streamedqueryresource --- .../ksql/api/server/OldApiUtils.java | 4 +- .../ksql/api/server/ServerVerticle.java | 3 +- .../io/confluent/ksql/api/spi/Endpoints.java | 3 +- .../ksql/rest/server/KsqlServerEndpoints.java | 8 ++-- .../streaming/StreamedQueryResource.java | 14 +------ .../resources/streaming/WSQueryEndpoint.java | 5 --- .../io/confluent/ksql/api/TestEndpoints.java | 7 +--- .../ksql/api/perf/InsertsStreamRunner.java | 7 +--- .../ksql/api/perf/PullQueryRunner.java | 7 +--- .../ksql/api/perf/QueryStreamRunner.java | 4 +- .../streaming/StreamedQueryResourceTest.java | 39 +++++++------------ 11 files changed, 30 insertions(+), 71 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 5f233eff1ab5..85194d8fa629 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -67,7 +67,9 @@ static void handleOldApiRequest( } else { requestObject = null; } - + pullQueryMetrics + .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( + routingContext.request().bytesRead())); final CompletableFuture completableFuture = requestor .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 4f8df8689d27..0c3afe29ed80 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -238,8 +238,7 @@ private void handleQueryRequest(final RoutingContext routingContext) { .executeQueryRequest( request, server.getWorkerExecutor(), connectionClosedFuture, DefaultApiSecurityContext.create(routingContext), - isInternalRequest(routingContext), - routingContext) + isInternalRequest(routingContext)) ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java index 759c8e9b1e81..f881735705e4 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java @@ -28,7 +28,6 @@ import io.vertx.core.WorkerExecutor; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.reactivestreams.Subscriber; @@ -82,7 +81,7 @@ CompletableFuture executeTerminate(ClusterTerminateRequest req CompletableFuture executeQueryRequest( KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, - Optional isInternalRequest, RoutingContext routingContext); + Optional isInternalRequest); CompletableFuture executeInfo(ApiSecurityContext apiSecurityContext); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index 31e668fc3de8..a7067aa17d01 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -52,7 +52,6 @@ import io.vertx.core.WorkerExecutor; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -162,16 +161,15 @@ public CompletableFuture executeQueryRequest( final WorkerExecutor workerExecutor, final CompletableFuture connectionClosedFuture, final ApiSecurityContext apiSecurityContext, - final Optional isInternalRequest, - final RoutingContext routingContext + final Optional isInternalRequest ) { return executeOldApiEndpointOnWorker(apiSecurityContext, ksqlSecurityContext -> streamedQueryResource.streamQuery( ksqlSecurityContext, request, connectionClosedFuture, - isInternalRequest, - routingContext), workerExecutor); + isInternalRequest + ), workerExecutor); } @Override diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index d735fef1a8cf..d3cab6e172a1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -50,7 +50,6 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; -import io.vertx.ext.web.RoutingContext; import java.time.Duration; import java.util.Collection; import java.util.HashMap; @@ -64,7 +63,6 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; -import org.openjdk.jol.info.ClassLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,8 +158,7 @@ public EndpointResponse streamQuery( final KsqlSecurityContext securityContext, final KsqlRequest request, final CompletableFuture connectionClosedFuture, - final Optional isInternalRequest, - final RoutingContext routingContext + final Optional isInternalRequest ) { final long startTimeNanos = Time.SYSTEM.nanoseconds(); throwIfNotConfigured(); @@ -173,7 +170,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, routingContext, startTimeNanos); + isInternalRequest, startTimeNanos); } private void throwIfNotConfigured() { @@ -202,7 +199,6 @@ private EndpointResponse handleStatement( final PreparedStatement statement, final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, - final RoutingContext routingContext, final long startTimeNanos ) { try { @@ -218,9 +214,6 @@ private EndpointResponse handleStatement( if (statement.getStatement() instanceof Query) { final PreparedStatement queryStmt = (PreparedStatement) statement; - pullQueryMetrics - .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( - routingContext.request().bytesRead())); if (queryStmt.getStatement().isPullQuery()) { final EndpointResponse response = handlePullQuery( @@ -281,9 +274,6 @@ private EndpointResponse handlePullQuery( final PullQueryResult result = pullQueryExecutor .execute(configured, serviceContext, isInternalRequest, pullQueryMetrics); - pullQueryMetrics - .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( - ClassLayout.parseInstance(result).instanceSize())); final TableRows tableRows = result.getTableRows(); final Optional host = result.getSourceNode() .map(KsqlNode::location) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index d58364a37b66..77bb389ed4d8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -52,7 +52,6 @@ import java.util.concurrent.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.utils.Time; -import org.openjdk.jol.info.ClassLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,10 +193,6 @@ public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap r ksqlSecurityContext); if (statement instanceof Query) { - if (((Query) statement).isPullQuery() && pullQueryMetrics.isPresent()) { - pullQueryMetrics.get().recordRequestSize( - ClassLayout.parseInstance(request).instanceSize()); - } handleQuery(requestContext, (Query) statement, startTimeNanos); } else if (statement instanceof PrintTopic) { handlePrintTopic(requestContext, (PrintTopic) statement); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index c780200bcc8f..8dc576ae9a6d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -34,7 +34,6 @@ import io.vertx.core.WorkerExecutor; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -137,11 +136,9 @@ public CompletableFuture executeTerminate(final ClusterTermina } @Override - public CompletableFuture executeQueryRequest( - KsqlRequest request, + public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - RoutingContext routingContext) { + ApiSecurityContext apiSecurityContext, Optional isInternalRequest) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java index d95033245177..b274e36cb6a2 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java @@ -38,7 +38,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.parsetools.RecordParser; import io.vertx.core.streams.ReadStream; -import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.codec.BodyCodec; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -189,11 +188,9 @@ public CompletableFuture executeTerminate( } @Override - public CompletableFuture executeQueryRequest( - KsqlRequest request, + public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - RoutingContext routingContext) { + ApiSecurityContext apiSecurityContext, Optional isInternalRequest) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java index 8236df4ce808..7cc2d26d40ca 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java @@ -38,7 +38,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.HttpResponse; import java.util.ArrayList; import java.util.HashSet; @@ -146,11 +145,9 @@ public CompletableFuture executeTerminate( } @Override - public CompletableFuture executeQueryRequest( - KsqlRequest request, + public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, - ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - RoutingContext routingContext) { + ApiSecurityContext apiSecurityContext, Optional isInternalRequest) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java index 633e544afc85..eb1a7345036e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java @@ -39,7 +39,6 @@ import io.vertx.core.http.ServerWebSocket; import io.vertx.core.json.JsonObject; import io.vertx.core.parsetools.RecordParser; -import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.codec.BodyCodec; import java.util.HashSet; import java.util.List; @@ -135,8 +134,7 @@ public CompletableFuture executeQueryRequest( WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, - Optional isInternalRequest, - RoutingContext routingContext + Optional isInternalRequest ) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 86c3d65f112e..6a2545416d71 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -243,8 +243,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { securityContext, new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ) ); @@ -266,8 +265,7 @@ public void shouldReturn400OnBadStatement() { securityContext, new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ) ); @@ -284,8 +282,7 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); // Then: @@ -299,8 +296,7 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); // Then: @@ -321,8 +317,7 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ) ); @@ -345,8 +340,7 @@ public void shouldNotCreateExternalClientsForPullQuery() { securityContext, new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); // Then: @@ -370,8 +364,7 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { securityContext, new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -421,8 +414,7 @@ public void shouldThrowOnDenyListedStreamProperty() { null ), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); // Then: @@ -497,8 +489,7 @@ public void shouldStreamRowsCorrectly() throws Throwable { securityContext, new KsqlRequest(queryString, requestStreamsProperties, Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -640,8 +631,7 @@ public void shouldUpdateTheLastRequestTime() { securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); // Then: @@ -662,8 +652,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() securityContext, new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -688,8 +677,7 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx securityContext, new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -719,8 +707,7 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { securityContext, new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), - Optional.empty(), - any() + Optional.empty() ) ); From 7c91764cdbf7d9aaec93bafa2daf04390eb14bf1 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Fri, 11 Sep 2020 11:11:42 -0700 Subject: [PATCH 5/7] Addressed alan's comments --- .../io/confluent/ksql/api/impl/QueryEndpoint.java | 9 +++++++-- .../io/confluent/ksql/api/server/OldApiUtils.java | 5 +++++ .../confluent/ksql/api/server/ServerVerticle.java | 1 - .../execution/PullQueryExecutorMetrics.java | 11 +++++++++-- .../resources/streaming/PullQueryPublisher.java | 11 +++-------- .../streaming/StreamedQueryResource.java | 15 +++------------ 6 files changed, 27 insertions(+), 25 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 6dcd51e6474a..311dbc0f5814 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -46,6 +46,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.kafka.common.utils.Time; public class QueryEndpoint { @@ -71,13 +72,15 @@ public QueryPublisher createQueryPublisher( final Context context, final WorkerExecutor workerExecutor, final ServiceContext serviceContext) { + final long startTimeNanos = Time.SYSTEM.nanoseconds(); // Must be run on worker as all this stuff is slow VertxUtils.checkIsWorker(); final ConfiguredStatement statement = createStatement(sql, properties.getMap()); if (statement.getStatement().isPullQuery()) { - return createPullQueryPublisher(context, serviceContext, statement, pullQueryMetrics); + return createPullQueryPublisher( + context, serviceContext, statement, pullQueryMetrics, startTimeNanos); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); } @@ -102,10 +105,12 @@ private QueryPublisher createPullQueryPublisher( final Context context, final ServiceContext serviceContext, final ConfiguredStatement statement, - final Optional pullQueryMetrics + final Optional pullQueryMetrics, + final long startTimeNanos ) { final PullQueryResult result = pullQueryExecutor.execute( statement, serviceContext, Optional.of(false), pullQueryMetrics); + pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); final TableRows tableRows = result.getTableRows(); return new PullQueryPublisher( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 85194d8fa629..2164f14e5457 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.BiFunction; +import org.apache.kafka.common.utils.Time; public final class OldApiUtils { @@ -56,6 +57,7 @@ static void handleOldApiRequest( final Class requestClass, final Optional pullQueryMetrics, final BiFunction> requestor) { + final long startTimeNanos = Time.SYSTEM.nanoseconds(); final T requestObject; if (requestClass != null) { final Optional optRequestObject = ServerUtils @@ -70,6 +72,9 @@ static void handleOldApiRequest( pullQueryMetrics .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( routingContext.request().bytesRead())); + //Record latency at microsecond scale + pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics + .recordLatency(startTimeNanos)); final CompletableFuture completableFuture = requestor .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 0c3afe29ed80..65839179ea1b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -231,7 +231,6 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); - routingContext.request().bytesRead(); handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics, (request, apiSecurityContext) -> endpoints diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java index d9659d606b7f..da0c2dd1f1a6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -33,6 +34,7 @@ import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.utils.Time; @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") public class PullQueryExecutorMetrics implements Closeable { @@ -51,6 +53,7 @@ public class PullQueryExecutorMetrics implements Closeable { private final Metrics metrics; private final Map customMetricsTags; private final String ksqlServiceId; + private final Time time = Time.SYSTEM; public PullQueryExecutorMetrics( final String ksqlServiceId, @@ -84,9 +87,13 @@ public void recordRemoteRequests(final double value) { this.remoteRequestsSensor.record(value); } - public void recordLatency(final double value) { + public void recordLatency(final long startTimeNanos) { // Record latency at microsecond scale - this.latencySensor.record(value); + //this.latencySensor.record(value); + //this.requestRateSensor.record(1); + final long nowNanos = time.nanoseconds(); + final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); + this.latencySensor.record(latency); this.requestRateSensor.record(1); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index bbb5233bcb14..bf113e168cd0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -35,9 +35,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.kafka.common.utils.Time; class PullQueryPublisher implements Flow.Publisher> { @@ -69,12 +67,9 @@ public synchronized void subscribe(final Subscriber> sub () -> { final PullQueryResult result = pullQueryExecutor.execute( query, serviceContext, Optional.of(false), pullQueryMetrics); - if (pullQueryMetrics.isPresent()) { - //Record latency at microsecond scale - final long nowNanos = Time.SYSTEM.nanoseconds(); - final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); - pullQueryMetrics.get().recordLatency(latency); - } + //Record latency at microsecond scale + pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics + .recordLatency(startTimeNanos)); return result; } ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index d3cab6e172a1..4d2e9546f050 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -58,10 +58,8 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,7 +158,6 @@ public EndpointResponse streamQuery( final CompletableFuture connectionClosedFuture, final Optional isInternalRequest ) { - final long startTimeNanos = Time.SYSTEM.nanoseconds(); throwIfNotConfigured(); activenessRegistrar.updateLastRequestTime(); @@ -170,7 +167,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, startTimeNanos); + isInternalRequest); } private void throwIfNotConfigured() { @@ -198,8 +195,7 @@ private EndpointResponse handleStatement( final KsqlRequest request, final PreparedStatement statement, final CompletableFuture connectionClosedFuture, - final Optional isInternalRequest, - final long startTimeNanos + final Optional isInternalRequest ) { try { authorizationValidator.ifPresent(validator -> @@ -224,12 +220,7 @@ private EndpointResponse handleStatement( isInternalRequest, pullQueryMetrics ); - if (pullQueryMetrics.isPresent()) { - //Record latency at microsecond scale - final long nowNanos = Time.SYSTEM.nanoseconds(); - final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); - pullQueryMetrics.get().recordLatency(latency); - } + return response; } From b635f18486f5f03d31c09ad6a95c952895ddf9e8 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Fri, 25 Sep 2020 12:33:54 -0700 Subject: [PATCH 6/7] address alan's comments --- .../io/confluent/ksql/api/server/OldApiUtils.java | 15 +++++++++------ .../ksql/rest/server/KsqlRestApplication.java | 5 +++-- .../execution/PullQueryExecutorMetrics.java | 13 ++++++------- .../execution/PullQueryExecutorMetricsTest.java | 2 +- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 2164f14e5457..1a91c8b9073e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -72,18 +72,17 @@ static void handleOldApiRequest( pullQueryMetrics .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( routingContext.request().bytesRead())); - //Record latency at microsecond scale - pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics - .recordLatency(startTimeNanos)); final CompletableFuture completableFuture = requestor .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { - handleOldApiResponse(server, routingContext, endpointResponse, pullQueryMetrics); + handleOldApiResponse( + server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos); }).exceptionally(t -> { if (t instanceof CompletionException) { t = t.getCause(); } - handleOldApiResponse(server, routingContext, mapException(t), pullQueryMetrics); + handleOldApiResponse( + server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos); return null; }); } @@ -91,7 +90,8 @@ static void handleOldApiRequest( static void handleOldApiResponse( final Server server, final RoutingContext routingContext, final EndpointResponse endpointResponse, - final Optional pullQueryMetrics + final Optional pullQueryMetrics, + final long startTimeNanos ) { final HttpServerResponse response = routingContext.response(); response.putHeader(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE); @@ -128,6 +128,9 @@ static void handleOldApiResponse( pullQueryMetrics .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize( routingContext.response().bytesWritten())); + pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics + .recordLatency(startTimeNanos)); + } private static void streamEndpointResponse(final Server server, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index ced3e0aa8056..32f967ed777c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -141,6 +141,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.log4j.LogManager; import org.slf4j.Logger; @@ -718,10 +719,10 @@ static KsqlRestApplication buildApplication( KsqlConfig.KSQL_QUERY_PULL_METRICS_ENABLED) ? Optional.of(new PullQueryExecutorMetrics( ksqlEngine.getServiceId(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS))) + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS), + Time.SYSTEM)) : Optional.empty(); - final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java index da0c2dd1f1a6..1a43dae5abb0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java @@ -53,17 +53,18 @@ public class PullQueryExecutorMetrics implements Closeable { private final Metrics metrics; private final Map customMetricsTags; private final String ksqlServiceId; - private final Time time = Time.SYSTEM; + private final Time time; public PullQueryExecutorMetrics( final String ksqlServiceId, - final Map customMetricsTags + final Map customMetricsTags, + final Time time ) { - - this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags"); - this.metrics = MetricCollectors.getMetrics(); this.ksqlServiceId = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId; + this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags"); + this.time = Objects.requireNonNull(time, "time"); + this.metrics = MetricCollectors.getMetrics(); this.sensors = new ArrayList<>(); this.localRequestsSensor = configureLocalRequestsSensor(); this.remoteRequestsSensor = configureRemoteRequestsSensor(); @@ -89,8 +90,6 @@ public void recordRemoteRequests(final double value) { public void recordLatency(final long startTimeNanos) { // Record latency at microsecond scale - //this.latencySensor.record(value); - //this.requestRateSensor.record(1); final long nowNanos = time.nanoseconds(); final double latency = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); this.latencySensor.record(latency); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java index 83c4533b77cf..f622982fe8c5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetricsTest.java @@ -56,7 +56,7 @@ public void setUp() { when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID); when(time.nanoseconds()).thenReturn(6000L); - pullMetrics = new PullQueryExecutorMetrics(ksqlEngine.getServiceId(), CUSTOM_TAGS); + pullMetrics = new PullQueryExecutorMetrics(ksqlEngine.getServiceId(), CUSTOM_TAGS, time); } @After From db7409073c6e089358ce4eeb62ac90217f3e3da6 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Fri, 25 Sep 2020 14:50:39 -0700 Subject: [PATCH 7/7] rebase --- .../java/io/confluent/ksql/api/impl/QueryEndpoint.java | 2 +- .../server/resources/streaming/PullQueryPublisher.java | 3 ++- .../server/resources/streaming/StreamedQueryResource.java | 4 ++-- .../ksql/rest/server/execution/PullQueryExecutorTest.java | 2 +- .../resources/streaming/PullQueryPublisherTest.java | 8 ++++---- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 311dbc0f5814..a430784b6eea 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -109,7 +109,7 @@ private QueryPublisher createPullQueryPublisher( final long startTimeNanos ) { final PullQueryResult result = pullQueryExecutor.execute( - statement, serviceContext, Optional.of(false), pullQueryMetrics); + statement, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics); pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); final TableRows tableRows = result.getTableRows(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index bf113e168cd0..f2a5b3517741 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; import io.confluent.ksql.parser.tree.Query; @@ -66,7 +67,7 @@ public synchronized void subscribe(final Subscriber> sub subscriber, () -> { final PullQueryResult result = pullQueryExecutor.execute( - query, serviceContext, Optional.of(false), pullQueryMetrics); + query, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics); //Record latency at microsecond scale pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics .recordLatency(startTimeNanos)); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 4d2e9546f050..710f5be92a6f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -263,8 +263,8 @@ private EndpointResponse handlePullQuery( final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); - final PullQueryResult result = pullQueryExecutor - .execute(configured, serviceContext, isInternalRequest, pullQueryMetrics); + final PullQueryResult result = pullQueryExecutor.execute( + configured, requestProperties, serviceContext, isInternalRequest, pullQueryMetrics); final TableRows tableRows = result.getTableRows(); final Optional host = result.getSourceNode() .map(KsqlNode::location) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index 2c348471597f..ac8f21a26be5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -81,7 +81,7 @@ public void shouldThrowExceptionIfConfigDisabled() { final Exception e = assertThrows( KsqlStatementException.class, () -> pullQueryExecutor.execute( - query, engine.getServiceContext(), Optional.empty(), Optional.empty()) + query, ImmutableMap.of(), engine.getServiceContext(), Optional.empty(), Optional.empty()) ); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index 53389092ece9..09b5fca11fbe 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -86,7 +86,7 @@ public void setUp() { TIME_NANOS); PullQueryResult result = new PullQueryResult(entity, Optional.empty()); - when(pullQueryExecutor.execute(any(), any(), any(), any())).thenReturn(result); + when(pullQueryExecutor.execute(any(), any(), any(), any(), any())).thenReturn(result); when(entity.getSchema()).thenReturn(SCHEMA); doAnswer(callRequestAgain()).when(subscriber).onNext(any()); @@ -110,7 +110,7 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty()); + verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), Optional.empty()); } @Test @@ -123,7 +123,7 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(pullQueryExecutor).execute(statement, serviceContext, Optional.of(false), Optional.empty()); + verify(pullQueryExecutor).execute(statement, ImmutableMap.of(), serviceContext, Optional.of(false), Optional.empty()); } @Test @@ -158,7 +158,7 @@ public void shouldCallOnErrorOnFailure() { // Given: givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(pullQueryExecutor.execute(any(), any(), any(), any())).thenThrow(e); + when(pullQueryExecutor.execute(any(), any(), any(), any(), any())).thenThrow(e); // When: subscription.request(1);