From 919924edf9e353ff5ffb118dcfb9e594c5866354 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Wed, 11 Dec 2019 16:26:33 -0800 Subject: [PATCH 1/3] feat: add config to make error messages configurable --- .../ksql/rest/server/KsqlRestApplication.java | 31 ++++++--- .../ksql/rest/server/KsqlRestConfig.java | 13 ++++ .../rest/server/resources/KsqlResource.java | 15 +++-- .../streaming/StreamedQueryResource.java | 15 +++-- .../resources/streaming/WSQueryEndpoint.java | 12 ++++ .../ksql/rest/util/ErrorResponseUtil.java | 8 ++- .../rest/server/computation/RecoveryTest.java | 5 +- .../server/resources/KsqlResourceTest.java | 12 +++- .../streaming/StreamedQueryResourceTest.java | 7 +- .../streaming/WSQueryEndpointTest.java | 2 + .../ksql/rest/DefaultErrorsImpl.java | 31 +++++++++ .../java/io/confluent/ksql/rest/Errors.java | 66 ++++++++++--------- 12 files changed, 158 insertions(+), 59 deletions(-) create mode 100644 ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 40b894b08d29..a7f3222dda5d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -39,6 +39,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; +import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; @@ -381,6 +382,10 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { final StatementParser statementParser = new StatementParser(ksqlEngine); final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); + final Errors errorHandler = restConfig.getConfiguredInstance( + KsqlRestConfig.KSQL_SERVER_ERRORS, + Errors.class + ); container.addEndpoint( ServerEndpointConfig.Builder @@ -403,6 +408,7 @@ public T getEndpointInstance(final Class endpointClass) { Duration.ofMillis(config.getLong( KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), authorizationValidator, + errorHandler, securityExtension, serverState ); @@ -498,6 +504,11 @@ static KsqlRestApplication buildApplication( final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); + final Errors errorHandler = restConfig.getConfiguredInstance( + KsqlRestConfig.KSQL_SERVER_ERRORS, + Errors.class + ); + final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, @@ -505,7 +516,17 @@ static KsqlRestApplication buildApplication( restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)), Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), versionChecker::updateLastRequestTime, - authorizationValidator + authorizationValidator, + errorHandler + ); + + final KsqlResource ksqlResource = new KsqlResource( + ksqlEngine, + commandStore, + Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), + versionChecker::updateLastRequestTime, + authorizationValidator, + errorHandler ); final List managedTopics = new LinkedList<>(); @@ -526,14 +547,6 @@ static KsqlRestApplication buildApplication( metricsPrefix ); - final KsqlResource ksqlResource = new KsqlResource( - ksqlEngine, - commandStore, - Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), - versionChecker::updateLastRequestTime, - authorizationValidator - ); - final List preconditions = restConfig.getConfiguredInstances( KsqlRestConfig.KSQL_SERVER_PRECONDITIONS, KsqlServerPrecondition.class diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index a2e0e97ea7e2..0536cdc2c766 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server; +import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.util.KsqlException; import io.confluent.rest.RestConfig; import java.util.Map; @@ -64,6 +65,12 @@ public class KsqlRestConfig extends RestConfig { + "will not start serving requests until all preconditions are satisfied. Until that time, " + "requests will return a 503 error"; + static final String KSQL_SERVER_ERRORS = + KSQL_CONFIG_PREFIX + "server.errors"; + private static final String KSQL_SERVER_ERRORS_DOC = + "A class implementing Errors. This allows the KSQL server to return pluggable " + + "error messages."; + static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER = KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable"; @@ -136,6 +143,12 @@ public class KsqlRestConfig extends RestConfig { 15000L, Importance.LOW, KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC + ).define( + KSQL_SERVER_ERRORS, + Type.CLASS, + DefaultErrorsImpl.class, + Importance.LOW, + KSQL_SERVER_ERRORS_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index f828fbf14371..76d80c0c9f1c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -101,6 +101,7 @@ public class KsqlResource implements KsqlConfigurable { private final Optional authorizationValidator; private RequestValidator validator; private RequestHandler handler; + private Errors errorHandler; public KsqlResource( @@ -108,7 +109,8 @@ public KsqlResource( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, - final Optional authorizationValidator + final Optional authorizationValidator, + final Errors errorHandler ) { this( ksqlEngine, @@ -116,7 +118,8 @@ public KsqlResource( distributedCmdResponseTimeout, activenessRegistrar, Injectors.DEFAULT, - authorizationValidator + authorizationValidator, + errorHandler ); } @@ -126,7 +129,8 @@ public KsqlResource( final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, final BiFunction injectorFactory, - final Optional authorizationValidator + final Optional authorizationValidator, + final Errors errorHandler ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); @@ -137,6 +141,7 @@ public KsqlResource( this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory"); this.authorizationValidator = Objects .requireNonNull(authorizationValidator, "authorizationValidator"); + this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); } @Override @@ -232,10 +237,10 @@ public Response handleKsqlStatements( return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { return ErrorResponseUtil.generateResponse( - e, Errors.badRequest(e)); + e, Errors.badRequest(e), errorHandler); } catch (final Exception e) { return ErrorResponseUtil.generateResponse( - e, Errors.serverErrorForStatement(e, request.getKsql())); + e, Errors.serverErrorForStatement(e, request.getKsql()), errorHandler); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 8d13a4b1b6b6..2aedbfbb0f58 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -80,6 +80,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final ObjectMapper objectMapper; private final ActivenessRegistrar activenessRegistrar; private final Optional authorizationValidator; + private final Errors errorHandler; private KsqlConfig ksqlConfig; public StreamedQueryResource( @@ -88,7 +89,8 @@ public StreamedQueryResource( final Duration disconnectCheckInterval, final Duration commandQueueCatchupTimeout, final ActivenessRegistrar activenessRegistrar, - final Optional authorizationValidator + final Optional authorizationValidator, + final Errors errorHandler ) { this( ksqlEngine, @@ -97,7 +99,8 @@ public StreamedQueryResource( disconnectCheckInterval, commandQueueCatchupTimeout, activenessRegistrar, - authorizationValidator + authorizationValidator, + errorHandler ); } @@ -109,7 +112,8 @@ public StreamedQueryResource( final Duration disconnectCheckInterval, final Duration commandQueueCatchupTimeout, final ActivenessRegistrar activenessRegistrar, - final Optional authorizationValidator + final Optional authorizationValidator, + final Errors errorHandler ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -122,6 +126,7 @@ public StreamedQueryResource( this.activenessRegistrar = Objects.requireNonNull(activenessRegistrar, "activenessRegistrar"); this.authorizationValidator = authorizationValidator; + this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");; } @Override @@ -224,12 +229,12 @@ private Response handleStatement( "Statement type `%s' not supported for this resource", statement.getClass().getName())); } catch (final TopicAuthorizationException e) { - return Errors.accessDeniedFromKafka(e); + return errorHandler.accessDeniedFromKafkaResponse(e); } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { return ErrorResponseUtil.generateResponse( - e, Errors.badRequest(e)); + e, Errors.badRequest(e), errorHandler); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index d987477ef372..0d43755b5bb0 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -23,6 +23,7 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.StreamedRow; @@ -60,6 +61,8 @@ import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import javax.ws.rs.core.Response; + +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +96,7 @@ public class WSQueryEndpoint { private final UserServiceContextFactory serviceContextFactory; private final DefaultServiceContextFactory defaultServiceContextFactory; private final ServerState serverState; + private final Errors errorHandler; private WebSocketSubscriber subscriber; private ServiceContext serviceContext; @@ -109,6 +113,7 @@ public WSQueryEndpoint( final ActivenessRegistrar activenessRegistrar, final Duration commandQueueCatchupTimeout, final Optional authorizationValidator, + final Errors errorHandler, final KsqlSecurityExtension securityExtension, final ServerState serverState ) { @@ -124,6 +129,7 @@ public WSQueryEndpoint( activenessRegistrar, commandQueueCatchupTimeout, authorizationValidator, + errorHandler, securityExtension, RestServiceContextFactory::create, RestServiceContextFactory::create, @@ -145,6 +151,7 @@ public WSQueryEndpoint( final ActivenessRegistrar activenessRegistrar, final Duration commandQueueCatchupTimeout, final Optional authorizationValidator, + final Errors errorHandler, final KsqlSecurityExtension securityExtension, final UserServiceContextFactory serviceContextFactory, final DefaultServiceContextFactory defaultServiceContextFactory, @@ -172,6 +179,7 @@ public WSQueryEndpoint( this.defaultServiceContextFactory = Objects.requireNonNull(defaultServiceContextFactory, "defaultServiceContextFactory"); this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");; } @SuppressWarnings("unused") @@ -221,6 +229,10 @@ public void onOpen(final Session session, final EndpointConfig unused) { HANDLER_MAP .getOrDefault(type, WSQueryEndpoint::handleUnsupportedStatement) .handle(this, new RequestContext(session, request, serviceContext), statement); + } catch (final TopicAuthorizationException e) { + log.debug("Error processing request", e); + SessionUtil.closeSilently( + session, CloseCodes.CANNOT_ACCEPT, errorHandler.webSocketAuthorizationErrorMessage(e)); } catch (final Exception e) { log.debug("Error processing request", e); SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java index 907262bba87a..051f8b30e708 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java @@ -26,9 +26,13 @@ public final class ErrorResponseUtil { private ErrorResponseUtil() { } - public static Response generateResponse(final Exception e, final Response defaultResponse) { + public static Response generateResponse( + final Exception e, + final Response defaultResponse, + final Errors errorHandler + ) { if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) { - return Errors.accessDeniedFromKafka(e); + return errorHandler.accessDeniedFromKafkaResponse(e); } else { return defaultResponse; } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index cee9143c7c0f..21bccfbe5739 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -38,6 +38,8 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; +import io.confluent.ksql.rest.DefaultErrorsImpl; +import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; @@ -220,7 +222,8 @@ private class KsqlServer { fakeCommandQueue, Duration.ofMillis(0), ()->{}, - Optional.of((sc, metastore, statement) -> { }) + Optional.of((sc, metastore, statement) -> { }), + new DefaultErrorsImpl() ); this.statementExecutor.configure(ksqlConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index b9c512376480..e8459ae6fa9b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -87,6 +87,7 @@ import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.entity.CommandId; @@ -267,6 +268,8 @@ public class KsqlResourceTest { private KsqlAuthorizationValidator authorizationValidator; @Mock private Producer transactionalProducer; + @Mock + private Errors errorsHandler; private KsqlResource ksqlResource; private SchemaRegistryClient schemaRegistryClient; @@ -361,7 +364,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - Optional.of(authorizationValidator) + Optional.of(authorizationValidator), + errorsHandler ); // Then: @@ -389,7 +393,8 @@ public void shouldThrowOnHandleTerminateIfNotConfigured() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - Optional.of(authorizationValidator) + Optional.of(authorizationValidator), + errorsHandler ); // Then: @@ -2078,7 +2083,8 @@ private void setUpKsqlResource() { schemaInjectorFactory.apply(sc), topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), - Optional.of(authorizationValidator) + Optional.of(authorizationValidator), + new DefaultErrorsImpl() ); ksqlResource.configure(ksqlConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 43655bb03ccd..21a5d911e217 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -46,6 +46,7 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; @@ -160,7 +161,8 @@ public void setup() { DISCONNECT_CHECK_INTERVAL, COMMAND_QUEUE_CATCHUP_TIMOEUT, activenessRegistrar, - Optional.of(authorizationValidator) + Optional.of(authorizationValidator), + new DefaultErrorsImpl() ); testResource.configure(VALID_CONFIG); @@ -185,7 +187,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { DISCONNECT_CHECK_INTERVAL, COMMAND_QUEUE_CATCHUP_TIMOEUT, activenessRegistrar, - Optional.of(authorizationValidator) + Optional.of(authorizationValidator), + new DefaultErrorsImpl() ); // Then: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 2d3bd33861b5..2ca64b081ba1 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.parser.tree.ResultMaterialization; import io.confluent.ksql.parser.tree.Select; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.Versions; @@ -192,6 +193,7 @@ public void setUp() { activenessRegistrar, COMMAND_QUEUE_CATCHUP_TIMEOUT, Optional.of(authorizationValidator), + new DefaultErrorsImpl(), securityExtension, serviceContextFactory, defaultServiceContextProvider, diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java new file mode 100644 index 000000000000..6f08cf71875e --- /dev/null +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java @@ -0,0 +1,31 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest; + +import javax.ws.rs.core.Response; + +public class DefaultErrorsImpl implements Errors { + + @Override + public Response accessDeniedFromKafkaResponse(final Throwable t) { + return Errors.accessDeniedFromKafka(t); + } + + @Override + public String webSocketAuthorizationErrorMessage(final Throwable t) { + return t.getMessage(); + } +} diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index b17890e06974..44b2e1fc2aca 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -28,45 +28,42 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; -public final class Errors { - private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; +public interface Errors { + int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; - public static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); - public static final int ERROR_CODE_BAD_STATEMENT = toErrorCode(BAD_REQUEST.getStatusCode()) + 1; - public static final int ERROR_CODE_QUERY_ENDPOINT = toErrorCode(BAD_REQUEST.getStatusCode()) + 2; + int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); + int ERROR_CODE_BAD_STATEMENT = toErrorCode(BAD_REQUEST.getStatusCode()) + 1; + int ERROR_CODE_QUERY_ENDPOINT = toErrorCode(BAD_REQUEST.getStatusCode()) + 2; - public static final int ERROR_CODE_UNAUTHORIZED = toErrorCode(UNAUTHORIZED.getStatusCode()); + int ERROR_CODE_UNAUTHORIZED = toErrorCode(UNAUTHORIZED.getStatusCode()); - public static final int ERROR_CODE_FORBIDDEN = toErrorCode(FORBIDDEN.getStatusCode()); - public static final int ERROR_CODE_FORBIDDEN_KAFKA_ACCESS = + int ERROR_CODE_FORBIDDEN = toErrorCode(FORBIDDEN.getStatusCode()); + int ERROR_CODE_FORBIDDEN_KAFKA_ACCESS = toErrorCode(FORBIDDEN.getStatusCode()) + 1; - public static final int ERROR_CODE_NOT_FOUND = toErrorCode(NOT_FOUND.getStatusCode()); + int ERROR_CODE_NOT_FOUND = toErrorCode(NOT_FOUND.getStatusCode()); - public static final int ERROR_CODE_SERVER_SHUTTING_DOWN = + int ERROR_CODE_SERVER_SHUTTING_DOWN = toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()); - public static final int ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT = + int ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT = toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()) + 1; - public static final int ERROR_CODE_SERVER_NOT_READY = + int ERROR_CODE_SERVER_NOT_READY = toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()) + 2; - private Errors() { - } - - public static final int ERROR_CODE_SERVER_ERROR = + int ERROR_CODE_SERVER_ERROR = toErrorCode(INTERNAL_SERVER_ERROR.getStatusCode()); - public static int toStatusCode(final int errorCode) { + static int toStatusCode(final int errorCode) { return errorCode / HTTP_TO_ERROR_CODE_MULTIPLIER; } - public static int toErrorCode(final int statusCode) { + static int toErrorCode(final int statusCode) { return statusCode * HTTP_TO_ERROR_CODE_MULTIPLIER; } - public static Response notReady() { + static Response notReady() { return Response .status(SERVICE_UNAVAILABLE) .header(HttpHeaders.RETRY_AFTER, 10) @@ -74,35 +71,35 @@ public static Response notReady() { .build(); } - public static Response accessDenied(final String msg) { + static Response accessDenied(final String msg) { return Response .status(FORBIDDEN) .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN, msg)) .build(); } - public static Response accessDeniedFromKafka(final Throwable t) { + static Response accessDeniedFromKafka(final Throwable t) { return Response .status(FORBIDDEN) .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, t)) .build(); } - public static Response badRequest(final String msg) { + static Response badRequest(final String msg) { return Response .status(BAD_REQUEST) .entity(new KsqlErrorMessage(ERROR_CODE_BAD_REQUEST, msg)) .build(); } - public static Response badRequest(final Throwable t) { + static Response badRequest(final Throwable t) { return Response .status(BAD_REQUEST) .entity(new KsqlErrorMessage(ERROR_CODE_BAD_REQUEST, t)) .build(); } - public static Response badStatement(final String msg, final String statementText) { + static Response badStatement(final String msg, final String statementText) { return badStatement(msg, statementText, new KsqlEntityList()); } @@ -117,7 +114,7 @@ static Response badStatement( .build(); } - public static Response badStatement(final Throwable t, final String statementText) { + static Response badStatement(final Throwable t, final String statementText) { return badStatement(t, statementText, new KsqlEntityList()); } @@ -132,7 +129,7 @@ static Response badStatement( .build(); } - public static Response queryEndpoint(final String statementText) { + static Response queryEndpoint(final String statementText) { return Response .status(BAD_REQUEST) .entity(new KsqlStatementErrorMessage( @@ -146,18 +143,18 @@ statementText, new KsqlEntityList())) .build(); } - public static Response notFound(final String msg) { + static Response notFound(final String msg) { return Response .status(NOT_FOUND) .entity(new KsqlErrorMessage(ERROR_CODE_NOT_FOUND, msg)) .build(); } - public static Response serverErrorForStatement(final Throwable t, final String statementText) { + static Response serverErrorForStatement(final Throwable t, final String statementText) { return serverErrorForStatement(t, statementText, new KsqlEntityList()); } - public static Response serverErrorForStatement( + static Response serverErrorForStatement( final Throwable t, final String statementText, final KsqlEntityList entities) { return Response .status(INTERNAL_SERVER_ERROR) @@ -165,7 +162,7 @@ public static Response serverErrorForStatement( .build(); } - public static Response commandQueueCatchUpTimeout(final long cmdSeqNum) { + static Response commandQueueCatchUpTimeout(final long cmdSeqNum) { final String errorMsg = "Timed out while waiting for a previous command to execute. " + "command sequence number: " + cmdSeqNum; @@ -175,7 +172,7 @@ public static Response commandQueueCatchUpTimeout(final long cmdSeqNum) { .build(); } - public static Response serverShuttingDown() { + static Response serverShuttingDown() { return Response .status(SERVICE_UNAVAILABLE) .entity(new KsqlErrorMessage( @@ -184,10 +181,15 @@ public static Response serverShuttingDown() { .build(); } - public static Response serverNotReady(final KsqlErrorMessage error) { + static Response serverNotReady(final KsqlErrorMessage error) { return Response .status(SERVICE_UNAVAILABLE) .entity(error) .build(); } + + Response accessDeniedFromKafkaResponse(Throwable t); + + String webSocketAuthorizationErrorMessage(Throwable t); + } From 286a5f47426f47f324747ee95c7139ad505833f6 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Thu, 12 Dec 2019 13:45:58 -0800 Subject: [PATCH 2/3] refactor the config --- .../ksql/rest/server/KsqlRestApplication.java | 13 +-- .../ksql/rest/server/KsqlRestConfig.java | 4 +- .../streaming/StreamedQueryResource.java | 2 +- .../resources/streaming/WSQueryEndpoint.java | 4 +- .../ksql/rest/util/ErrorResponseUtil.java | 2 +- .../rest/server/computation/RecoveryTest.java | 3 +- .../server/resources/KsqlResourceTest.java | 15 ++-- .../streaming/StreamedQueryResourceTest.java | 38 +++++---- .../streaming/WSQueryEndpointTest.java | 33 +++++++- ...orsImpl.java => DefaultErrorMessages.java} | 13 +-- .../io/confluent/ksql/rest/ErrorMessages.java | 21 +++++ .../java/io/confluent/ksql/rest/Errors.java | 80 +++++++++++-------- 12 files changed, 143 insertions(+), 85 deletions(-) rename ksql-rest-model/src/main/java/io/confluent/ksql/rest/{DefaultErrorsImpl.java => DefaultErrorMessages.java} (67%) create mode 100644 ksql-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index a7f3222dda5d..63ff38d8f50c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -39,6 +39,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; +import io.confluent.ksql.rest.ErrorMessages; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; @@ -382,10 +383,10 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { final StatementParser statementParser = new StatementParser(ksqlEngine); final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); - final Errors errorHandler = restConfig.getConfiguredInstance( + final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( KsqlRestConfig.KSQL_SERVER_ERRORS, - Errors.class - ); + ErrorMessages.class + )); container.addEndpoint( ServerEndpointConfig.Builder @@ -504,10 +505,10 @@ static KsqlRestApplication buildApplication( final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); - final Errors errorHandler = restConfig.getConfiguredInstance( + final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( KsqlRestConfig.KSQL_SERVER_ERRORS, - Errors.class - ); + ErrorMessages.class + )); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 0536cdc2c766..a2af2b12372f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -15,7 +15,7 @@ package io.confluent.ksql.rest.server; -import io.confluent.ksql.rest.DefaultErrorsImpl; +import io.confluent.ksql.rest.DefaultErrorMessages; import io.confluent.ksql.util.KsqlException; import io.confluent.rest.RestConfig; import java.util.Map; @@ -146,7 +146,7 @@ public class KsqlRestConfig extends RestConfig { ).define( KSQL_SERVER_ERRORS, Type.CLASS, - DefaultErrorsImpl.class, + DefaultErrorMessages.class, Importance.LOW, KSQL_SERVER_ERRORS_DOC ); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 2aedbfbb0f58..033c18eca203 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -229,7 +229,7 @@ private Response handleStatement( "Statement type `%s' not supported for this resource", statement.getClass().getName())); } catch (final TopicAuthorizationException e) { - return errorHandler.accessDeniedFromKafkaResponse(e); + return errorHandler.accessDeniedFromKafka(e); } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 0d43755b5bb0..ee5e296285d8 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -232,7 +232,9 @@ public void onOpen(final Session session, final EndpointConfig unused) { } catch (final TopicAuthorizationException e) { log.debug("Error processing request", e); SessionUtil.closeSilently( - session, CloseCodes.CANNOT_ACCEPT, errorHandler.webSocketAuthorizationErrorMessage(e)); + session, + CloseCodes.CANNOT_ACCEPT, + errorHandler.webSocketKafkaAuthorizationErrorMessage(e)); } catch (final Exception e) { log.debug("Error processing request", e); SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java index 051f8b30e708..cc7aa838346b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java @@ -32,7 +32,7 @@ public static Response generateResponse( final Errors errorHandler ) { if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) { - return errorHandler.accessDeniedFromKafkaResponse(e); + return errorHandler.accessDeniedFromKafka(e); } else { return defaultResponse; } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 21bccfbe5739..b411186dbbef 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -38,7 +38,6 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; -import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; @@ -223,7 +222,7 @@ private class KsqlServer { Duration.ofMillis(0), ()->{}, Optional.of((sc, metastore, statement) -> { }), - new DefaultErrorsImpl() + mock(Errors.class) ); this.statementExecutor.configure(ksqlConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index e8459ae6fa9b..f7b94a1d6c01 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -18,6 +18,7 @@ import static io.confluent.ksql.parser.ParserMatchers.configured; import static io.confluent.ksql.parser.ParserMatchers.preparedStatement; import static io.confluent.ksql.parser.ParserMatchers.preparedStatementText; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; import static io.confluent.ksql.rest.entity.CommandId.Action.CREATE; import static io.confluent.ksql.rest.entity.CommandId.Action.DROP; import static io.confluent.ksql.rest.entity.CommandId.Action.EXECUTE; @@ -31,6 +32,7 @@ import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatementErrorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; import static java.util.Collections.emptyMap; +import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; @@ -87,7 +89,6 @@ import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; -import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.entity.CommandId; @@ -238,6 +239,11 @@ public class KsqlResourceTest { .valueColumn(ColumnName.of("f1"), SqlTypes.STRING) .build(); + private static Response AUTHORIZATION_ERROR_RESPONSE = Response + .status(FORBIDDEN) + .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, "some error")) + .build(); + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -334,6 +340,8 @@ public void setUp() throws IOException, RestClientException { when(topicInjector.inject(any())) .thenAnswer(inv -> inv.getArgument(0)); + when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); + setUpKsqlResource(); } @@ -782,9 +790,6 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc // Then: assertThat(result, is(instanceOf(KsqlErrorMessage.class))); assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS)); - assertThat(result.getMessage(), is( - "Could not delete the corresponding kafka topic: topic\n" + - "Caused by: Authorization denied to Delete on topic(s): [topic]")); } @Test @@ -2084,7 +2089,7 @@ private void setUpKsqlResource() { topicInjectorFactory.apply(ec), new TopicDeleteInjector(ec, sc)), Optional.of(authorizationValidator), - new DefaultErrorsImpl() + errorsHandler ); ksqlResource.configure(ksqlConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 21a5d911e217..ed4b32dac315 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -17,8 +17,10 @@ import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode; import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; +import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -46,7 +48,6 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.rest.DefaultErrorsImpl; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; @@ -112,6 +113,11 @@ public class StreamedQueryResourceTest { StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1" )); private static final Long closeTimeout = KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT; + + private static Response AUTHORIZATION_ERROR_RESPONSE = Response + .status(FORBIDDEN) + .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, "some error")) + .build(); private static final String TOPIC_NAME = "test_stream"; private static final String PUSH_QUERY_STRING = "SELECT * FROM " + TOPIC_NAME + " EMIT CHANGES;"; @@ -137,6 +143,8 @@ public class StreamedQueryResourceTest { private Consumer queryCloseCallback; @Mock private KsqlAuthorizationValidator authorizationValidator; + @Mock + private Errors errorsHandler; private StreamedQueryResource testResource; private PreparedStatement invalid; private PreparedStatement query; @@ -153,6 +161,7 @@ public void setup() { when(pullQuery.isPullQuery()).thenReturn(true); final PreparedStatement pullQueryStatement = PreparedStatement.of(PULL_QUERY_STRING, pullQuery); when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(pullQueryStatement); + when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); testResource = new StreamedQueryResource( mockKsqlEngine, @@ -162,7 +171,7 @@ public void setup() { COMMAND_QUEUE_CATCHUP_TIMOEUT, activenessRegistrar, Optional.of(authorizationValidator), - new DefaultErrorsImpl() + errorsHandler ); testResource.configure(VALID_CONFIG); @@ -188,7 +197,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { COMMAND_QUEUE_CATCHUP_TIMOEUT, activenessRegistrar, Optional.of(authorizationValidator), - new DefaultErrorsImpl() + errorsHandler ); // Then: @@ -553,12 +562,9 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) ); - final Response expected = Errors.accessDeniedFromKafka( - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))); - final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); - final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) expected.getEntity(); - assertEquals(response.getStatus(), expected.getStatus()); + final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity(); + assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); assertEquals(responseEntity.getMessage(), expectedEntity.getMessage()); } @@ -578,14 +584,9 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) ); - final Response expected = Errors.accessDeniedFromKafka( - new KsqlException( - "", - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME)))); - final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); - final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) expected.getEntity(); - assertEquals(response.getStatus(), expected.getStatus()); + final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity(); + assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); assertEquals(responseEntity.getMessage(), expectedEntity.getMessage()); } @@ -606,11 +607,8 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), null) ); - final Response expected = Errors.accessDeniedFromKafka( - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))); - - assertEquals(response.getStatus(), expected.getStatus()); - assertEquals(response.getEntity(), expected.getEntity()); + assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); + assertEquals(response.getEntity(), AUTHORIZATION_ERROR_RESPONSE.getEntity()); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 2ca64b081ba1..262d5454d090 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -36,14 +36,16 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.json.JsonMapper; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Relation; import io.confluent.ksql.parser.tree.ResultMaterialization; import io.confluent.ksql.parser.tree.Select; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.rest.DefaultErrorsImpl; +import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.Versions; @@ -81,6 +83,8 @@ import javax.websocket.Session; import javax.ws.rs.core.Response; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -148,12 +152,16 @@ public class WSQueryEndpointTest { @Mock private ServiceContext serviceContext; @Mock + private MetaStore metaStore; + @Mock private UserServiceContextFactory serviceContextFactory; @Mock private ServerState serverState; @Mock private KsqlUserContextProvider userContextProvider; @Mock + private Errors errorsHandler; + @Mock private DefaultServiceContextFactory defaultServiceContextProvider; @Captor private ArgumentCaptor closeReasonCaptor; @@ -178,6 +186,7 @@ public void setUp() { when(defaultServiceContextProvider.create(any(), any())).thenReturn(serviceContext); when(serviceContext.getTopicClient()).thenReturn(topicClient); when(serverState.checkReady()).thenReturn(Optional.empty()); + when(ksqlEngine.getMetaStore()).thenReturn(metaStore); givenRequest(VALID_REQUEST); wsQueryEndpoint = new WSQueryEndpoint( @@ -193,7 +202,7 @@ public void setUp() { activenessRegistrar, COMMAND_QUEUE_CATCHUP_TIMEOUT, Optional.of(authorizationValidator), - new DefaultErrorsImpl(), + errorsHandler, securityExtension, serviceContextFactory, defaultServiceContextProvider, @@ -385,6 +394,26 @@ public void shouldHandlePushQuery() { any()); } + @Test + public void shouldReturnErrorMessageWhenTopicAuthorizationException() throws Exception { + // Given: + final String errorMessage = "authorization error"; + givenRequestIs(query); + when(errorsHandler.webSocketKafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class))) + .thenReturn(errorMessage); + doThrow(new KsqlTopicAuthorizationException(AclOperation.CREATE, Collections.singleton("topic"))) + .when(authorizationValidator).checkAuthorization(serviceContext, metaStore, query); + + // When: + wsQueryEndpoint.onOpen(session, null); + + // Then: + verifyClosedContainingReason( + errorMessage, + CloseCodes.CANNOT_ACCEPT + ); + } + @Test public void shouldHandlePullQuery() { // Given: diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java similarity index 67% rename from ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java rename to ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java index 6f08cf71875e..d6ff2a273256 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorsImpl.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/DefaultErrorMessages.java @@ -15,17 +15,10 @@ package io.confluent.ksql.rest; -import javax.ws.rs.core.Response; - -public class DefaultErrorsImpl implements Errors { - - @Override - public Response accessDeniedFromKafkaResponse(final Throwable t) { - return Errors.accessDeniedFromKafka(t); - } +public class DefaultErrorMessages implements ErrorMessages { @Override - public String webSocketAuthorizationErrorMessage(final Throwable t) { - return t.getMessage(); + public String kafkaAuthorizationErrorMessage(final Exception e) { + return e.getMessage(); } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java new file mode 100644 index 000000000000..064399ae4f9b --- /dev/null +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/ErrorMessages.java @@ -0,0 +1,21 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest; + +public interface ErrorMessages { + + String kafkaAuthorizationErrorMessage(Exception e); +} diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index 44b2e1fc2aca..e1e14356fbc2 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -28,42 +28,44 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; -public interface Errors { - int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; +public class Errors { + private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; - int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); - int ERROR_CODE_BAD_STATEMENT = toErrorCode(BAD_REQUEST.getStatusCode()) + 1; - int ERROR_CODE_QUERY_ENDPOINT = toErrorCode(BAD_REQUEST.getStatusCode()) + 2; + public static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); + public static final int ERROR_CODE_BAD_STATEMENT = toErrorCode(BAD_REQUEST.getStatusCode()) + 1; + private static final int ERROR_CODE_QUERY_ENDPOINT = toErrorCode(BAD_REQUEST.getStatusCode()) + 2; - int ERROR_CODE_UNAUTHORIZED = toErrorCode(UNAUTHORIZED.getStatusCode()); + public static final int ERROR_CODE_UNAUTHORIZED = toErrorCode(UNAUTHORIZED.getStatusCode()); - int ERROR_CODE_FORBIDDEN = toErrorCode(FORBIDDEN.getStatusCode()); - int ERROR_CODE_FORBIDDEN_KAFKA_ACCESS = + public static final int ERROR_CODE_FORBIDDEN = toErrorCode(FORBIDDEN.getStatusCode()); + public static final int ERROR_CODE_FORBIDDEN_KAFKA_ACCESS = toErrorCode(FORBIDDEN.getStatusCode()) + 1; - int ERROR_CODE_NOT_FOUND = toErrorCode(NOT_FOUND.getStatusCode()); + public static final int ERROR_CODE_NOT_FOUND = toErrorCode(NOT_FOUND.getStatusCode()); - int ERROR_CODE_SERVER_SHUTTING_DOWN = + public static final int ERROR_CODE_SERVER_SHUTTING_DOWN = toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()); - int ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT = + public static final int ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT = toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()) + 1; - int ERROR_CODE_SERVER_NOT_READY = + public static final int ERROR_CODE_SERVER_NOT_READY = toErrorCode(SERVICE_UNAVAILABLE.getStatusCode()) + 2; - int ERROR_CODE_SERVER_ERROR = + public static final int ERROR_CODE_SERVER_ERROR = toErrorCode(INTERNAL_SERVER_ERROR.getStatusCode()); - static int toStatusCode(final int errorCode) { + private final ErrorMessages errorMessages; + + public static int toStatusCode(final int errorCode) { return errorCode / HTTP_TO_ERROR_CODE_MULTIPLIER; } - static int toErrorCode(final int statusCode) { + public static int toErrorCode(final int statusCode) { return statusCode * HTTP_TO_ERROR_CODE_MULTIPLIER; } - static Response notReady() { + public static Response notReady() { return Response .status(SERVICE_UNAVAILABLE) .header(HttpHeaders.RETRY_AFTER, 10) @@ -71,39 +73,39 @@ static Response notReady() { .build(); } - static Response accessDenied(final String msg) { + public static Response accessDenied(final String msg) { return Response .status(FORBIDDEN) .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN, msg)) .build(); } - static Response accessDeniedFromKafka(final Throwable t) { + private Response constructAccessDeniedFromKafkaResponse(final String errorMessage) { return Response .status(FORBIDDEN) - .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, t)) + .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, errorMessage)) .build(); } - static Response badRequest(final String msg) { + public static Response badRequest(final String msg) { return Response .status(BAD_REQUEST) .entity(new KsqlErrorMessage(ERROR_CODE_BAD_REQUEST, msg)) .build(); } - static Response badRequest(final Throwable t) { + public static Response badRequest(final Throwable t) { return Response .status(BAD_REQUEST) .entity(new KsqlErrorMessage(ERROR_CODE_BAD_REQUEST, t)) .build(); } - static Response badStatement(final String msg, final String statementText) { + public static Response badStatement(final String msg, final String statementText) { return badStatement(msg, statementText, new KsqlEntityList()); } - static Response badStatement( + public static Response badStatement( final String msg, final String statementText, final KsqlEntityList entities) { @@ -114,11 +116,11 @@ static Response badStatement( .build(); } - static Response badStatement(final Throwable t, final String statementText) { + public static Response badStatement(final Throwable t, final String statementText) { return badStatement(t, statementText, new KsqlEntityList()); } - static Response badStatement( + public static Response badStatement( final Throwable t, final String statementText, final KsqlEntityList entities) { @@ -129,7 +131,7 @@ static Response badStatement( .build(); } - static Response queryEndpoint(final String statementText) { + public static Response queryEndpoint(final String statementText) { return Response .status(BAD_REQUEST) .entity(new KsqlStatementErrorMessage( @@ -143,18 +145,18 @@ statementText, new KsqlEntityList())) .build(); } - static Response notFound(final String msg) { + public static Response notFound(final String msg) { return Response .status(NOT_FOUND) .entity(new KsqlErrorMessage(ERROR_CODE_NOT_FOUND, msg)) .build(); } - static Response serverErrorForStatement(final Throwable t, final String statementText) { + public static Response serverErrorForStatement(final Throwable t, final String statementText) { return serverErrorForStatement(t, statementText, new KsqlEntityList()); } - static Response serverErrorForStatement( + public static Response serverErrorForStatement( final Throwable t, final String statementText, final KsqlEntityList entities) { return Response .status(INTERNAL_SERVER_ERROR) @@ -162,7 +164,7 @@ static Response serverErrorForStatement( .build(); } - static Response commandQueueCatchUpTimeout(final long cmdSeqNum) { + public static Response commandQueueCatchUpTimeout(final long cmdSeqNum) { final String errorMsg = "Timed out while waiting for a previous command to execute. " + "command sequence number: " + cmdSeqNum; @@ -172,7 +174,7 @@ static Response commandQueueCatchUpTimeout(final long cmdSeqNum) { .build(); } - static Response serverShuttingDown() { + public static Response serverShuttingDown() { return Response .status(SERVICE_UNAVAILABLE) .entity(new KsqlErrorMessage( @@ -181,15 +183,23 @@ static Response serverShuttingDown() { .build(); } - static Response serverNotReady(final KsqlErrorMessage error) { + public static Response serverNotReady(final KsqlErrorMessage error) { return Response .status(SERVICE_UNAVAILABLE) .entity(error) .build(); } - - Response accessDeniedFromKafkaResponse(Throwable t); - String webSocketAuthorizationErrorMessage(Throwable t); + + public Errors(final ErrorMessages errorMessages) { + this.errorMessages = errorMessages; + } + public Response accessDeniedFromKafka(final Exception e) { + return constructAccessDeniedFromKafkaResponse(errorMessages.kafkaAuthorizationErrorMessage(e)); + } + + public String webSocketKafkaAuthorizationErrorMessage(final Exception e) { + return errorMessages.kafkaAuthorizationErrorMessage(e); + } } From 2ba7a26712daa1e0d0ada98e2517d9d6af92be8e Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Mon, 16 Dec 2019 12:51:46 -0800 Subject: [PATCH 3/3] changes --- .../ksql/rest/server/KsqlRestApplication.java | 4 +- .../ksql/rest/server/KsqlRestConfig.java | 11 +-- .../rest/server/resources/KsqlResource.java | 8 +- .../streaming/StreamedQueryResource.java | 6 +- .../resources/streaming/WSQueryEndpoint.java | 3 +- .../ksql/rest/util/ErrorResponseUtil.java | 40 --------- .../server/resources/KsqlResourceTest.java | 40 +++------ .../streaming/StreamedQueryResourceTest.java | 25 +----- .../streaming/WSQueryEndpointTest.java | 2 +- .../java/io/confluent/ksql/rest/Errors.java | 23 ++++- .../io/confluent/ksql/rest/ErrorsTest.java | 89 +++++++++++++++++++ 11 files changed, 138 insertions(+), 113 deletions(-) delete mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java create mode 100644 ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 63ff38d8f50c..24bff5b12850 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -384,7 +384,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( - KsqlRestConfig.KSQL_SERVER_ERRORS, + KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES, ErrorMessages.class )); @@ -506,7 +506,7 @@ static KsqlRestApplication buildApplication( KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( - KsqlRestConfig.KSQL_SERVER_ERRORS, + KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES, ErrorMessages.class )); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index a2af2b12372f..e1971e44e749 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server; import io.confluent.ksql.rest.DefaultErrorMessages; +import io.confluent.ksql.rest.ErrorMessages; import io.confluent.ksql.util.KsqlException; import io.confluent.rest.RestConfig; import java.util.Map; @@ -65,11 +66,11 @@ public class KsqlRestConfig extends RestConfig { + "will not start serving requests until all preconditions are satisfied. Until that time, " + "requests will return a 503 error"; - static final String KSQL_SERVER_ERRORS = - KSQL_CONFIG_PREFIX + "server.errors"; + static final String KSQL_SERVER_ERROR_MESSAGES = + KSQL_CONFIG_PREFIX + "server.error.messages"; private static final String KSQL_SERVER_ERRORS_DOC = - "A class implementing Errors. This allows the KSQL server to return pluggable " - + "error messages."; + "A class the implementing " + ErrorMessages.class.getSimpleName() + " interface." + + "This allows the KSQL server to return pluggable error messages."; static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER = KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable"; @@ -144,7 +145,7 @@ public class KsqlRestConfig extends RestConfig { Importance.LOW, KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC ).define( - KSQL_SERVER_ERRORS, + KSQL_SERVER_ERROR_MESSAGES, Type.CLASS, DefaultErrorMessages.class, Importance.LOW, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 76d80c0c9f1c..931b794ec90a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -42,7 +42,6 @@ import io.confluent.ksql.rest.server.validation.CustomValidators; import io.confluent.ksql.rest.server.validation.RequestValidator; import io.confluent.ksql.rest.util.CommandStoreUtil; -import io.confluent.ksql.rest.util.ErrorResponseUtil; import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.SandboxedServiceContext; @@ -236,11 +235,10 @@ public Response handleKsqlStatements( } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { - return ErrorResponseUtil.generateResponse( - e, Errors.badRequest(e), errorHandler); + return errorHandler.generateResponse(e, Errors.badRequest(e)); } catch (final Exception e) { - return ErrorResponseUtil.generateResponse( - e, Errors.serverErrorForStatement(e, request.getKsql()), errorHandler); + return errorHandler.generateResponse( + e, Errors.serverErrorForStatement(e, request.getKsql())); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 033c18eca203..57f1df807025 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -35,7 +35,6 @@ import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandStoreUtil; -import io.confluent.ksql.rest.util.ErrorResponseUtil; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -229,12 +228,11 @@ private Response handleStatement( "Statement type `%s' not supported for this resource", statement.getClass().getName())); } catch (final TopicAuthorizationException e) { - return errorHandler.accessDeniedFromKafka(e); + return errorHandler.accessDeniedFromKafkaResponse(e); } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { - return ErrorResponseUtil.generateResponse( - e, Errors.badRequest(e), errorHandler); + return errorHandler.generateResponse(e, Errors.badRequest(e)); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index ee5e296285d8..0d9fd46614fc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -61,7 +61,6 @@ import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import javax.ws.rs.core.Response; - import org.apache.kafka.common.errors.TopicAuthorizationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,7 +233,7 @@ public void onOpen(final Session session, final EndpointConfig unused) { SessionUtil.closeSilently( session, CloseCodes.CANNOT_ACCEPT, - errorHandler.webSocketKafkaAuthorizationErrorMessage(e)); + errorHandler.kafkaAuthorizationErrorMessage(e)); } catch (final Exception e) { log.debug("Error processing request", e); SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java deleted file mode 100644 index cc7aa838346b..000000000000 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.util; - -import io.confluent.ksql.rest.Errors; -import javax.ws.rs.core.Response; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.kafka.common.errors.TopicAuthorizationException; - - -public final class ErrorResponseUtil { - - private ErrorResponseUtil() { - } - - public static Response generateResponse( - final Exception e, - final Response defaultResponse, - final Errors errorHandler - ) { - if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) { - return errorHandler.accessDeniedFromKafka(e); - } else { - return defaultResponse; - } - } -} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index f7b94a1d6c01..658c1eb02930 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -179,6 +179,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; @@ -239,11 +240,6 @@ public class KsqlResourceTest { .valueColumn(ColumnName.of("f1"), SqlTypes.STRING) .build(); - private static Response AUTHORIZATION_ERROR_RESPONSE = Response - .status(FORBIDDEN) - .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, "some error")) - .build(); - @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -340,7 +336,13 @@ public void setUp() throws IOException, RestClientException { when(topicInjector.inject(any())) .thenAnswer(inv -> inv.getArgument(0)); - when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); + when(errorsHandler.generateResponse(any(), any())).thenAnswer(new Answer() { + @Override + public Response answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Response) args[1]; + } + }); setUpKsqlResource(); } @@ -758,6 +760,11 @@ public void shouldFailIfCreateStatementMissingKafkaTopicName() { @Test public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: + final String errorMsg = "some error"; + when(errorsHandler.generateResponse(any(), any())).thenReturn(Response + .status(FORBIDDEN) + .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, errorMsg)) + .build()); doThrow(new KsqlTopicAuthorizationException( AclOperation.DELETE, Collections.singleton("topic"))).when(authorizationValidator).checkAuthorization(any(), any(), any()); @@ -770,26 +777,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() // Then: assertThat(result, is(instanceOf(KsqlErrorMessage.class))); assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS)); - } - - @Test - public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { - // Given: - doThrow(new KsqlException("Could not delete the corresponding kafka topic: topic", - new KsqlTopicAuthorizationException( - AclOperation.DELETE, - Collections.singleton("topic")))) - .when(authorizationValidator).checkAuthorization(any(), any(), any()); - - - // When: - final KsqlErrorMessage result = makeFailingRequest( - "DROP STREAM TEST_STREAM DELETE TOPIC;", - Code.FORBIDDEN); - - // Then: - assertThat(result, is(instanceOf(KsqlErrorMessage.class))); - assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS)); + assertThat(result.getMessage(), is(errorMsg)); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index ed4b32dac315..8f4ee101ae2b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -62,7 +62,6 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; @@ -161,7 +160,7 @@ public void setup() { when(pullQuery.isPullQuery()).thenReturn(true); final PreparedStatement pullQueryStatement = PreparedStatement.of(PULL_QUERY_STRING, pullQuery); when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(pullQueryStatement); - when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); + when(errorsHandler.accessDeniedFromKafkaResponse(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); testResource = new StreamedQueryResource( mockKsqlEngine, @@ -568,28 +567,6 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() assertEquals(responseEntity.getMessage(), expectedEntity.getMessage()); } - @Test - public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { - // Given: - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) - .thenReturn(query); - doThrow(new KsqlException( - "", - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME)))) - .when(authorizationValidator).checkAuthorization(any(), any(), any()); - - // When: - final Response response = testResource.streamQuery( - serviceContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) - ); - - final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); - final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity(); - assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); - assertEquals(responseEntity.getMessage(), expectedEntity.getMessage()); - } - @Test public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 262d5454d090..86a312ec679d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -399,7 +399,7 @@ public void shouldReturnErrorMessageWhenTopicAuthorizationException() throws Exc // Given: final String errorMessage = "authorization error"; givenRequestIs(query); - when(errorsHandler.webSocketKafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class))) + when(errorsHandler.kafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class))) .thenReturn(errorMessage); doThrow(new KsqlTopicAuthorizationException(AclOperation.CREATE, Collections.singleton("topic"))) .when(authorizationValidator).checkAuthorization(serviceContext, metaStore, query); diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index e1e14356fbc2..78b76a0d14a8 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -25,10 +25,14 @@ import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; +import java.util.Objects; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.common.errors.TopicAuthorizationException; -public class Errors { + +public final class Errors { private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; public static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); @@ -192,14 +196,25 @@ public static Response serverNotReady(final KsqlErrorMessage error) { public Errors(final ErrorMessages errorMessages) { - this.errorMessages = errorMessages; + this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages"); } - public Response accessDeniedFromKafka(final Exception e) { + public Response accessDeniedFromKafkaResponse(final Exception e) { return constructAccessDeniedFromKafkaResponse(errorMessages.kafkaAuthorizationErrorMessage(e)); } - public String webSocketKafkaAuthorizationErrorMessage(final Exception e) { + public String kafkaAuthorizationErrorMessage(final Exception e) { return errorMessages.kafkaAuthorizationErrorMessage(e); } + + public Response generateResponse( + final Exception e, + final Response defaultResponse + ) { + if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) { + return accessDeniedFromKafkaResponse(e); + } else { + return defaultResponse; + } + } } diff --git a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java new file mode 100644 index 000000000000..f5f5ad0ac077 --- /dev/null +++ b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest; + +import static javax.ws.rs.core.Response.Status.FORBIDDEN; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.util.KsqlException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.ws.rs.core.Response; + +@RunWith(MockitoJUnitRunner.class) +public class ErrorsTest { + + private static String SOME_ERROR = "error string"; + private static Response KAFKA_DENIED_ERROR = Response + .status(FORBIDDEN) + .entity(new KsqlErrorMessage(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, SOME_ERROR)) + .build(); + + @Mock + private ErrorMessages errorMessages; + @Mock + private Exception exception; + + private Errors errorHandler; + + @Before + public void setUp() { + when(errorMessages.kafkaAuthorizationErrorMessage(any(Exception.class))) + .thenReturn(SOME_ERROR); + errorHandler = new Errors(errorMessages); + } + + @Test + public void shouldReturnForbiddenKafkaResponse() { + final Response response = errorHandler.accessDeniedFromKafkaResponse(exception); + assertThat(response.getStatus(), is(403)); + assertThat(response.getEntity(), is(instanceOf(KsqlErrorMessage.class))); + assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is(SOME_ERROR)); + } + + @Test + public void shouldReturnForbiddenKafkaErrorMessageString() { + final String error = errorHandler.kafkaAuthorizationErrorMessage(exception); + assertThat(error, is(SOME_ERROR)); + } + + @Test + public void shouldReturnForbiddenKafkaResponseIfRootCauseTopicAuthorizationException() { + final Response response = errorHandler.generateResponse(new KsqlException( + new TopicAuthorizationException("error")), Errors.badRequest("bad")); + assertThat(response.getStatus(), is(403)); + assertThat(response.getEntity(), is(instanceOf(KsqlErrorMessage.class))); + assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is(SOME_ERROR)); + } + + @Test + public void shouldReturnResponseIfRootCauseNotTopicAuthorizationException() { + final Response response = errorHandler.generateResponse(new KsqlException( + new RuntimeException("error")), Errors.badRequest("bad")); + assertThat(response.getStatus(), is(400)); + } +}