Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Dec 17, 2019
1 parent 286a5f4 commit 8d61f2a
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext);
final Errors errorHandler = new Errors(restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERRORS,
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
));

Expand Down Expand Up @@ -506,7 +506,7 @@ static KsqlRestApplication buildApplication(
KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);

final Errors errorHandler = new Errors(restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERRORS,
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlException;
import io.confluent.rest.RestConfig;
import java.util.Map;
Expand Down Expand Up @@ -65,11 +66,11 @@ public class KsqlRestConfig extends RestConfig {
+ "will not start serving requests until all preconditions are satisfied. Until that time, "
+ "requests will return a 503 error";

static final String KSQL_SERVER_ERRORS =
KSQL_CONFIG_PREFIX + "server.errors";
static final String KSQL_SERVER_ERROR_MESSAGES =
KSQL_CONFIG_PREFIX + "server.error.messages";
private static final String KSQL_SERVER_ERRORS_DOC =
"A class implementing Errors. This allows the KSQL server to return pluggable "
+ "error messages.";
"A class the implementing " + ErrorMessages.class.getSimpleName() + " interface."
+ "This allows the KSQL server to return pluggable error messages.";

static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER =
KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable";
Expand Down Expand Up @@ -144,7 +145,7 @@ public class KsqlRestConfig extends RestConfig {
Importance.LOW,
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC
).define(
KSQL_SERVER_ERRORS,
KSQL_SERVER_ERROR_MESSAGES,
Type.CLASS,
DefaultErrorMessages.class,
Importance.LOW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.rest.server.validation.RequestValidator;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ErrorResponseUtil;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.SandboxedServiceContext;
Expand Down Expand Up @@ -236,11 +235,10 @@ public Response handleKsqlStatements(
} catch (final KsqlStatementException e) {
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e), errorHandler);
return errorHandler.generateResponse(e, Errors.badRequest(e));
} catch (final Exception e) {
return ErrorResponseUtil.generateResponse(
e, Errors.serverErrorForStatement(e, request.getKsql()), errorHandler);
return errorHandler.generateResponse(
e, Errors.serverErrorForStatement(e, request.getKsql()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ErrorResponseUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand Down Expand Up @@ -229,12 +228,12 @@ private Response handleStatement(
"Statement type `%s' not supported for this resource",
statement.getClass().getName()));
} catch (final TopicAuthorizationException e) {
return errorHandler.accessDeniedFromKafka(e);
return errorHandler.accessDeniedFromKafkaResponse(e);
} catch (final KsqlStatementException e) {
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e), errorHandler);
System.out.println("here");
return errorHandler.generateResponse(e, Errors.badRequest(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void onOpen(final Session session, final EndpointConfig unused) {
SessionUtil.closeSilently(
session,
CloseCodes.CANNOT_ACCEPT,
errorHandler.webSocketKafkaAuthorizationErrorMessage(e));
errorHandler.kafkaAuthorizationErrorMessage(e));
} catch (final Exception e) {
log.debug("Error processing request", e);
SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -340,7 +341,13 @@ public void setUp() throws IOException, RestClientException {
when(topicInjector.inject(any()))
.thenAnswer(inv -> inv.getArgument(0));

when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);
when(errorsHandler.generateResponse(any(), any())).thenAnswer(new Answer<Response>() {
@Override
public Response answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return (Response) args[1];
}
});

setUpKsqlResource();
}
Expand Down Expand Up @@ -758,6 +765,10 @@ public void shouldFailIfCreateStatementMissingKafkaTopicName() {
@Test
public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() {
// Given:
when(errorsHandler.generateResponse(any(), any())).thenReturn(Response
.status(FORBIDDEN)
.entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, "some error"))
.build());
doThrow(new KsqlTopicAuthorizationException(
AclOperation.DELETE,
Collections.singleton("topic"))).when(authorizationValidator).checkAuthorization(any(), any(), any());
Expand All @@ -772,26 +783,6 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException()
assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS));
}

@Test
public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() {
// Given:
doThrow(new KsqlException("Could not delete the corresponding kafka topic: topic",
new KsqlTopicAuthorizationException(
AclOperation.DELETE,
Collections.singleton("topic"))))
.when(authorizationValidator).checkAuthorization(any(), any(), any());


// When:
final KsqlErrorMessage result = makeFailingRequest(
"DROP STREAM TEST_STREAM DELETE TOPIC;",
Code.FORBIDDEN);

// Then:
assertThat(result, is(instanceOf(KsqlErrorMessage.class)));
assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS));
}

