diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 63ff38d8f50c..24bff5b12850 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -384,7 +384,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( - KsqlRestConfig.KSQL_SERVER_ERRORS, + KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES, ErrorMessages.class )); @@ -506,7 +506,7 @@ static KsqlRestApplication buildApplication( KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); final Errors errorHandler = new Errors(restConfig.getConfiguredInstance( - KsqlRestConfig.KSQL_SERVER_ERRORS, + KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES, ErrorMessages.class )); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index a2af2b12372f..3f7e53acd1ea 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server; import io.confluent.ksql.rest.DefaultErrorMessages; +import io.confluent.ksql.rest.ErrorMessages; import io.confluent.ksql.util.KsqlException; import io.confluent.rest.RestConfig; import java.util.Map; @@ -65,11 +66,11 @@ public class KsqlRestConfig extends RestConfig { + "will not start serving requests until all preconditions are satisfied. Until that time, " + "requests will return a 503 error"; - static final String KSQL_SERVER_ERRORS = - KSQL_CONFIG_PREFIX + "server.errors"; + static final String KSQL_SERVER_ERROR_MESSAGES = + KSQL_CONFIG_PREFIX + "server.error.messages"; private static final String KSQL_SERVER_ERRORS_DOC = - "A class implementing Errors. This allows the KSQL server to return pluggable " - + "error messages."; + "A class the implementing " + ErrorMessages.class.getSimpleName() + " interface." + + "This allows the KSQL server to return pluggable error messages."; static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER = KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable"; @@ -144,7 +145,7 @@ public class KsqlRestConfig extends RestConfig { Importance.LOW, KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC ).define( - KSQL_SERVER_ERRORS, + KSQL_SERVER_ERROR_MESSAGES, Type.CLASS, DefaultErrorMessages.class, Importance.LOW, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 76d80c0c9f1c..931b794ec90a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -42,7 +42,6 @@ import io.confluent.ksql.rest.server.validation.CustomValidators; import io.confluent.ksql.rest.server.validation.RequestValidator; import io.confluent.ksql.rest.util.CommandStoreUtil; -import io.confluent.ksql.rest.util.ErrorResponseUtil; import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.SandboxedServiceContext; @@ -236,11 +235,10 @@ public Response handleKsqlStatements( } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { - return ErrorResponseUtil.generateResponse( - e, Errors.badRequest(e), errorHandler); + return errorHandler.generateResponse(e, Errors.badRequest(e)); } catch (final Exception e) { - return ErrorResponseUtil.generateResponse( - e, Errors.serverErrorForStatement(e, request.getKsql()), errorHandler); + return errorHandler.generateResponse( + e, Errors.serverErrorForStatement(e, request.getKsql())); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 033c18eca203..5a6e38f8328a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -35,7 +35,6 @@ import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandStoreUtil; -import io.confluent.ksql.rest.util.ErrorResponseUtil; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -229,12 +228,12 @@ private Response handleStatement( "Statement type `%s' not supported for this resource", statement.getClass().getName())); } catch (final TopicAuthorizationException e) { - return errorHandler.accessDeniedFromKafka(e); + return errorHandler.accessDeniedFromKafkaResponse(e); } catch (final KsqlStatementException e) { return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { - return ErrorResponseUtil.generateResponse( - e, Errors.badRequest(e), errorHandler); + System.out.println("here"); + return errorHandler.generateResponse(e, Errors.badRequest(e)); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index ee5e296285d8..e3ac3cddea37 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -234,7 +234,7 @@ public void onOpen(final Session session, final EndpointConfig unused) { SessionUtil.closeSilently( session, CloseCodes.CANNOT_ACCEPT, - errorHandler.webSocketKafkaAuthorizationErrorMessage(e)); + errorHandler.kafkaAuthorizationErrorMessage(e)); } catch (final Exception e) { log.debug("Error processing request", e); SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java deleted file mode 100644 index cc7aa838346b..000000000000 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ErrorResponseUtil.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.util; - -import io.confluent.ksql.rest.Errors; -import javax.ws.rs.core.Response; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.kafka.common.errors.TopicAuthorizationException; - - -public final class ErrorResponseUtil { - - private ErrorResponseUtil() { - } - - public static Response generateResponse( - final Exception e, - final Response defaultResponse, - final Errors errorHandler - ) { - if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) { - return errorHandler.accessDeniedFromKafka(e); - } else { - return defaultResponse; - } - } -} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index f7b94a1d6c01..32995124f10a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -179,6 +179,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; @@ -340,7 +341,13 @@ public void setUp() throws IOException, RestClientException { when(topicInjector.inject(any())) .thenAnswer(inv -> inv.getArgument(0)); - when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); + when(errorsHandler.generateResponse(any(), any())).thenAnswer(new Answer() { + @Override + public Response answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Response) args[1]; + } + }); setUpKsqlResource(); } @@ -758,6 +765,10 @@ public void shouldFailIfCreateStatementMissingKafkaTopicName() { @Test public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: + when(errorsHandler.generateResponse(any(), any())).thenReturn(Response + .status(FORBIDDEN) + .entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, "some error")) + .build()); doThrow(new KsqlTopicAuthorizationException( AclOperation.DELETE, Collections.singleton("topic"))).when(authorizationValidator).checkAuthorization(any(), any(), any()); @@ -772,26 +783,6 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS)); } - @Test - public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { - // Given: - doThrow(new KsqlException("Could not delete the corresponding kafka topic: topic", - new KsqlTopicAuthorizationException( - AclOperation.DELETE, - Collections.singleton("topic")))) - .when(authorizationValidator).checkAuthorization(any(), any(), any()); - - - // When: - final KsqlErrorMessage result = makeFailingRequest( - "DROP STREAM TEST_STREAM DELETE TOPIC;", - Code.FORBIDDEN); - - // Then: - assertThat(result, is(instanceOf(KsqlErrorMessage.class))); - assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS)); - } - @Test public void shouldReturnBadStatementIfStatementFailsValidation() { // When: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index ed4b32dac315..8f4ee101ae2b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -62,7 +62,6 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; @@ -161,7 +160,7 @@ public void setup() { when(pullQuery.isPullQuery()).thenReturn(true); final PreparedStatement pullQueryStatement = PreparedStatement.of(PULL_QUERY_STRING, pullQuery); when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(pullQueryStatement); - when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); + when(errorsHandler.accessDeniedFromKafkaResponse(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE); testResource = new StreamedQueryResource( mockKsqlEngine, @@ -568,28 +567,6 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() assertEquals(responseEntity.getMessage(), expectedEntity.getMessage()); } - @Test - public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { - // Given: - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) - .thenReturn(query); - doThrow(new KsqlException( - "", - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME)))) - .when(authorizationValidator).checkAuthorization(any(), any(), any()); - - // When: - final Response response = testResource.streamQuery( - serviceContext, - new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null) - ); - - final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); - final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity(); - assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); - assertEquals(responseEntity.getMessage(), expectedEntity.getMessage()); - } - @Test public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 262d5454d090..86a312ec679d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -399,7 +399,7 @@ public void shouldReturnErrorMessageWhenTopicAuthorizationException() throws Exc // Given: final String errorMessage = "authorization error"; givenRequestIs(query); - when(errorsHandler.webSocketKafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class))) + when(errorsHandler.kafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class))) .thenReturn(errorMessage); doThrow(new KsqlTopicAuthorizationException(AclOperation.CREATE, Collections.singleton("topic"))) .when(authorizationValidator).checkAuthorization(serviceContext, metaStore, query); diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index e1e14356fbc2..c7aa2a7919e8 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -27,8 +27,12 @@ import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.common.errors.TopicAuthorizationException; -public class Errors { +import java.util.Objects; + +public final class Errors { private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; public static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); @@ -192,14 +196,25 @@ public static Response serverNotReady(final KsqlErrorMessage error) { public Errors(final ErrorMessages errorMessages) { - this.errorMessages = errorMessages; + this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages"); } - public Response accessDeniedFromKafka(final Exception e) { + public Response accessDeniedFromKafkaResponse(final Exception e) { return constructAccessDeniedFromKafkaResponse(errorMessages.kafkaAuthorizationErrorMessage(e)); } - public String webSocketKafkaAuthorizationErrorMessage(final Exception e) { + public String kafkaAuthorizationErrorMessage(final Exception e) { return errorMessages.kafkaAuthorizationErrorMessage(e); } + + public Response generateResponse( + final Exception e, + final Response defaultResponse + ) { + if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) { + return accessDeniedFromKafkaResponse(e); + } else { + return defaultResponse; + } + } } diff --git a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java new file mode 100644 index 000000000000..9fd85a991391 --- /dev/null +++ b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/ErrorsTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest; + +import static javax.ws.rs.core.Response.Status.FORBIDDEN; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.util.KsqlException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.ws.rs.core.Response; + +@RunWith(MockitoJUnitRunner.class) +public class ErrorsTest { + + private static String SOME_ERROR = "error string"; + private static Response KAFKA_DENIED_ERROR = Response + .status(FORBIDDEN) + .entity(new KsqlErrorMessage(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, SOME_ERROR)) + .build(); + + @Mock + private ErrorMessages errorMessages; + @Mock + private Exception exception; + + private Errors errorHandler; + + @Before + public void setUp() { + when(errorMessages.kafkaAuthorizationErrorMessage(any(Exception.class))) + .thenReturn(SOME_ERROR); + errorHandler = new Errors(errorMessages); + } + + @Test + public void shouldReturnForbiddenKafkaResponse() { + final Response response = errorHandler.accessDeniedFromKafkaResponse(exception); + assertThat(response.getStatus(), is(403)); + assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class)); + assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is(SOME_ERROR)); + } + + @Test + public void shouldReturnForbiddenKafkaErrorMessageString() { + final String error = errorHandler.kafkaAuthorizationErrorMessage(exception); + assertThat(error, is(SOME_ERROR)); + } + + @Test + public void shouldReturnForbiddenKafkaResponseIfRootCauseTopicAuthorizationException() { + final Response response = errorHandler.generateResponse(new KsqlException( + new TopicAuthorizationException("error")), Errors.badRequest("bad")); + assertThat(response.getStatus(), is(403)); + assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class)); + assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is(SOME_ERROR)); + } + + @Test + public void shouldReturnResponseIfRootCauseNotTopicAuthorizationException() { + final Response response = errorHandler.generateResponse(new KsqlException( + new RuntimeException("error")), Errors.badRequest("bad")); + assertThat(response.getStatus(), is(400)); + } +}