From 4da9686c63aa34e1018628d19ee366fc38de26ee Mon Sep 17 00:00:00 2001 From: rodesai Date: Thu, 11 Jul 2019 21:52:46 -0700 Subject: [PATCH] fix: check for other sources using a topic before deleting When handling DELETE TOPIC, check that other sources don't use the topic before deleting. This is an optimistic check - the user could still delete topics out from under us, and its not safe racing against concurrent statements. But, we've seen single users kick themselves this way in the past, so this should help in those cases. --- .../ksql/topic/TopicDeleteInjector.java | 24 +++++++ .../ksql/topic/TopicDeleteInjectorTest.java | 68 +++++++++++++++++-- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java index 2684325e22e0..8346a889a821 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java @@ -34,7 +34,10 @@ import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * This {@code Injector} will delete the topic associated with a @@ -75,6 +78,26 @@ public TopicDeleteInjector( Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient"); } + private void checkTopicRefs(final DataSource source) { + final String topicName = source.getKafkaTopicName(); + final String sourceName = source.getName(); + final Map> sources = metastore.getAllDataSources(); + final List using = sources.values().stream() + .filter(s -> s.getKafkaTopicName().equals(topicName)) + .map(s -> s.getName()) + .filter(name -> !sourceName.equals(name)) + .collect(Collectors.toList()); + if (!using.isEmpty()) { + throw new KsqlException( + String.format( + "Refusing to delete topic. Found other data sources (%s) using topic %s", + String.join(", ", using), + topicName + ) + ); + } + } + @SuppressWarnings("unchecked") @Override public ConfiguredStatement inject( @@ -92,6 +115,7 @@ public ConfiguredStatement inject( final DataSource source = metastore.getSource(sourceName); if (source != null) { + checkTopicRefs(source); try { ExecutorUtil.executeWithRetries( () -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java index 63d0d6213014..e8a71a9c46c9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -43,6 +44,8 @@ import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import org.junit.Before; import org.junit.Rule; @@ -55,12 +58,14 @@ @RunWith(MockitoJUnitRunner.class) public class TopicDeleteInjectorTest { + private static final String SOURCE_NAME = "SOMETHING"; + private static final String TOPIC_NAME = "something"; private static final ConfiguredStatement DROP_WITH_DELETE_TOPIC = givenStatement( "DROP STREAM SOMETHING DELETE TOPIC", - new DropStream(QualifiedName.of("SOMETHING"), false, true)); + new DropStream(QualifiedName.of(SOURCE_NAME), false, true)); private static final ConfiguredStatement DROP_WITHOUT_DELETE_TOPIC = givenStatement( "DROP STREAM SOMETHING", - new DropStream(QualifiedName.of("SOMETHING"), false, false)); + new DropStream(QualifiedName.of(SOURCE_NAME), false, false)); @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -80,8 +85,9 @@ public class TopicDeleteInjectorTest { public void setUp() { deleteInjector = new TopicDeleteInjector(metaStore, topicClient, registryClient); - when(metaStore.getSource("SOMETHING")).thenAnswer(inv -> source); - when(source.getKafkaTopicName()).thenReturn("something"); + when(metaStore.getSource(SOURCE_NAME)).thenAnswer(inv -> source); + when(source.getName()).thenReturn(SOURCE_NAME); + when(source.getKafkaTopicName()).thenReturn(TOPIC_NAME); when(source.getValueSerdeFactory()).thenReturn(new KsqlJsonSerdeFactory()); } @@ -162,6 +168,60 @@ public void shouldThrowExceptionIfSourceDoesNotExist() { deleteInjector.inject(dropStatement); } + @Test + public void shouldNotThrowIfNoOtherSourcesUsingTopic() { + // Given: + final ConfiguredStatement dropStatement = givenStatement( + "DROP SOMETHING DELETE TOPIC;", + new DropStream(QualifiedName.of(SOURCE_NAME), + true, + true) + ); + final DataSource other1 = givenSource("OTHER", "other"); + final Map> sources = new HashMap<>(); + sources.put(SOURCE_NAME, source); + sources.put("OTHER", other1); + when(metaStore.getAllDataSources()).thenReturn(sources); + + // When: + deleteInjector.inject(dropStatement); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldThrowExceptionIfOtherSourcesUsingTopic() { + // Given: + final ConfiguredStatement dropStatement = givenStatement( + "DROP SOMETHING DELETE TOPIC;", + new DropStream(QualifiedName.of(SOURCE_NAME), + true, + true) + ); + final DataSource other1 = givenSource("OTHER1", TOPIC_NAME); + final DataSource other2 = givenSource("OTHER2", TOPIC_NAME); + final Map> sources = new HashMap<>(); + sources.put(SOURCE_NAME, source); + sources.put("OTHER1", other1); + sources.put("OTHER2", other2); + when(metaStore.getAllDataSources()).thenReturn(sources); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Refusing to delete topic. " + + "Found other data sources (OTHER1, OTHER2) using topic something"); + + // When: + deleteInjector.inject(dropStatement); + } + + private DataSource givenSource(final String name, final String topicName) { + final DataSource source = mock(DataSource.class); + when(source.getName()).thenReturn(name); + when(source.getKafkaTopicName()).thenReturn(topicName); + return source; + } + private static ConfiguredStatement givenStatement( final String text, final T statement