@Test
public void shouldReturnBadStatementIfStatementFailsValidation() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
Expand Down Expand Up @@ -161,7 +160,7 @@ public void setup() {
when(pullQuery.isPullQuery()).thenReturn(true);
final PreparedStatement<Statement> pullQueryStatement = PreparedStatement.of(PULL_QUERY_STRING, pullQuery);
when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(pullQueryStatement);
when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);
when(errorsHandler.accessDeniedFromKafkaResponse(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);

testResource = new StreamedQueryResource(
mockKsqlEngine,
Expand Down Expand Up @@ -568,28 +567,6 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException()
assertEquals(responseEntity.getMessage(), expectedEntity.getMessage());
}

@Test
public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() {
// Given:
when(mockStatementParser.<Query>parseSingleStatement(PUSH_QUERY_STRING))
.thenReturn(query);
doThrow(new KsqlException(
"",
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))))
.when(authorizationValidator).checkAuthorization(any(), any(), any());

// When:
final Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null)
);

final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity();
final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity();
assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus());
assertEquals(responseEntity.getMessage(), expectedEntity.getMessage());
}

@Test
public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public void shouldReturnErrorMessageWhenTopicAuthorizationException() throws Exc
// Given:
final String errorMessage = "authorization error";
givenRequestIs(query);
when(errorsHandler.webSocketKafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class)))
when(errorsHandler.kafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class)))
.thenReturn(errorMessage);
doThrow(new KsqlTopicAuthorizationException(AclOperation.CREATE, Collections.singleton("topic")))
.when(authorizationValidator).checkAuthorization(serviceContext, metaStore, query);
Expand Down
23 changes: 19 additions & 4 deletions ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public class Errors {
import java.util.Objects;

public final class Errors {
private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100;

public static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode());
Expand Down Expand Up @@ -192,14 +196,25 @@ public static Response serverNotReady(final KsqlErrorMessage error) {


public Errors(final ErrorMessages errorMessages) {
this.errorMessages = errorMessages;
this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages");
}

public Response accessDeniedFromKafka(final Exception e) {
public Response accessDeniedFromKafkaResponse(final Exception e) {
return constructAccessDeniedFromKafkaResponse(errorMessages.kafkaAuthorizationErrorMessage(e));
}

public String webSocketKafkaAuthorizationErrorMessage(final Exception e) {
public String kafkaAuthorizationErrorMessage(final Exception e) {
return errorMessages.kafkaAuthorizationErrorMessage(e);
}

public Response generateResponse(
final Exception e,
final Response defaultResponse
) {
if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) {
return accessDeniedFromKafkaResponse(e);
} else {
return defaultResponse;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest;

import static javax.ws.rs.core.Response.Status.FORBIDDEN;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.util.KsqlException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import javax.ws.rs.core.Response;

@RunWith(MockitoJUnitRunner.class)
public class ErrorsTest {

private static String SOME_ERROR = "error string";
private static Response KAFKA_DENIED_ERROR = Response
.status(FORBIDDEN)
.entity(new KsqlErrorMessage(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, SOME_ERROR))
.build();

@Mock
private ErrorMessages errorMessages;
@Mock
private Exception exception;

private Errors errorHandler;

@Before
public void setUp() {
when(errorMessages.kafkaAuthorizationErrorMessage(any(Exception.class)))
.thenReturn(SOME_ERROR);
errorHandler = new Errors(errorMessages);
}

@Test
public void shouldReturnForbiddenKafkaResponse() {
final Response response = errorHandler.accessDeniedFromKafkaResponse(exception);
assertThat(response.getStatus(), is(403));
assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class));
assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is(SOME_ERROR));
}

@Test
public void shouldReturnForbiddenKafkaErrorMessageString() {
final String error = errorHandler.kafkaAuthorizationErrorMessage(exception);
assertThat(error, is(SOME_ERROR));
}

@Test
public void shouldReturnForbiddenKafkaResponseIfRootCauseTopicAuthorizationException() {
final Response response = errorHandler.generateResponse(new KsqlException(
new TopicAuthorizationException("error")), Errors.badRequest("bad"));
assertThat(response.getStatus(), is(403));
assertThat(response.getEntity(), instanceOf(KsqlErrorMessage.class));
assertThat(((KsqlErrorMessage) response.getEntity()).getMessage(), is(SOME_ERROR));
}

@Test
public void shouldReturnResponseIfRootCauseNotTopicAuthorizationException() {
final Response response = errorHandler.generateResponse(new KsqlException(
new RuntimeException("error")), Errors.badRequest("bad"));
assertThat(response.getStatus(), is(400));
}
}

0 comments on commit 8d61f2a

Please sign in to comment.