Skip to content

Commit

Permalink
feat: improved error message and updated error code for PrintTopics c…
Browse files Browse the repository at this point in the history
…ommand
  • Loading branch information
stevenpyzhang committed Aug 21, 2019
1 parent 4eb9080 commit c6441a4
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -49,6 +50,8 @@ public void validate(
validateInsertInto(serviceContext, metaStore, (InsertInto)statement);
} else if (statement instanceof CreateAsSelect) {
validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement);
} else if (statement instanceof PrintTopic) {
validatePrintTopic(serviceContext, (PrintTopic)statement);
}
}

Expand Down Expand Up @@ -102,6 +105,13 @@ private void validateInsertInto(
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
}

private void validatePrintTopic(
final ServiceContext serviceContext,
final PrintTopic printTopic
) {
checkAccess(serviceContext, printTopic.getTopic().toString(), AclOperation.READ);
}

private String getSourceTopicName(final MetaStore metaStore, final String streamOrTable) {
final DataSource<?> dataSource = metaStore.getSource(streamOrTable);
if (dataSource == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class AuthorizationTopicAccessValidatorTest {
private TopicAccessValidator accessValidator;
private KsqlEngine ksqlEngine;
private MutableMetaStore metaStore;
private final String topicName1 = "topic1";
private final String topicName2 = "topic2";

@Before
public void setUp() {
Expand All @@ -85,10 +87,10 @@ public void setUp() {
accessValidator = new AuthorizationTopicAccessValidator();
when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient);

givenTopic("topic1", TOPIC_1);
givenTopic(topicName1, TOPIC_1);
givenStreamWithTopic(STREAM_TOPIC_1, TOPIC_1);

givenTopic("topic2", TOPIC_2);
givenTopic(topicName2, TOPIC_2);
givenStreamWithTopic(STREAM_TOPIC_2, TOPIC_2);
}

Expand Down Expand Up @@ -343,6 +345,35 @@ public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() {
// Above command should not throw any exception
}

@Test
public void shouldPrintTopicWithReadPermissionsAllowed() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
final Statement statement = givenStatement(String.format("Print '%s';", topicName1));

// When:
accessValidator.validate(serviceContext, metaStore, statement);

// Then:
// Above command should not throw any exception
}

@Test
public void shouldPrintTopicWithoutReadPermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.emptySet());
final Statement statement = givenStatement(String.format("Print '%s';", topicName1));

// Then:
expectedException.expect(KsqlTopicAuthorizationException.class);
expectedException.expectMessage(String.format(
"Authorization denied to Read on topic(s): [%s]", TOPIC_1.name()
));

// When:
accessValidator.validate(serviceContext, metaStore, statement);
}

@Test
public void shouldThrowExceptionWhenTopicClientFails() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ private Response handleStatement(
final PreparedStatement<?> statement
) throws Exception {
try {
topicAccessValidator.validate(
serviceContext,
ksqlEngine.getMetaStore(),
statement.getStatement()
);

if (statement.getStatement() instanceof Query) {
return handleQuery(
serviceContext,
Expand Down Expand Up @@ -163,12 +169,6 @@ private Response handleQuery(
final ConfiguredStatement<Query> configured =
ConfiguredStatement.of(statement, streamsProperties, ksqlConfig);

topicAccessValidator.validate(
serviceContext,
ksqlEngine.getMetaStore(),
statement.getStatement()
);

final QueryMetadata query = ksqlEngine.execute(serviceContext, configured)
.getQuery()
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.PlanSourceExtractorVisitor;
Expand Down Expand Up @@ -126,6 +127,7 @@ public class StreamedQueryResourceTest {
private StreamedQueryResource testResource;

private final static String queryString = "SELECT * FROM test_stream;";
private final static String printString = "Print TEST_TOPIC;";
private final static String topicName = "test_stream";
private PreparedStatement<Statement> statement;

Expand Down Expand Up @@ -479,7 +481,7 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc
reset(mockStatementParser);
statement = PreparedStatement.of("query", mock(Query.class));
expect(mockStatementParser.parseSingleStatement(queryString))
.andReturn(statement);
.andReturn(statement);

replay(mockStatementParser);

Expand All @@ -495,8 +497,8 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc

// When:
Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(queryString, Collections.emptyMap(), null)
serviceContext,
new KsqlRequest(queryString, Collections.emptyMap(), null)
);

Response expected = Errors.accessDeniedFromKafka(
Expand All @@ -507,4 +509,34 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc
assertEquals(response.getStatus(), expected.getStatus());
assertEquals(response.getEntity(), expected.getEntity());
}

@Test
public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() throws Exception {
// Given:
reset(mockStatementParser);
statement = PreparedStatement.of("print", mock(PrintTopic.class));
expect(mockStatementParser.parseSingleStatement(printString))
.andReturn(statement);

replay(mockStatementParser);

reset(topicAccessValidator);
topicAccessValidator.validate(anyObject(), anyObject(), anyObject());
expectLastCall().andThrow(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)));

replay(topicAccessValidator);

// When:
Response response = testResource.streamQuery(
serviceContext,
new KsqlRequest(printString, Collections.emptyMap(), null)
);

Response expected = Errors.accessDeniedFromKafka(
new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)));

assertEquals(response.getStatus(), expected.getStatus());
assertEquals(response.getEntity(), expected.getEntity());
}
}

0 comments on commit c6441a4

Please sign in to comment.