From 0ea157b3ca590bc5b7fb2826430aee6772bb1e23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Thu, 22 Aug 2019 13:51:15 -0500 Subject: [PATCH] feat: validate createTopic permissions on SandboxedKafkaTopicClient (#3250) --- .../ksql/services/KafkaTopicClient.java | 56 ++++++++++++++++++- .../ksql/services/KafkaTopicClientImpl.java | 15 ++++- .../services/SandboxedKafkaTopicClient.java | 3 + .../ksql/services/FakeKafkaTopicClient.java | 5 +- .../services/KafkaTopicClientImplTest.java | 44 +++++++++++++-- .../SandboxedKafkaTopicClientTest.java | 34 +++++++++++ .../server/mock/MockKafkaTopicClient.java | 5 +- 7 files changed, 151 insertions(+), 11 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java index 953720748866..51413a81c257 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; + +import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.TopicDescription; /** @@ -35,6 +37,27 @@ enum TopicCleanupPolicy { COMPACT_DELETE } + default void validateCreateTopic( + final String topic, + final int numPartitions, + final short replicationFactor) { + validateCreateTopic(topic, numPartitions, replicationFactor, Collections.emptyMap()); + } + + default void validateCreateTopic( + String topic, + int numPartitions, + short replicationFactor, + Map configs) { + createTopic( + topic, + numPartitions, + replicationFactor, + configs, + new CreateTopicsOptions().validateOnly(true) + ); + } + /** * Create a new topic with the specified name, numPartitions and replicationFactor. * @@ -67,11 +90,42 @@ default void createTopic( * @param numPartitions the partition count of the topic. * @param configs any additional topic configs to use */ - void createTopic( + default void createTopic( String topic, int numPartitions, short replicationFactor, Map configs + ) { + createTopic( + topic, + numPartitions, + replicationFactor, + configs, + new CreateTopicsOptions() + ); + } + + /** + * Create a new topic with the specified name, numPartitions and replicationFactor. + * + *

If the topic already exists the method checks that partition count matchesmatches + * {@code numPartitions} and that the replication factor is at least + * {@code replicationFactor} + * + * @param topic name of the topic to create + * @param replicationFactor the replication factor for the new topic, or + * {@link io.confluent.ksql.topic.TopicProperties#DEFAULT_REPLICAS} + * to use the default replication of the cluster + * @param numPartitions the partition count of the topic. + * @param configs any additional topic configs to use + * @param createOptions the options to use when creating the new topic + */ + void createTopic( + String topic, + int numPartitions, + short replicationFactor, + Map configs, + CreateTopicsOptions createOptions ); /** diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index b62d3b80264c..61d4f2dbea17 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -41,6 +41,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; @@ -86,7 +87,8 @@ public void createTopic( final String topic, final int numPartitions, final short replicationFactor, - final Map configs + final Map configs, + final CreateTopicsOptions createOptions ) { if (isTopicExists(topic)) { validateTopicProperties(topic, numPartitions, replicationFactor); @@ -101,9 +103,16 @@ public void createTopic( newTopic.configs(toStringConfigs(configs)); try { - LOG.info("Creating topic '{}'", topic); + LOG.info(String.format("Creating topic '{}' %s", + topic, + (createOptions.shouldValidateOnly()) ? "(ONLY VALIDATE)" : "" + )); + ExecutorUtil.executeWithRetries( - () -> adminClient.createTopics(Collections.singleton(newTopic)).all().get(), + () -> adminClient.createTopics( + Collections.singleton(newTopic), + createOptions + ).all().get(), ExecutorUtil.RetryBehaviour.ON_RETRYABLE); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java index 4e780900980d..ebed7ee30ff9 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java @@ -99,6 +99,9 @@ private void createTopic( Collections.emptyList())) .collect(Collectors.toList()); + // This is useful to validate permissions to create the topic + delegate.validateCreateTopic(topic, numPartitions, replicationFactor, configs); + createdTopics.put(topic, new SandboxedTopicDescription( topic, false, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java b/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java index 43ba70729fc3..6eacea6058da 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java @@ -31,6 +31,8 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; + +import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; @@ -122,7 +124,8 @@ public void createTopic( final String topic, final int numPartitions, final short replicationFactor, - final Map configs + final Map configs, + final CreateTopicsOptions createOptions ) { final short replicas = replicationFactor == TopicProperties.DEFAULT_REPLICAS ? 1 diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index f7b3e93d6271..cdc438d3f583 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.services; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; @@ -53,6 +54,7 @@ import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeClusterResult; @@ -118,7 +120,8 @@ public void init() { @Test public void shouldCreateTopic() { expect(adminClient.listTopics()).andReturn(getListTopicsResult()); - expect(adminClient.createTopics(anyObject())).andReturn(getCreateTopicsResult()); + expect(adminClient.createTopics(anyObject(), shouldValidateCreate(false))) + .andReturn(getCreateTopicsResult()); replay(adminClient); final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient); @@ -126,6 +129,18 @@ public void shouldCreateTopic() { verify(adminClient); } + @Test + public void shouldValidateCreateTopic() { + expect(adminClient.listTopics()).andReturn(getListTopicsResult()); + expect(adminClient.createTopics(anyObject(), shouldValidateCreate(true))) + .andReturn(getCreateTopicsResult()); + replay(adminClient); + + final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient); + kafkaTopicClient.validateCreateTopic("test", 1, (short) 1); + verify(adminClient); + } + @Test public void shouldUseExistingTopicWithTheSameSpecsInsteadOfCreate() { expect(adminClient.listTopics()).andReturn(getListTopicsResult()); @@ -160,7 +175,8 @@ public void shouldFailCreateExistingTopic() { public void shouldFailCreateTopicWhenNoAclsSet() { // Given: expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult()); - expect(adminClient.createTopics(anyObject())).andReturn(createTopicAuthorizationException()); + expect(adminClient.createTopics(anyObject(), anyObject())) + .andReturn(createTopicAuthorizationException()); replay(adminClient); @@ -189,7 +205,7 @@ public void shouldNotFailIfTopicAlreadyExistsButCreateUsesDefaultReplicas() { @Test public void shouldNotFailIfTopicAlreadyExistsWhenCreating() { expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult()); - expect(adminClient.createTopics(anyObject())) + expect(adminClient.createTopics(anyObject(), anyObject())) .andReturn(createTopicReturningTopicExistsException()); expect(adminClient.describeTopics(anyObject(), anyObject())) .andReturn(getDescribeTopicsResult()); @@ -202,7 +218,7 @@ public void shouldNotFailIfTopicAlreadyExistsWhenCreating() { @Test public void shouldRetryDescribeTopicOnRetriableException() { expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult()); - expect(adminClient.createTopics(anyObject())) + expect(adminClient.createTopics(anyObject(), anyObject())) .andReturn(createTopicReturningTopicExistsException()); expect(adminClient.describeTopics(anyObject(), anyObject())) .andReturn(describeTopicReturningUnknownPartitionException()).once(); @@ -433,7 +449,8 @@ public void shouldSetTopicCleanupPolicyToCompact() { // Verify that the new topic configuration being passed to the admin client is what we expect. final NewTopic newTopic = new NewTopic(topicName1, 1, (short) 1); newTopic.configs(Collections.singletonMap("cleanup.policy", "compact")); - expect(adminClient.createTopics(singleNewTopic(newTopic))).andReturn(getCreateTopicsResult()); + expect(adminClient.createTopics(singleNewTopic(newTopic), anyObject())) + .andReturn(getCreateTopicsResult()); replay(adminClient); final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient); @@ -828,4 +845,21 @@ public void appendTo(final StringBuffer buffer) { EasyMock.reportMatcher(new NewTopicsMatcher()); return null; } + + private CreateTopicsOptions shouldValidateCreate(final boolean validateOnly) { + EasyMock.reportMatcher(new IArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return argument instanceof CreateTopicsOptions + && ((CreateTopicsOptions) argument).shouldValidateOnly() == validateOnly; + } + + @Override + public void appendTo(StringBuffer buffer) { + buffer.append("validateOnly(\"" + validateOnly + "\")"); + } + }); + + return null; + } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java index d2732bebab50..69a2e78492d4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java @@ -21,8 +21,10 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @@ -45,6 +47,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -186,6 +189,37 @@ public void shouldTrackCreatedTopicsDetails() { Sets.newHashSet(AclOperation.READ, AclOperation.WRITE)))); } + @Test + public void shouldThrowOnCreateIfValidateCreateTopicFails() { + // Given: + doThrow(TopicAuthorizationException.class).when(delegate) + .validateCreateTopic("some topic", 2, (short) 3, configs); + + // Expect: + expectedException.expect(TopicAuthorizationException.class); + + // When: + sandboxedClient.createTopic("some topic", 2, (short) 3, configs); + } + + @Test + public void shouldNotCreateTopicIfValidateCreateTopicFails() { + // Given: + doThrow(TopicAuthorizationException.class).when(delegate) + .validateCreateTopic("some topic", 2, (short) 3, configs); + + // When: + try { + sandboxedClient.createTopic("some topic", 2, (short) 3, configs); + } catch (final TopicAuthorizationException e) { + // skip + } + + // Then: + verify(delegate, times(0)) + .createTopic("some topic", 2, (short) 3, configs); + } + @Test public void shouldThrowOnCreateIfTopicPreviouslyCreatedInScopeWithDifferentPartitionCount() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java index a4d3cccc30c6..8c9af7341197 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java @@ -20,6 +20,8 @@ import java.util.Collections; import java.util.Map; import java.util.Set; + +import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.TopicDescription; /** @@ -32,7 +34,8 @@ public class MockKafkaTopicClient implements KafkaTopicClient { public void createTopic(final String topic, final int numPartitions, final short replicationFactor, - final Map configs) { + final Map configs, + final CreateTopicsOptions createOptions) { } @Override