From 0d656012fa67e01c969db4cb6b7334137832e35c Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Tue, 30 Jun 2020 16:05:06 -0700 Subject: [PATCH 1/4] fix: windowed tables now have cleanup policy compact+delete --- .../ksql/topic/TopicCreateInjector.java | 27 ++++++++++++------- .../ksql/topic/TopicCreateInjectorTest.java | 9 ++++--- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index b6f897be5575..c1a1504c825e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -23,8 +23,8 @@ import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.CreateAsSelect; import io.confluent.ksql.parser.tree.CreateSource; +import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; -import io.confluent.ksql.parser.tree.CreateTableAsSelect; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.services.KafkaTopicClient; @@ -34,7 +34,6 @@ import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -126,7 +125,10 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - createTopic(topicPropertiesBuilder, createSource instanceof CreateTable); + final String topicCleanUpPolicy = createSource instanceof CreateTable + ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + + createTopic(topicPropertiesBuilder, topicCleanUpPolicy); return statement; } @@ -157,10 +159,16 @@ private ConfiguredStatement injectForCreateAsSelec properties.getPartitions(), properties.getReplicas()); - final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect - && !createAsSelect.getQuery().getWindow().isPresent(); + final String topicCleanUpPolicy; + if (createAsSelect instanceof CreateStreamAsSelect) { + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; + } else { + topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent() + ? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE + : TopicConfig.CLEANUP_POLICY_COMPACT; + } - final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic); + final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), @@ -175,13 +183,12 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final boolean shouldCompactTopic + final String topicCleanUpPolicy ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = shouldCompactTopic - ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - : Collections.emptyMap(); + final Map config = + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 3dc720b470eb..32c78ccb5fa2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -345,7 +345,7 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -362,7 +362,7 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -401,7 +401,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() { } @Test - public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() { + public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowedTables() { // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);"); @@ -415,7 +415,8 @@ public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)); } @Test From 5770306c711d3ea3cbef86c9aec169aaff9472e8 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Mon, 6 Jul 2020 10:27:12 -0700 Subject: [PATCH 2/4] fix: kafka topic client now properly handles compact delete --- .../ksql/services/KafkaTopicClientImpl.java | 22 +++---- .../ksql/integration/WindowingIntTest.java | 6 +- .../services/KafkaTopicClientImplTest.java | 57 ++++++++++++++++--- 3 files changed, 64 insertions(+), 21 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 72be3d1fbdf5..e41e774ddb2c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -240,17 +240,17 @@ public boolean addTopicConfig(final String topicName, final Map overr @Override public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) { final String policy = getTopicConfig(topicName) - .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); - - switch (policy) { - case "compact": - return TopicCleanupPolicy.COMPACT; - case "delete": - return TopicCleanupPolicy.DELETE; - case "compact+delete": - return TopicCleanupPolicy.COMPACT_DELETE; - default: - throw new KsqlException("Could not get the topic configs for : " + topicName); + .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "") + .toLowerCase(); + + if (policy.equals("compact")) { + return TopicCleanupPolicy.COMPACT; + } else if (policy.equals("delete")) { + return TopicCleanupPolicy.DELETE; + } else if (policy.contains("compact") && policy.contains("delete")) { + return TopicCleanupPolicy.COMPACT_DELETE; + } else { + throw new KsqlException("Could not get the topic configs for : " + topicName); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java index 10bca064c7b1..8dcd9416aabd 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java @@ -156,7 +156,7 @@ public void shouldAggregateTumblingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -179,7 +179,7 @@ public void shouldAggregateHoppingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -210,7 +210,7 @@ public void shouldAggregateSessionWindow() { // Then: assertOutputOf(resultStream0, expected, mapHasItems(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 2, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 2, resultStream0); } private void givenTable(final String sql) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 69e6ea1026c1..a3162722a37a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -16,6 +16,8 @@ package io.confluent.ksql.services; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; @@ -39,6 +41,7 @@ import io.confluent.ksql.exception.KafkaResponseGetFailedException; import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; +import io.confluent.ksql.services.KafkaTopicClient.TopicCleanupPolicy; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -459,6 +462,46 @@ public void shouldGetTopicConfig() { assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1")); } + @Test + public void shouldGetTopicCleanUpPolicyDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.DELETE)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompact() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompactAndDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, + CLEANUP_POLICY_COMPACT + "," + CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT_DELETE)); + } + @Test public void shouldThrowOnNoneRetryableGetTopicConfigError() { // Given: @@ -517,7 +560,7 @@ public void shouldSetStringTopicConfig() { ); final Map configOverrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -528,7 +571,7 @@ public void shouldSetStringTopicConfig() { verify(adminClient).incrementalAlterConfigs(ImmutableMap.of( topicResource("peter"), ImmutableSet.of( - setConfig(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + setConfig(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) ) )); } @@ -570,7 +613,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) @@ -583,7 +626,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { verify(adminClient).alterConfigs(ImmutableMap.of( topicResource("peter"), new Config(ImmutableSet.of( - new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + new ConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1234") )) )); @@ -594,12 +637,12 @@ public void shouldNotAlterStringConfigIfMatchingConfigOverrideExists() { // Given: givenTopicConfigs( "peter", - overriddenConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy") ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -641,7 +684,7 @@ public void shouldRetryAddingTopicConfig() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) From 7ca1560a75dcfcb233bc3519d87f3d824c9c2c2a Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Tue, 7 Jul 2020 16:18:20 -0700 Subject: [PATCH 3/4] chore: use default instead of specifying delete --- .../ksql/services/KafkaTopicClientImpl.java | 3 +-- .../ksql/topic/TopicCreateInjector.java | 19 +++++++++++-------- .../ksql/topic/TopicCreateInjectorTest.java | 5 +++-- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index e41e774ddb2c..e60fcd35d818 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -240,8 +240,7 @@ public boolean addTopicConfig(final String topicName, final Map overr @Override public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) { final String policy = getTopicConfig(topicName) - .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "") - .toLowerCase(); + .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); if (policy.equals("compact")) { return TopicCleanupPolicy.COMPACT; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index c1a1504c825e..7907e4434f12 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -34,6 +34,7 @@ import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -125,8 +126,8 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - final String topicCleanUpPolicy = createSource instanceof CreateTable - ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + final Optional topicCleanUpPolicy = createSource instanceof CreateTable + ? Optional.of(TopicConfig.CLEANUP_POLICY_COMPACT) : Optional.empty(); createTopic(topicPropertiesBuilder, topicCleanUpPolicy); @@ -159,13 +160,14 @@ private ConfiguredStatement injectForCreateAsSelec properties.getPartitions(), properties.getReplicas()); - final String topicCleanUpPolicy; + final Optional topicCleanUpPolicy; if (createAsSelect instanceof CreateStreamAsSelect) { - topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; + topicCleanUpPolicy = Optional.empty(); } else { - topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent() + final String policy = createAsSelect.getQuery().getWindow().isPresent() ? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE : TopicConfig.CLEANUP_POLICY_COMPACT; + topicCleanUpPolicy = Optional.of(policy); } final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy); @@ -183,12 +185,13 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final String topicCleanUpPolicy + final Optional topicCleanUpPolicy ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = - ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); + final Map config = topicCleanUpPolicy.isPresent() + ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy.get()) + : Collections.emptyMap(); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 32c78ccb5fa2..c434c0d1b98d 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -54,6 +54,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -345,7 +346,7 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); + Collections.emptyMap()); } @Test @@ -362,7 +363,7 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); + Collections.emptyMap()); } @Test From df833719743d21de22e18536580b23b07aa2bfbc Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 8 Jul 2020 16:03:11 -0700 Subject: [PATCH 4/4] Revert "chore: use default instead of specifying delete" This reverts commit 7ca1560a75dcfcb233bc3519d87f3d824c9c2c2a. --- .../ksql/services/KafkaTopicClientImpl.java | 3 ++- .../ksql/topic/TopicCreateInjector.java | 19 ++++++++----------- .../ksql/topic/TopicCreateInjectorTest.java | 5 ++--- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index e60fcd35d818..e41e774ddb2c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -240,7 +240,8 @@ public boolean addTopicConfig(final String topicName, final Map overr @Override public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) { final String policy = getTopicConfig(topicName) - .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); + .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "") + .toLowerCase(); if (policy.equals("compact")) { return TopicCleanupPolicy.COMPACT; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index 7907e4434f12..c1a1504c825e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -34,7 +34,6 @@ import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -126,8 +125,8 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - final Optional topicCleanUpPolicy = createSource instanceof CreateTable - ? Optional.of(TopicConfig.CLEANUP_POLICY_COMPACT) : Optional.empty(); + final String topicCleanUpPolicy = createSource instanceof CreateTable + ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; createTopic(topicPropertiesBuilder, topicCleanUpPolicy); @@ -160,14 +159,13 @@ private ConfiguredStatement injectForCreateAsSelec properties.getPartitions(), properties.getReplicas()); - final Optional topicCleanUpPolicy; + final String topicCleanUpPolicy; if (createAsSelect instanceof CreateStreamAsSelect) { - topicCleanUpPolicy = Optional.empty(); + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; } else { - final String policy = createAsSelect.getQuery().getWindow().isPresent() + topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent() ? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE : TopicConfig.CLEANUP_POLICY_COMPACT; - topicCleanUpPolicy = Optional.of(policy); } final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy); @@ -185,13 +183,12 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final Optional topicCleanUpPolicy + final String topicCleanUpPolicy ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = topicCleanUpPolicy.isPresent() - ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy.get()) - : Collections.emptyMap(); + final Map config = + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index c434c0d1b98d..32c78ccb5fa2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -54,7 +54,6 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -346,7 +345,7 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - Collections.emptyMap()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -363,7 +362,7 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - Collections.emptyMap()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test