Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Dec 17, 2019
1 parent 286a5f4 commit 565fa73
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
final Optional<KsqlAuthorizationValidator> authorizationValidator =
KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext);
final Errors errorHandler = new Errors(restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERRORS,
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
));

Expand Down Expand Up @@ -506,7 +506,7 @@ static KsqlRestApplication buildApplication(
KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);

final Errors errorHandler = new Errors(restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERRORS,
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlException;
import io.confluent.rest.RestConfig;
import java.util.Map;
Expand Down Expand Up @@ -65,11 +66,11 @@ public class KsqlRestConfig extends RestConfig {
+ "will not start serving requests until all preconditions are satisfied. Until that time, "
+ "requests will return a 503 error";

static final String KSQL_SERVER_ERRORS =
KSQL_CONFIG_PREFIX + "server.errors";
static final String KSQL_SERVER_ERROR_MESSAGES =
KSQL_CONFIG_PREFIX + "server.error.messages";
private static final String KSQL_SERVER_ERRORS_DOC =
"A class implementing Errors. This allows the KSQL server to return pluggable "
+ "error messages.";
"A class the implementing " + ErrorMessages.class.getSimpleName() + " interface."
+ "This allows the KSQL server to return pluggable error messages.";

static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER =
KSQL_CONFIG_PREFIX + "server.exception.uncaught.handler.enable";
Expand Down Expand Up @@ -144,7 +145,7 @@ public class KsqlRestConfig extends RestConfig {
Importance.LOW,
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC
).define(
KSQL_SERVER_ERRORS,
KSQL_SERVER_ERROR_MESSAGES,
Type.CLASS,
DefaultErrorMessages.class,
Importance.LOW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.rest.server.validation.RequestValidator;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ErrorResponseUtil;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.SandboxedServiceContext;
Expand Down Expand Up @@ -236,11 +235,10 @@ public Response handleKsqlStatements(
} catch (final KsqlStatementException e) {
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e), errorHandler);
return errorHandler.generateResponse(e, Errors.badRequest(e));
} catch (final Exception e) {
return ErrorResponseUtil.generateResponse(
e, Errors.serverErrorForStatement(e, request.getKsql()), errorHandler);
return errorHandler.generateResponse(
e, Errors.serverErrorForStatement(e, request.getKsql()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.ErrorResponseUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand Down Expand Up @@ -229,12 +228,11 @@ private Response handleStatement(
"Statement type `%s' not supported for this resource",
statement.getClass().getName()));
} catch (final TopicAuthorizationException e) {
return errorHandler.accessDeniedFromKafka(e);
return errorHandler.accessDeniedFromKafkaResponse(e);
} catch (final KsqlStatementException e) {
return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
} catch (final KsqlException e) {
return ErrorResponseUtil.generateResponse(
e, Errors.badRequest(e), errorHandler);
return errorHandler.generateResponse(e, Errors.badRequest(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import javax.ws.rs.core.Response;

import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -234,7 +233,7 @@ public void onOpen(final Session session, final EndpointConfig unused) {
SessionUtil.closeSilently(
session,
CloseCodes.CANNOT_ACCEPT,
errorHandler.webSocketKafkaAuthorizationErrorMessage(e));
errorHandler.kafkaAuthorizationErrorMessage(e));
} catch (final Exception e) {
log.debug("Error processing request", e);
SessionUtil.closeSilently(session, CloseCodes.CANNOT_ACCEPT, e.getMessage());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -239,11 +240,6 @@ public class KsqlResourceTest {
.valueColumn(ColumnName.of("f1"), SqlTypes.STRING)
.build();

private static Response AUTHORIZATION_ERROR_RESPONSE = Response
.status(FORBIDDEN)
.entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, "some error"))
.build();

@Rule
public final ExpectedException expectedException = ExpectedException.none();

Expand Down Expand Up @@ -340,7 +336,13 @@ public void setUp() throws IOException, RestClientException {
when(topicInjector.inject(any()))
.thenAnswer(inv -> inv.getArgument(0));

when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);
when(errorsHandler.generateResponse(any(), any())).thenAnswer(new Answer<Response>() {
@Override
public Response answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return (Response) args[1];
}
});

setUpKsqlResource();
}
Expand Down Expand Up @@ -758,6 +760,11 @@ public void shouldFailIfCreateStatementMissingKafkaTopicName() {
@Test
public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() {
// Given:
final String errorMsg = "some error";
when(errorsHandler.generateResponse(any(), any())).thenReturn(Response
.status(FORBIDDEN)
.entity(new KsqlErrorMessage(ERROR_CODE_FORBIDDEN_KAFKA_ACCESS, errorMsg))
.build());
doThrow(new KsqlTopicAuthorizationException(
AclOperation.DELETE,
Collections.singleton("topic"))).when(authorizationValidator).checkAuthorization(any(), any(), any());
Expand All @@ -770,26 +777,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException()
// Then:
assertThat(result, is(instanceOf(KsqlErrorMessage.class)));
assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS));
}

@Test
public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() {
// Given:
doThrow(new KsqlException("Could not delete the corresponding kafka topic: topic",
new KsqlTopicAuthorizationException(
AclOperation.DELETE,
Collections.singleton("topic"))))
.when(authorizationValidator).checkAuthorization(any(), any(), any());


// When:
final KsqlErrorMessage result = makeFailingRequest(
"DROP STREAM TEST_STREAM DELETE TOPIC;",
Code.FORBIDDEN);

// Then:
assertThat(result, is(instanceOf(KsqlErrorMessage.class)));
assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS));
assertThat(result.getMessage(), is(errorMsg));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
Expand Down Expand Up @@ -161,7 +160,7 @@ public void setup() {
when(pullQuery.isPullQuery()).thenReturn(true);
final PreparedStatement<Statement> pullQueryStatement = PreparedStatement.of(PULL_QUERY_STRING, pullQuery);
when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(pullQueryStatement);
when(errorsHandler.accessDeniedFromKafka(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);
when(errorsHandler.accessDeniedFromKafkaResponse(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);

testResource = new StreamedQueryResource(
mockKsqlEngine,
Expand Down Expand Up @@ -568,28 +567,6 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException()
assertEquals(responseEntity.getMessage(), expectedEntity.getMessage());
}

@Test
public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() {
// Given:
when(mockStatementParser.<Query>parseSingleStatement(PUSH_QUERY_STRING))
.thenReturn(query);
doThrow(new KsqlException(
"",
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))))
.when(authorizationValidator).checkAuthorization(any(), any(), any());

// When:
final Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), null)
);

