Skip to content

Commit

Permalink
fix: Make pull query metrics apply only to pull and not also push
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Mar 10, 2021
1 parent 55f5403 commit d8f17cb
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand All @@ -39,6 +38,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.kafka.common.utils.Time;

Expand All @@ -55,7 +55,7 @@ static <T> void handleOldApiRequest(
final Server server,
final RoutingContext routingContext,
final Class<T> requestClass,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final Optional<EndpointMetricsCallbacks> metricsCallbacks,
final BiFunction<T, ApiSecurityContext, CompletableFuture<EndpointResponse>> requestor) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final T requestObject;
Expand All @@ -69,28 +69,25 @@ static <T> void handleOldApiRequest(
} else {
requestObject = null;
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize(
routingContext.request().bytesRead()));
final CompletableFuture<EndpointResponse> completableFuture = requestor
.apply(requestObject, DefaultApiSecurityContext.create(routingContext));
completableFuture.thenAccept(endpointResponse -> {
handleOldApiResponse(
server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos);
server, routingContext, endpointResponse, metricsCallbacks, startTimeNanos);
}).exceptionally(t -> {
if (t instanceof CompletionException) {
t = t.getCause();
}
handleOldApiResponse(
server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos);
server, routingContext, mapException(t), metricsCallbacks, startTimeNanos);
return null;
});
}

static void handleOldApiResponse(
final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final Optional<EndpointMetricsCallbacks> metricsCallbacks,
final long startTimeNanos
) {
final HttpServerResponse response = routingContext.response();
Expand All @@ -111,7 +108,8 @@ static void handleOldApiResponse(
return;
}
response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING);
streamEndpointResponse(server, routingContext, streamingOutput);
streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbacks,
startTimeNanos);
} else {
if (endpointResponse.getEntity() == null) {
response.end();
Expand All @@ -124,18 +122,15 @@ static void handleOldApiResponse(
}
response.end(responseBody);
}
reportMetrics(routingContext, metricsCallbacks, startTimeNanos);
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize(
routingContext.response().bytesWritten()));
pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics
.recordLatency(startTimeNanos));

}

private static void streamEndpointResponse(final Server server,
final RoutingContext routingContext,
final StreamingOutput streamingOutput) {
final StreamingOutput streamingOutput,
final Optional<EndpointMetricsCallbacks> metricsCallbacks,
final long startTimeNanos) {
final WorkerExecutor workerExecutor = server.getWorkerExecutor();
final VertxCompletableFuture<Void> vcf = new VertxCompletableFuture<>();
workerExecutor.executeBlocking(promise -> {
Expand All @@ -162,6 +157,21 @@ private static void streamEndpointResponse(final Server server,
}
}
}, vcf);
vcf.handle((v, throwable) -> {
reportMetrics(routingContext, metricsCallbacks, startTimeNanos);
return null;
});
}

private static void reportMetrics(
final RoutingContext routingContext,
final Optional<EndpointMetricsCallbacks> metricsCallbacks,
final long startTimeNanos
) {
metricsCallbacks.ifPresent(mc -> mc.reportMetrics(
routingContext.request().bytesRead(),
routingContext.response().bytesWritten(),
startTimeNanos));
}

public static EndpointResponse mapException(final Throwable exception) {
Expand All @@ -176,4 +186,33 @@ public static EndpointResponse mapException(final Throwable exception) {
.build();
}

/**
* Interface for reporting metrics to a resource. A resource may choose to break things down
* arbitrarily, e.g. /query is used for both push and pull queries so we let the resource
* determine how to report the metrics.
*/
public interface MetricsCallback {

void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos);
}

public static class EndpointMetricsCallbacks {

private AtomicReference<MetricsCallback> callbackRef = new AtomicReference<>(null);

public EndpointMetricsCallbacks() {
}

public void setCallback(final MetricsCallback callback) {
this.callbackRef.set(callback);
}

void reportMetrics(long requestBytes, long responseBytes, long startTimeNanos) {
final MetricsCallback callback = callbackRef.get();
if (callback != null) {
callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
Expand Down Expand Up @@ -237,14 +238,16 @@ private void handleQueryRequest(final RoutingContext routingContext) {

final CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null));
handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics,
EndpointMetricsCallbacks metricsCallbacks = new EndpointMetricsCallbacks();
handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.of(metricsCallbacks),
(request, apiSecurityContext) ->
endpoints
.executeQueryRequest(
request, server.getWorkerExecutor(), connectionClosedFuture,
DefaultApiSecurityContext.create(routingContext),
isInternalRequest(routingContext),
getContentType(routingContext)
getContentType(routingContext),
metricsCallbacks
)

);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
Expand Down Expand Up @@ -83,7 +84,8 @@ CompletableFuture<EndpointResponse> executeQueryRequest(
KsqlRequest request, WorkerExecutor workerExecutor,
CompletableFuture<Void> connectionClosedFuture, ApiSecurityContext apiSecurityContext,
Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType);
KsqlMediaType mediaType,
EndpointMetricsCallbacks metricsCallbacks);

