From c6441a4b7a59027d250ca942911407e80cde0afc Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Wed, 21 Aug 2019 13:48:42 -0700 Subject: [PATCH] feat: improved error message and updated error code for PrintTopics command --- .../AuthorizationTopicAccessValidator.java | 10 +++++ ...AuthorizationTopicAccessValidatorTest.java | 35 ++++++++++++++++- .../streaming/StreamedQueryResource.java | 12 +++--- .../resources/StreamedQueryResourceTest.java | 38 +++++++++++++++++-- 4 files changed, 84 insertions(+), 11 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java index 0f7cfffaad23..4a242eea85e8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java @@ -20,6 +20,7 @@ import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.parser.tree.CreateAsSelect; import io.confluent.ksql.parser.tree.InsertInto; +import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.services.ServiceContext; @@ -49,6 +50,8 @@ public void validate( validateInsertInto(serviceContext, metaStore, (InsertInto)statement); } else if (statement instanceof CreateAsSelect) { validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement); + } else if (statement instanceof PrintTopic) { + validatePrintTopic(serviceContext, (PrintTopic)statement); } } @@ -102,6 +105,13 @@ private void validateInsertInto( checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE); } + private void validatePrintTopic( + final ServiceContext serviceContext, + final PrintTopic printTopic + ) { + checkAccess(serviceContext, printTopic.getTopic().toString(), AclOperation.READ); + } + private String getSourceTopicName(final MetaStore metaStore, final String streamOrTable) { final DataSource dataSource = metaStore.getSource(streamOrTable); if (dataSource == null) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java index 7be242c4ca7d..b75710a9de4a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java @@ -76,6 +76,8 @@ public class AuthorizationTopicAccessValidatorTest { private TopicAccessValidator accessValidator; private KsqlEngine ksqlEngine; private MutableMetaStore metaStore; + private final String topicName1 = "topic1"; + private final String topicName2 = "topic2"; @Before public void setUp() { @@ -85,10 +87,10 @@ public void setUp() { accessValidator = new AuthorizationTopicAccessValidator(); when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient); - givenTopic("topic1", TOPIC_1); + givenTopic(topicName1, TOPIC_1); givenStreamWithTopic(STREAM_TOPIC_1, TOPIC_1); - givenTopic("topic2", TOPIC_2); + givenTopic(topicName2, TOPIC_2); givenStreamWithTopic(STREAM_TOPIC_2, TOPIC_2); } @@ -343,6 +345,35 @@ public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() { // Above command should not throw any exception } + @Test + public void shouldPrintTopicWithReadPermissionsAllowed() { + // Given: + givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ)); + final Statement statement = givenStatement(String.format("Print '%s';", topicName1)); + + // When: + accessValidator.validate(serviceContext, metaStore, statement); + + // Then: + // Above command should not throw any exception + } + + @Test + public void shouldPrintTopicWithoutReadPermissionsDenied() { + // Given: + givenTopicPermissions(TOPIC_1, Collections.emptySet()); + final Statement statement = givenStatement(String.format("Print '%s';", topicName1)); + + // Then: + expectedException.expect(KsqlTopicAuthorizationException.class); + expectedException.expectMessage(String.format( + "Authorization denied to Read on topic(s): [%s]", TOPIC_1.name() + )); + + // When: + accessValidator.validate(serviceContext, metaStore, statement); + } + @Test public void shouldThrowExceptionWhenTopicClientFails() { // Given: diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 403b38c9b4ef..c401defdb1aa 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -129,6 +129,12 @@ private Response handleStatement( final PreparedStatement statement ) throws Exception { try { + topicAccessValidator.validate( + serviceContext, + ksqlEngine.getMetaStore(), + statement.getStatement() + ); + if (statement.getStatement() instanceof Query) { return handleQuery( serviceContext, @@ -163,12 +169,6 @@ private Response handleQuery( final ConfiguredStatement configured = ConfiguredStatement.of(statement, streamsProperties, ksqlConfig); - topicAccessValidator.validate( - serviceContext, - ksqlEngine.getMetaStore(), - statement.getStatement() - ); - final QueryMetadata query = ksqlEngine.execute(serviceContext, configured) .getQuery() .get(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java index 9d1a1b480804..ac5545ea4642 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.planner.PlanSourceExtractorVisitor; @@ -126,6 +127,7 @@ public class StreamedQueryResourceTest { private StreamedQueryResource testResource; private final static String queryString = "SELECT * FROM test_stream;"; + private final static String printString = "Print TEST_TOPIC;"; private final static String topicName = "test_stream"; private PreparedStatement statement; @@ -479,7 +481,7 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc reset(mockStatementParser); statement = PreparedStatement.of("query", mock(Query.class)); expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); + .andReturn(statement); replay(mockStatementParser); @@ -495,8 +497,8 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc // When: Response response = testResource.streamQuery( - serviceContext, - new KsqlRequest(queryString, Collections.emptyMap(), null) + serviceContext, + new KsqlRequest(queryString, Collections.emptyMap(), null) ); Response expected = Errors.accessDeniedFromKafka( @@ -507,4 +509,34 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc assertEquals(response.getStatus(), expected.getStatus()); assertEquals(response.getEntity(), expected.getEntity()); } + + @Test + public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() throws Exception { + // Given: + reset(mockStatementParser); + statement = PreparedStatement.of("print", mock(PrintTopic.class)); + expect(mockStatementParser.parseSingleStatement(printString)) + .andReturn(statement); + + replay(mockStatementParser); + + reset(topicAccessValidator); + topicAccessValidator.validate(anyObject(), anyObject(), anyObject()); + expectLastCall().andThrow( + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))); + + replay(topicAccessValidator); + + // When: + Response response = testResource.streamQuery( + serviceContext, + new KsqlRequest(printString, Collections.emptyMap(), null) + ); + + Response expected = Errors.accessDeniedFromKafka( + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))); + + assertEquals(response.getStatus(), expected.getStatus()); + assertEquals(response.getEntity(), expected.getEntity()); + } }