final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity();
final KsqlErrorMessage expectedEntity = (KsqlErrorMessage) AUTHORIZATION_ERROR_RESPONSE.getEntity();
assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus());
assertEquals(responseEntity.getMessage(), expectedEntity.getMessage());
}

@Test
public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public void shouldReturnErrorMessageWhenTopicAuthorizationException() throws Exc
// Given:
final String errorMessage = "authorization error";
givenRequestIs(query);
when(errorsHandler.webSocketKafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class)))
when(errorsHandler.kafkaAuthorizationErrorMessage(any(TopicAuthorizationException.class)))
.thenReturn(errorMessage);
doThrow(new KsqlTopicAuthorizationException(AclOperation.CREATE, Collections.singleton("topic")))
.when(authorizationValidator).checkAuthorization(serviceContext, metaStore, query);
Expand Down
23 changes: 19 additions & 4 deletions ksql-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public class Errors {
import java.util.Objects;

public final class Errors {
private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100;

public static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode());
Expand Down Expand Up @@ -192,14 +196,25 @@ public static Response serverNotReady(final KsqlErrorMessage error) {


public Errors(final ErrorMessages errorMessages) {
this.errorMessages = errorMessages;
this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages");
}

public Response accessDeniedFromKafka(final Exception e) {
public Response accessDeniedFromKafkaResponse(final Exception e) {
return constructAccessDeniedFromKafkaResponse(errorMessages.kafkaAuthorizationErrorMessage(e));
}

public String webSocketKafkaAuthorizationErrorMessage(final Exception e) {
public String kafkaAuthorizationErrorMessage(final Exception e) {
return errorMessages.kafkaAuthorizationErrorMessage(e);
}

public Response generateResponse(
final Exception e,
final Response defaultResponse
) {
if (ExceptionUtils.indexOfType(e, TopicAuthorizationException.class) >= 0) {
return accessDeniedFromKafkaResponse(e);
} else {
return defaultResponse;
}
}
}
Loading

0 comments on commit 565fa73

Please sign in to comment.