CompletableFuture<EndpointResponse> executeInfo(ApiSecurityContext apiSecurityContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.api.impl.QueryEndpoint;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -194,15 +195,17 @@ public CompletableFuture<EndpointResponse> executeQueryRequest(
final CompletableFuture<Void> connectionClosedFuture,
final ApiSecurityContext apiSecurityContext,
final Optional<Boolean> isInternalRequest,
final KsqlMediaType mediaType
final KsqlMediaType mediaType,
final EndpointMetricsCallbacks metricsCallbacks
) {
return executeOldApiEndpointOnWorker(apiSecurityContext,
ksqlSecurityContext -> streamedQueryResource.streamQuery(
ksqlSecurityContext,
request,
connectionClosedFuture,
isInternalRequest,
mediaType
mediaType,
metricsCallbacks
), workerExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.PullQueryExecutionUtil;
Expand Down Expand Up @@ -193,7 +194,8 @@ public EndpointResponse streamQuery(
final KsqlRequest request,
final CompletableFuture<Void> connectionClosedFuture,
final Optional<Boolean> isInternalRequest,
final KsqlMediaType mediaType
final KsqlMediaType mediaType,
final EndpointMetricsCallbacks metricsCallbacks
) {
throwIfNotConfigured();
activenessRegistrar.updateLastRequestTime();
Expand All @@ -204,7 +206,7 @@ public EndpointResponse streamQuery(
commandQueue, request, commandQueueCatchupTimeout);

return handleStatement(securityContext, request, statement, connectionClosedFuture,
isInternalRequest, mediaType);
isInternalRequest, mediaType, metricsCallbacks);
}

private void throwIfNotConfigured() {
Expand Down Expand Up @@ -233,7 +235,8 @@ private EndpointResponse handleStatement(
final PreparedStatement<?> statement,
final CompletableFuture<Void> connectionClosedFuture,
final Optional<Boolean> isInternalRequest,
final KsqlMediaType mediaType
final KsqlMediaType mediaType,
final EndpointMetricsCallbacks metricsCallbacks
) {
try {
authorizationValidator.ifPresent(validator ->
Expand All @@ -255,8 +258,8 @@ private EndpointResponse handleStatement(
configProperties,
request.getRequestProperties(),
isInternalRequest,
pullQueryMetrics,
connectionClosedFuture
connectionClosedFuture,
metricsCallbacks
);
}

Expand Down Expand Up @@ -295,9 +298,11 @@ private EndpointResponse handlePullQuery(
final Map<String, Object> configOverrides,
final Map<String, Object> requestProperties,
final Optional<Boolean> isInternalRequest,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final CompletableFuture<Void> connectionClosedFuture
final CompletableFuture<Void> connectionClosedFuture,
final EndpointMetricsCallbacks metricsCallbacks
) {
setupMetricsCallbackForPullQuery(metricsCallbacks);

final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, SessionConfig.of(ksqlConfig, configOverrides));

Expand Down Expand Up @@ -368,6 +373,16 @@ private EndpointResponse handlePullQuery(
}
}

private void setupMetricsCallbackForPullQuery(final EndpointMetricsCallbacks metricsCallbacks) {
metricsCallbacks.setCallback((requestBytes, responseBytes, startTimeNanos) -> {
pullQueryMetrics.ifPresent(metrics -> {
metrics.recordRequestSize(requestBytes);
metrics.recordResponseSize(responseBytes);
metrics.recordLatency(startTimeNanos);
});
});
}

private EndpointResponse handlePushQuery(
final ServiceContext serviceContext,
final PreparedStatement<Query> statement,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.api.utils.RowGenerator;
Expand Down Expand Up @@ -143,7 +144,7 @@ public CompletableFuture<EndpointResponse> executeTerminate(final ClusterTermina
public CompletableFuture<EndpointResponse> executeQueryRequest(KsqlRequest request,
WorkerExecutor workerExecutor, CompletableFuture<Void> connectionClosedFuture,
ApiSecurityContext apiSecurityContext, Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType) {
KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.reactive.BaseSubscriber;
Expand Down Expand Up @@ -192,7 +193,7 @@ public CompletableFuture<EndpointResponse> executeTerminate(
public CompletableFuture<EndpointResponse> executeQueryRequest(KsqlRequest request,
WorkerExecutor workerExecutor, CompletableFuture<Void> connectionClosedFuture,
ApiSecurityContext apiSecurityContext, Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType) {
KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.reactive.BufferedPublisher;
Expand Down Expand Up @@ -152,7 +153,7 @@ public CompletableFuture<EndpointResponse> executeTerminate(
public CompletableFuture<EndpointResponse> executeQueryRequest(KsqlRequest request,
WorkerExecutor workerExecutor, CompletableFuture<Void> connectionClosedFuture,
ApiSecurityContext apiSecurityContext, Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType) {
KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.api.impl.BlockingQueryPublisher;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
Expand Down Expand Up @@ -138,7 +139,8 @@ public CompletableFuture<EndpointResponse> executeQueryRequest(
CompletableFuture<Void> connectionClosedFuture,
ApiSecurityContext apiSecurityContext,
Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType) {
KsqlMediaType mediaType,
final EndpointMetricsCallbacks metricsCallbacks) {
return null;
}

Expand Down
Loading

0 comments on commit d8f17cb

Please sign in to comment.