Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add metrics for pull query request/response size in bytes #6148

Merged
merged 7 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.execution.PullQueryResult;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
Expand All @@ -52,12 +53,18 @@ public class QueryEndpoint {
private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final PullQueryExecutor pullQueryExecutor;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

public QueryEndpoint(final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig,
final PullQueryExecutor pullQueryExecutor) {
public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final PullQueryExecutor pullQueryExecutor,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.pullQueryExecutor = pullQueryExecutor;
this.pullQueryMetrics = pullQueryMetrics;
}

public QueryPublisher createQueryPublisher(
Expand All @@ -72,7 +79,8 @@ public QueryPublisher createQueryPublisher(
final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

if (statement.getStatement().isPullQuery()) {
return createPullQueryPublisher(context, serviceContext, statement, startTimeNanos);
return createPullQueryPublisher(
context, serviceContext, statement, pullQueryMetrics, startTimeNanos);
} else {
return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
}
Expand All @@ -97,10 +105,12 @@ private QueryPublisher createPullQueryPublisher(
final Context context,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final long startTimeNanos
) {
final PullQueryResult result = pullQueryExecutor.execute(
statement, ImmutableMap.of(), serviceContext, Optional.of(false), startTimeNanos);
statement, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics);
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
final TableRows tableRows = result.getTableRows();

return new PullQueryPublisher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.WorkerExecutor;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import org.apache.kafka.common.utils.Time;

public final class OldApiUtils {

Expand All @@ -49,10 +51,13 @@ private OldApiUtils() {
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String CHUNKED_ENCODING = "chunked";

static <T> void handleOldApiRequest(final Server server,
static <T> void handleOldApiRequest(
final Server server,
final RoutingContext routingContext,
final Class<T> requestClass,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final BiFunction<T, ApiSecurityContext, CompletableFuture<EndpointResponse>> requestor) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final T requestObject;
if (requestClass != null) {
final Optional<T> optRequestObject = ServerUtils
Expand All @@ -64,22 +69,30 @@ static <T> void handleOldApiRequest(final Server server,
} else {
requestObject = null;
}

pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize(
routingContext.request().bytesRead()));
final CompletableFuture<EndpointResponse> completableFuture = requestor
.apply(requestObject, DefaultApiSecurityContext.create(routingContext));
completableFuture.thenAccept(endpointResponse -> {
handleOldApiResponse(server, routingContext, endpointResponse);
handleOldApiResponse(
server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos);
}).exceptionally(t -> {
if (t instanceof CompletionException) {
t = t.getCause();
}
handleOldApiResponse(server, routingContext, mapException(t));
handleOldApiResponse(
server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos);
return null;
});
}

static void handleOldApiResponse(final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse) {
static void handleOldApiResponse(
final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final long startTimeNanos
) {
final HttpServerResponse response = routingContext.response();
response.putHeader(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE);

Expand Down Expand Up @@ -112,6 +125,12 @@ static void handleOldApiResponse(final Server server, final RoutingContext routi
response.end(responseBody);
}
}
pullQueryMetrics
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where we have access to the response sent over the wire. That's why I had to pass the pullQueryMetrics object in here

.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize(
routingContext.response().bytesWritten()));
pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics
.recordLatency(startTimeNanos));

}

private static void streamEndpointResponse(final Server server,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.KeystoreUtil;
import io.confluent.ksql.security.KsqlSecurityExtension;
Expand Down Expand Up @@ -78,21 +79,25 @@ public class Server {
private final Optional<AuthenticationPlugin> authenticationPlugin;
private final ServerState serverState;
private final List<URI> listeners = new ArrayList<>();
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private URI internalListener;
private WorkerExecutor workerExecutor;
private FileWatcher fileWatcher;

public Server(final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints,
public Server(
final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints,
final KsqlSecurityExtension securityExtension,
final Optional<AuthenticationPlugin> authenticationPlugin,
final ServerState serverState) {
final ServerState serverState,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics) {
this.vertx = Objects.requireNonNull(vertx);
this.config = Objects.requireNonNull(config);
this.endpoints = Objects.requireNonNull(endpoints);
this.securityExtension = Objects.requireNonNull(securityExtension);
this.authenticationPlugin = Objects.requireNonNull(authenticationPlugin);
this.serverState = Objects.requireNonNull(serverState);
this.maxPushQueryCount = config.getInt(KsqlRestConfig.MAX_PUSH_QUERIES);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
if (!OpenSsl.isAvailable()) {
log.warn("OpenSSL does not appear to be installed. ksqlDB will fall back to using the JDK "
+ "TLS implementation. OpenSSL is recommended for better performance.");
Expand Down Expand Up @@ -126,7 +131,7 @@ public synchronized void start() {
final ServerVerticle serverVerticle = new ServerVerticle(endpoints,
createHttpServerOptions(config, listener.getHost(), listener.getPort(),
listener.getScheme().equalsIgnoreCase("https"), isInternalListener.orElse(false)),
this, isInternalListener);
this, isInternalListener, pullQueryMetrics);
vertx.deployVerticle(serverVerticle, vcf);
final int index = i;
final CompletableFuture<String> deployFuture = vcf.thenApply(s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpHeaders;
Expand Down Expand Up @@ -65,16 +66,19 @@ public class ServerVerticle extends AbstractVerticle {
private ConnectionQueryManager connectionQueryManager;
private HttpServer httpServer;
private final Optional<Boolean> isInternalListener;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

public ServerVerticle(
final Endpoints endpoints,
final HttpServerOptions httpServerOptions,
final Server server,
final Optional<Boolean> isInternalListener) {
final Optional<Boolean> isInternalListener,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics) {
this.endpoints = Objects.requireNonNull(endpoints);
this.httpServerOptions = Objects.requireNonNull(httpServerOptions);
this.server = Objects.requireNonNull(server);
this.isInternalListener = Objects.requireNonNull(isInternalListener);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics);
}

@Override
Expand Down Expand Up @@ -206,7 +210,7 @@ private Router setupRouter() {
}

private void handleKsqlRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, KsqlRequest.class,
handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.empty(),
(ksqlRequest, apiSecurityContext) ->
endpoints
.executeKsqlRequest(ksqlRequest, server.getWorkerExecutor(),
Expand All @@ -215,7 +219,7 @@ private void handleKsqlRequest(final RoutingContext routingContext) {
}

private void handleTerminateRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, ClusterTerminateRequest.class,
handleOldApiRequest(server, routingContext, ClusterTerminateRequest.class, Optional.empty(),
(request, apiSecurityContext) ->
endpoints
.executeTerminate(request, server.getWorkerExecutor(),
Expand All @@ -227,32 +231,33 @@ private void handleQueryRequest(final RoutingContext routingContext) {

final CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null));

handleOldApiRequest(server, routingContext, KsqlRequest.class,
handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics,
(request, apiSecurityContext) ->
endpoints
.executeQueryRequest(request, server.getWorkerExecutor(), connectionClosedFuture,
.executeQueryRequest(
request, server.getWorkerExecutor(), connectionClosedFuture,
DefaultApiSecurityContext.create(routingContext),
isInternalRequest(routingContext))

);
}

private void handleInfoRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeInfo(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleClusterStatusRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeClusterStatus(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleHeartbeatRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, HeartbeatMessage.class,
handleOldApiRequest(server, routingContext, HeartbeatMessage.class, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeHeartbeat(request, DefaultApiSecurityContext.create(routingContext))
);
Expand All @@ -263,43 +268,43 @@ private void handleStatusRequest(final RoutingContext routingContext) {
final String type = request.getParam("type");
final String entity = request.getParam("entity");
final String action = request.getParam("action");
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(r, apiSecurityContext) ->
endpoints.executeStatus(type, entity, action,
DefaultApiSecurityContext.create(routingContext))
);
}

private void handleAllStatusesRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(r, apiSecurityContext) ->
endpoints.executeAllStatuses(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleLagReportRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, LagReportingMessage.class,
handleOldApiRequest(server, routingContext, LagReportingMessage.class, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeLagReport(request, DefaultApiSecurityContext.create(routingContext))
);
}

private void handleHealthcheckRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeCheckHealth(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleServerMetadataRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeServerMetadata(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleServerMetadataClusterIdRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints
.executeServerMetadataClusterId(DefaultApiSecurityContext.create(routingContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ CompletableFuture<EndpointResponse> executeKsqlRequest(KsqlRequest request,
CompletableFuture<EndpointResponse> executeTerminate(ClusterTerminateRequest request,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext);

CompletableFuture<EndpointResponse> executeQueryRequest(KsqlRequest request,
WorkerExecutor workerExecutor, CompletableFuture<Void> connectionClosedFuture,
ApiSecurityContext apiSecurityContext, Optional<Boolean> isInternalRequest);
CompletableFuture<EndpointResponse> executeQueryRequest(
KsqlRequest request, WorkerExecutor workerExecutor,
CompletableFuture<Void> connectionClosedFuture, ApiSecurityContext apiSecurityContext,
Optional<Boolean> isInternalRequest);

CompletableFuture<EndpointResponse> executeInfo(ApiSecurityContext apiSecurityContext);

Expand Down
Loading