Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improved error message and updated error code for PrintTopics command #3246

Merged
merged 2 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -49,6 +50,8 @@ public void validate(
validateInsertInto(serviceContext, metaStore, (InsertInto)statement);
} else if (statement instanceof CreateAsSelect) {
validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement);
} else if (statement instanceof PrintTopic) {
validatePrintTopic(serviceContext, (PrintTopic)statement);
}
}

Expand Down Expand Up @@ -102,6 +105,13 @@ private void validateInsertInto(
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
}

private void validatePrintTopic(
final ServiceContext serviceContext,
final PrintTopic printTopic
) {
checkAccess(serviceContext, printTopic.getTopic().toString(), AclOperation.READ);
}

private String getSourceTopicName(final MetaStore metaStore, final String streamOrTable) {
final DataSource<?> dataSource = metaStore.getSource(streamOrTable);
if (dataSource == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class AuthorizationTopicAccessValidatorTest {

private static final String STREAM_TOPIC_1 = "s1";
private static final String STREAM_TOPIC_2 = "s2";
private final static String TOPIC_NAME_1 = "topic1";
private final static String TOPIC_NAME_2 = "topic2";

@Mock
private ServiceContext serviceContext;
Expand All @@ -85,10 +87,10 @@ public void setUp() {
accessValidator = new AuthorizationTopicAccessValidator();
when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient);

givenTopic("topic1", TOPIC_1);
givenTopic(TOPIC_NAME_1, TOPIC_1);
givenStreamWithTopic(STREAM_TOPIC_1, TOPIC_1);

givenTopic("topic2", TOPIC_2);
givenTopic(TOPIC_NAME_2, TOPIC_2);
givenStreamWithTopic(STREAM_TOPIC_2, TOPIC_2);
}

Expand Down Expand Up @@ -128,7 +130,7 @@ public void shouldSingleSelectWithReadPermissionsAllowed() {
}

@Test
public void shouldSingleSelectWithoutReadPermissionsDenied() {
public void shouldThrowWhenSingleSelectWithoutReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.emptySet());
final Statement statement = givenStatement(String.format(
Expand Down Expand Up @@ -162,7 +164,7 @@ public void shouldJoinSelectWithReadPermissionsAllowed() {
}

@Test
public void shouldJoinSelectWithoutReadPermissionsDenied() {
public void shouldThrowWhenJoinSelectWithoutReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.WRITE));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.WRITE));
Expand All @@ -181,7 +183,7 @@ public void shouldJoinSelectWithoutReadPermissionsDenied() {
}

@Test
public void shouldJoinWithOneRightTopicWithReadPermissionsDenied() {
public void shouldThrowWhenJoinWithOneRightTopicWithReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.WRITE));
Expand All @@ -200,7 +202,7 @@ public void shouldJoinWithOneRightTopicWithReadPermissionsDenied() {
}

@Test
public void shouldJoinWitOneLeftTopicWithReadPermissionsDenied() {
public void shouldThrowWhenJoinWitOneLeftTopicWithReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.WRITE));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ));
Expand Down Expand Up @@ -235,7 +237,7 @@ public void shouldInsertIntoWithAllPermissionsAllowed() {
}

@Test
public void shouldInsertIntoWithOnlyReadPermissionsDenied() {
public void shouldThrowWhenInsertIntoWithOnlyReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ));
Expand All @@ -254,7 +256,7 @@ public void shouldInsertIntoWithOnlyReadPermissionsDenied() {
}

@Test
public void shouldInsertIntoWithOnlyWritePermissionsDenied() {
public void shouldThrowWhenInsertIntoWithOnlyWritePermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.WRITE));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.WRITE));
Expand All @@ -273,7 +275,7 @@ public void shouldInsertIntoWithOnlyWritePermissionsDenied() {
}

@Test
public void shouldCreateAsSelectWithoutReadPermissionsDenied() {
public void shouldThrowWhenCreateAsSelectWithoutReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.emptySet());
final Statement statement = givenStatement(String.format(
Expand Down Expand Up @@ -307,7 +309,7 @@ public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() {
}

@Test
public void shouldCreateAsSelectExistingStreamWithoutWritePermissionsDenied() {
public void shouldThrowWhenCreateAsSelectExistingStreamWithoutWritePermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ));
Expand Down Expand Up @@ -343,6 +345,35 @@ public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() {
// Above command should not throw any exception
}

@Test
public void shouldPrintTopicWithReadPermissionsAllowed() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
final Statement statement = givenStatement(String.format("Print '%s';", TOPIC_NAME_1));

// When:
accessValidator.validate(serviceContext, metaStore, statement);

// Then:
// Above command should not throw any exception
}

