diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index cd8953158899..99aab9c52714 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.config.SessionConfig; @@ -58,7 +59,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.apache.kafka.common.utils.Time; public class QueryEndpoint { @@ -99,8 +99,8 @@ public QueryPublisher createQueryPublisher( final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ServiceContext serviceContext) { - final long startTimeNanos = Time.SYSTEM.nanoseconds(); + final ServiceContext serviceContext, + final MetricsCallbackHolder metricsCallbackHolder) { // Must be run on worker as all this stuff is slow VertxUtils.checkIsWorker(); @@ -108,8 +108,8 @@ public QueryPublisher createQueryPublisher( if (statement.getStatement().isPullQuery()) { return createPullQueryPublisher( - context, serviceContext, statement, pullQueryMetrics, - startTimeNanos, workerExecutor); + context, serviceContext, statement, pullQueryMetrics, workerExecutor, + metricsCallbackHolder); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); } @@ -146,9 +146,17 @@ private QueryPublisher createPullQueryPublisher( final ServiceContext serviceContext, final ConfiguredStatement statement, final Optional pullQueryMetrics, - final long startTimeNanos, - final WorkerExecutor workerExecutor + final WorkerExecutor workerExecutor, + final MetricsCallbackHolder metricsCallbackHolder ) { + // First thing, set the metrics callback so that it gets called, even if we hit an error + metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); final RoutingOptions routingOptions = new PullQueryConfigRoutingOptions( ksqlConfig, @@ -177,9 +185,6 @@ private QueryPublisher createPullQueryPublisher( result.onCompletionOrException((v, throwable) -> { decrementer.decrementAtMostOnce(); - if (throwable == null) { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - } }); final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java new file mode 100644 index 000000000000..128b88ad1cd3 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +/** + * Interface for reporting metrics to a resource. A resource may choose to break things down + * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource + * determine how to report the metrics. + */ +public interface MetricsCallback { + + /** + * Called to report metrics when the request is complete, error or success + * @param requestBytes The request bytes + * @param responseBytes The response bytes + * @param startTimeNanos The start time of the request in nanos + */ + void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos); +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java new file mode 100644 index 000000000000..0a2453f06a2d --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class give a resource the opportunity to register a set of particular callbacks based upon + * arbitrary criteria. Once the response is complete, the callback is invoked. + */ +public class MetricsCallbackHolder { + + private AtomicReference callbackRef = new AtomicReference<>(null); + + public MetricsCallbackHolder() { + } + + public void setCallback(final MetricsCallback callback) { + this.callbackRef.set(callback); + } + + void reportMetrics(final long requestBytes, final long responseBytes, final long startTimeNanos) { + final MetricsCallback callback = callbackRef.get(); + if (callback != null) { + callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos); + } + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 71273bf05721..b4722828802e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -38,7 +38,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import org.apache.kafka.common.utils.Time; @@ -55,7 +54,7 @@ static void handleOldApiRequest( final Server server, final RoutingContext routingContext, final Class requestClass, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final BiFunction> requestor) { final long startTimeNanos = Time.SYSTEM.nanoseconds(); final T requestObject; @@ -73,13 +72,13 @@ static void handleOldApiRequest( .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { handleOldApiResponse( - server, routingContext, endpointResponse, metricsCallbacks, startTimeNanos); + server, routingContext, endpointResponse, metricsCallbackHolder, startTimeNanos); }).exceptionally(t -> { if (t instanceof CompletionException) { t = t.getCause(); } handleOldApiResponse( - server, routingContext, mapException(t), metricsCallbacks, startTimeNanos); + server, routingContext, mapException(t), metricsCallbackHolder, startTimeNanos); return null; }); } @@ -87,7 +86,7 @@ static void handleOldApiRequest( static void handleOldApiResponse( final Server server, final RoutingContext routingContext, final EndpointResponse endpointResponse, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final long startTimeNanos ) { final HttpServerResponse response = routingContext.response(); @@ -108,7 +107,7 @@ static void handleOldApiResponse( return; } response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING); - streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbacks, + streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbackHolder, startTimeNanos); } else { if (endpointResponse.getEntity() == null) { @@ -122,14 +121,14 @@ static void handleOldApiResponse( } response.end(responseBody); } - reportMetrics(routingContext, metricsCallbacks, startTimeNanos); + reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); } } private static void streamEndpointResponse(final Server server, final RoutingContext routingContext, final StreamingOutput streamingOutput, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final long startTimeNanos) { final WorkerExecutor workerExecutor = server.getWorkerExecutor(); final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); @@ -158,17 +157,17 @@ private static void streamEndpointResponse(final Server server, } }, vcf); vcf.handle((v, throwable) -> { - reportMetrics(routingContext, metricsCallbacks, startTimeNanos); + reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); return null; }); } private static void reportMetrics( final RoutingContext routingContext, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final long startTimeNanos ) { - metricsCallbacks.ifPresent(mc -> mc.reportMetrics( + metricsCallbackHolder.ifPresent(mc -> mc.reportMetrics( routingContext.request().bytesRead(), routingContext.response().bytesWritten(), startTimeNanos)); @@ -186,33 +185,4 @@ public static EndpointResponse mapException(final Throwable exception) { .build(); } - /** - * Interface for reporting metrics to a resource. A resource may choose to break things down - * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource - * determine how to report the metrics. - */ - public interface MetricsCallback { - - void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos); - } - - public static class EndpointMetricsCallbacks { - - private AtomicReference callbackRef = new AtomicReference<>(null); - - public EndpointMetricsCallbacks() { - } - - public void setCallback(final MetricsCallback callback) { - this.callbackRef.set(callback); - } - - void reportMetrics(long requestBytes, long responseBytes, long startTimeNanos) { - final MetricsCallback callback = callbackRef.get(); - if (callback != null) { - callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos); - } - } - } - } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index cff4921f15de..6def4c7bafcd 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -26,6 +26,7 @@ import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,11 @@ public void handle(final RoutingContext routingContext) { return; } + final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder(); + final long startTimeNanos = Time.SYSTEM.nanoseconds(); endpoints.createQueryPublisher(queryStreamArgs.get().sql, queryStreamArgs.get().properties, - context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext)) + context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext), + metricsCallbackHolder) .thenAccept(queryPublisher -> { final QueryResponseMetadata metadata; @@ -101,6 +105,12 @@ public void handle(final RoutingContext routingContext) { // When response is complete, publisher should be closed and query unregistered routingContext.response().endHandler(v -> query.close()); } + routingContext.response().endHandler(v -> { + metricsCallbackHolder.reportMetrics( + routingContext.request().bytesRead(), + routingContext.response().bytesWritten(), + startTimeNanos); + }); queryStreamResponseWriter.writeMetadata(metadata); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 896621b8ba86..d3778311d3f1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -21,7 +21,6 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -238,8 +237,9 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); - EndpointMetricsCallbacks metricsCallbacks = new EndpointMetricsCallbacks(); - handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.of(metricsCallbacks), + final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder(); + handleOldApiRequest(server, routingContext, KsqlRequest.class, + Optional.of(metricsCallbackHolder), (request, apiSecurityContext) -> endpoints .executeQueryRequest( @@ -247,7 +247,7 @@ private void handleQueryRequest(final RoutingContext routingContext) { DefaultApiSecurityContext.create(routingContext), isInternalRequest(routingContext), getContentType(routingContext), - metricsCallbacks + metricsCallbackHolder ) ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java index 8650d969fd8b..b61c5314482a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java @@ -18,7 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.entity.HeartbeatMessage; @@ -52,7 +52,8 @@ public interface Endpoints { * @return A CompletableFuture representing the future result of the operation */ CompletableFuture createQueryPublisher(String sql, JsonObject properties, - Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext); + Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext, + MetricsCallbackHolder metricsCallbackHolder); /** * Create a subscriber which will receive a stream of inserts from the API server and process @@ -85,7 +86,7 @@ CompletableFuture executeQueryRequest( CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, KsqlMediaType mediaType, - EndpointMetricsCallbacks metricsCallbacks); + MetricsCallbackHolder metricsCallbackHolder); CompletableFuture executeInfo(ApiSecurityContext apiSecurityContext); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index a57e20e7c117..70d6c7bf7391 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -24,7 +24,7 @@ import io.confluent.ksql.api.impl.QueryEndpoint; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.engine.KsqlEngine; @@ -143,7 +143,8 @@ public CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider .provide(apiSecurityContext); return executeOnWorker(() -> { @@ -156,7 +157,8 @@ public CompletableFuture createQueryPublisher(final String sql, properties, context, workerExecutor, - ksqlSecurityContext.getServiceContext()); + ksqlSecurityContext.getServiceContext(), + metricsCallbackHolder); } finally { ksqlSecurityContext.getServiceContext().close(); } @@ -196,7 +198,7 @@ public CompletableFuture executeQueryRequest( final ApiSecurityContext apiSecurityContext, final Optional isInternalRequest, final KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { return executeOldApiEndpointOnWorker(apiSecurityContext, ksqlSecurityContext -> streamedQueryResource.streamQuery( @@ -205,7 +207,7 @@ public CompletableFuture executeQueryRequest( connectionClosedFuture, isInternalRequest, mediaType, - metricsCallbacks + metricsCallbackHolder ), workerExecutor); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index ad214c97c314..b86e061330f2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -108,15 +108,9 @@ public synchronized void subscribe(final Subscriber> sub true ); - result.onCompletionOrException((v, t) -> decrementer.decrementAtMostOnce()); - result.onCompletion(v -> { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - }); result.onCompletionOrException((v, throwable) -> { decrementer.decrementAtMostOnce(); - if (throwable == null) { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - } + pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); }); final PullQuerySubscription subscription = new PullQuerySubscription( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 9f48ad3c9746..46905072be52 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -21,7 +21,7 @@ import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; import io.confluent.ksql.analyzer.PullQueryValidator; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -195,7 +195,7 @@ public EndpointResponse streamQuery( final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, final KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { throwIfNotConfigured(); activenessRegistrar.updateLastRequestTime(); @@ -206,7 +206,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, mediaType, metricsCallbacks); + isInternalRequest, mediaType, metricsCallbackHolder); } private void throwIfNotConfigured() { @@ -236,7 +236,7 @@ private EndpointResponse handleStatement( final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, final KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { try { authorizationValidator.ifPresent(validator -> @@ -259,7 +259,7 @@ private EndpointResponse handleStatement( request.getRequestProperties(), isInternalRequest, connectionClosedFuture, - metricsCallbacks + metricsCallbackHolder ); } @@ -299,9 +299,16 @@ private EndpointResponse handlePullQuery( final Map requestProperties, final Optional isInternalRequest, final CompletableFuture connectionClosedFuture, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { - setupMetricsCallbackForPullQuery(metricsCallbacks); + // First thing, set the metrics callback so that it gets called, even if we hit an error + metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); @@ -373,16 +380,6 @@ private EndpointResponse handlePullQuery( } } - private void setupMetricsCallbackForPullQuery(final EndpointMetricsCallbacks metricsCallbacks) { - metricsCallbacks.setCallback((requestBytes, responseBytes, startTimeNanos) -> { - pullQueryMetrics.ifPresent(metrics -> { - metrics.recordRequestSize(requestBytes); - metrics.recordResponseSize(responseBytes); - metrics.recordLatency(startTimeNanos); - }); - }); - } - private EndpointResponse handlePushQuery( final ServiceContext serviceContext, final PreparedStatement statement, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index 6c6e93471043..5e2656c057c6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -18,7 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.api.utils.RowGenerator; @@ -66,7 +66,8 @@ public class TestEndpoints implements Endpoints { @Override public synchronized CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { CompletableFuture completableFuture = new CompletableFuture<>(); if (createQueryPublisherException != null) { createQueryPublisherException.fillInStackTrace(); @@ -144,7 +145,7 @@ public CompletableFuture executeTerminate(final ClusterTermina public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java index ab644bb436d5..be7108c2f8d5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java @@ -18,7 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BaseSubscriber; @@ -163,7 +163,8 @@ public CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { return null; } @@ -193,7 +194,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java index 3be6ae5e99d5..a944914b3f7d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java @@ -25,7 +25,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BufferedPublisher; @@ -121,7 +121,8 @@ public synchronized CompletableFuture createQueryPublisher(final final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { PullQueryPublisher publisher = new PullQueryPublisher(context, DEFAULT_ROWS); publishers.add(publisher); return CompletableFuture.completedFuture(publisher); @@ -153,7 +154,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java index 0e03541f92b0..a3d36e8b280b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java @@ -24,7 +24,7 @@ import io.confluent.ksql.api.impl.BlockingQueryPublisher; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; @@ -100,7 +100,8 @@ public synchronized CompletableFuture createQueryPublisher(final final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { QueryStreamPublisher publisher = new QueryStreamPublisher(context, server.getWorkerExecutor()); publisher.setQueryHandle(new TestQueryHandle(), false); @@ -140,7 +141,7 @@ public CompletableFuture executeQueryRequest( ApiSecurityContext apiSecurityContext, Optional isInternalRequest, KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks) { + final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 81828ff7bcda..62653b4deb3a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -49,7 +49,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.StreamingOutput; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -274,7 +274,7 @@ ERROR_CODE_BAD_STATEMENT, errorMsg, PULL_QUERY_STRING, new KsqlEntityList())) new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -345,7 +345,7 @@ public void shouldRateLimit() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -366,7 +366,7 @@ public void shouldReachConcurrentLimit() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks()); + new MetricsCallbackHolder()); // Then: assertThat(response.getStatus(), is(500)); @@ -413,7 +413,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); @@ -437,7 +437,7 @@ public void shouldReturn400OnBadStatement() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); @@ -456,7 +456,7 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -472,7 +472,7 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -495,7 +495,7 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); @@ -520,7 +520,7 @@ public void shouldNotCreateExternalClientsForPullQuery() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -546,7 +546,7 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -603,7 +603,7 @@ public void shouldThrowOnDenyListedStreamProperty() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -686,7 +686,7 @@ public void shouldStreamRowsCorrectly() throws Throwable { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -831,7 +831,7 @@ public void shouldUpdateTheLastRequestTime() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -854,7 +854,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -881,7 +881,7 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -913,7 +913,7 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) );