@Test
public void shouldThrowWhenThrowPrintTopicWithoutReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.emptySet());
final Statement statement = givenStatement(String.format("Print '%s';", TOPIC_NAME_1));

// Then:
expectedException.expect(KsqlTopicAuthorizationException.class);
expectedException.expectMessage(String.format(
"Authorization denied to Read on topic(s): [%s]", TOPIC_1.name()
));

// When:
accessValidator.validate(serviceContext, metaStore, statement);
}

@Test
public void shouldThrowExceptionWhenTopicClientFails() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ private Response handleStatement(
final PreparedStatement<?> statement
) throws Exception {
try {
topicAccessValidator.validate(
serviceContext,
ksqlEngine.getMetaStore(),
statement.getStatement()
);

if (statement.getStatement() instanceof Query) {
return handleQuery(
serviceContext,
Expand Down Expand Up @@ -163,12 +169,6 @@ private Response handleQuery(
final ConfiguredStatement<Query> configured =
ConfiguredStatement.of(statement, streamsProperties, ksqlConfig);

topicAccessValidator.validate(
serviceContext,
ksqlEngine.getMetaStore(),
statement.getStatement()
);

final QueryMetadata query = ksqlEngine.execute(serviceContext, configured)
.getQuery()
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.PlanSourceExtractorVisitor;
Expand Down Expand Up @@ -126,6 +127,7 @@ public class StreamedQueryResourceTest {
private StreamedQueryResource testResource;

private final static String queryString = "SELECT * FROM test_stream;";
private final static String printString = "Print TEST_TOPIC;";
private final static String topicName = "test_stream";
private PreparedStatement<Statement> statement;

Expand Down Expand Up @@ -446,27 +448,24 @@ public void shouldUpdateTheLastRequestTime() throws Exception {
@Test
public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() throws Exception {
// Given:
reset(mockStatementParser);
reset(mockStatementParser, topicAccessValidator);

statement = PreparedStatement.of("query", mock(Query.class));
expect(mockStatementParser.parseSingleStatement(queryString))
.andReturn(statement);

replay(mockStatementParser);

reset(topicAccessValidator);
.andReturn(statement);
topicAccessValidator.validate(anyObject(), anyObject(), anyObject());
expectLastCall().andThrow(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)));

replay(topicAccessValidator);
replay(mockStatementParser, topicAccessValidator);

// When:
Response response = testResource.streamQuery(
final Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(queryString, Collections.emptyMap(), null)
);

Response expected = Errors.accessDeniedFromKafka(
final Response expected = Errors.accessDeniedFromKafka(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)));

assertEquals(response.getStatus(), expected.getStatus());
Expand All @@ -476,35 +475,59 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException()
@Test
public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() throws Exception {
// Given:
reset(mockStatementParser);
reset(mockStatementParser, topicAccessValidator);

statement = PreparedStatement.of("query", mock(Query.class));
expect(mockStatementParser.parseSingleStatement(queryString))
.andReturn(statement);

replay(mockStatementParser);

reset(topicAccessValidator);
.andReturn(statement);
topicAccessValidator.validate(anyObject(), anyObject(), anyObject());
expectLastCall().andThrow(
new KsqlException(
"",
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)
)));

replay(topicAccessValidator);
replay(mockStatementParser, topicAccessValidator);

// When:
Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(queryString, Collections.emptyMap(), null)
final Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(queryString, Collections.emptyMap(), null)
);

Response expected = Errors.accessDeniedFromKafka(
final Response expected = Errors.accessDeniedFromKafka(
new KsqlException(
"",
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))));

assertEquals(response.getStatus(), expected.getStatus());
assertEquals(response.getEntity(), expected.getEntity());
}

@Test
public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() throws Exception {
// Given:
reset(mockStatementParser, topicAccessValidator);

statement = PreparedStatement.of("print", mock(PrintTopic.class));
expect(mockStatementParser.parseSingleStatement(printString))
.andReturn(statement);
topicAccessValidator.validate(anyObject(), anyObject(), anyObject());
expectLastCall().andThrow(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)));

replay(mockStatementParser, topicAccessValidator);

// When:
final Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(printString, Collections.emptyMap(), null)
);

final Response expected = Errors.accessDeniedFromKafka(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)));

assertEquals(response.getStatus(), expected.getStatus());
assertEquals(response.getEntity(), expected.getEntity());
}
}