Skip to content

Commit

Permalink
fix: check for other sources using a topic before deleting
Browse files Browse the repository at this point in the history
When handling DELETE TOPIC, check that other sources don't use the topic
before deleting. This is an optimistic check - the user could still delete
topics out from under us, and its not safe racing against concurrent statements.
But, we've seen single users kick themselves this way in the past, so this should
help in those cases.
  • Loading branch information
rodesai committed Jul 12, 2019
1 parent 508fab9 commit 4da9686
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* This {@code Injector} will delete the topic associated with a
Expand Down Expand Up @@ -75,6 +78,26 @@ public TopicDeleteInjector(
Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient");
}

private void checkTopicRefs(final DataSource<?> source) {
final String topicName = source.getKafkaTopicName();
final String sourceName = source.getName();
final Map<String, DataSource<?>> sources = metastore.getAllDataSources();
final List<String> using = sources.values().stream()
.filter(s -> s.getKafkaTopicName().equals(topicName))
.map(s -> s.getName())
.filter(name -> !sourceName.equals(name))
.collect(Collectors.toList());
if (!using.isEmpty()) {
throw new KsqlException(
String.format(
"Refusing to delete topic. Found other data sources (%s) using topic %s",
String.join(", ", using),
topicName
)
);
}
}

@SuppressWarnings("unchecked")
@Override
public <T extends Statement> ConfiguredStatement<T> inject(
Expand All @@ -92,6 +115,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
final DataSource<?> source = metastore.getSource(sourceName);

if (source != null) {
checkTopicRefs(source);
try {
ExecutorUtil.executeWithRetries(
() -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand All @@ -43,6 +44,8 @@
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -55,12 +58,14 @@
@RunWith(MockitoJUnitRunner.class)
public class TopicDeleteInjectorTest {

private static final String SOURCE_NAME = "SOMETHING";
private static final String TOPIC_NAME = "something";
private static final ConfiguredStatement<DropStream> DROP_WITH_DELETE_TOPIC = givenStatement(
"DROP STREAM SOMETHING DELETE TOPIC",
new DropStream(QualifiedName.of("SOMETHING"), false, true));
new DropStream(QualifiedName.of(SOURCE_NAME), false, true));
private static final ConfiguredStatement<DropStream> DROP_WITHOUT_DELETE_TOPIC = givenStatement(
"DROP STREAM SOMETHING",
new DropStream(QualifiedName.of("SOMETHING"), false, false));
new DropStream(QualifiedName.of(SOURCE_NAME), false, false));

@Rule
public final ExpectedException expectedException = ExpectedException.none();
Expand All @@ -80,8 +85,9 @@ public class TopicDeleteInjectorTest {
public void setUp() {
deleteInjector = new TopicDeleteInjector(metaStore, topicClient, registryClient);

when(metaStore.getSource("SOMETHING")).thenAnswer(inv -> source);
when(source.getKafkaTopicName()).thenReturn("something");
when(metaStore.getSource(SOURCE_NAME)).thenAnswer(inv -> source);
when(source.getName()).thenReturn(SOURCE_NAME);
when(source.getKafkaTopicName()).thenReturn(TOPIC_NAME);
when(source.getValueSerdeFactory()).thenReturn(new KsqlJsonSerdeFactory());
}

Expand Down Expand Up @@ -162,6 +168,60 @@ public void shouldThrowExceptionIfSourceDoesNotExist() {
deleteInjector.inject(dropStatement);
}

@Test
public void shouldNotThrowIfNoOtherSourcesUsingTopic() {
// Given:
final ConfiguredStatement<DropStream> dropStatement = givenStatement(
"DROP SOMETHING DELETE TOPIC;",
new DropStream(QualifiedName.of(SOURCE_NAME),
true,
true)
);
final DataSource<?> other1 = givenSource("OTHER", "other");
final Map<String, DataSource<?>> sources = new HashMap<>();
sources.put(SOURCE_NAME, source);
sources.put("OTHER", other1);
when(metaStore.getAllDataSources()).thenReturn(sources);

// When:
deleteInjector.inject(dropStatement);
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionIfOtherSourcesUsingTopic() {
// Given:
final ConfiguredStatement<DropStream> dropStatement = givenStatement(
"DROP SOMETHING DELETE TOPIC;",
new DropStream(QualifiedName.of(SOURCE_NAME),
true,
true)
);
final DataSource<?> other1 = givenSource("OTHER1", TOPIC_NAME);
final DataSource<?> other2 = givenSource("OTHER2", TOPIC_NAME);
final Map<String, DataSource<?>> sources = new HashMap<>();
sources.put(SOURCE_NAME, source);
sources.put("OTHER1", other1);
sources.put("OTHER2", other2);
when(metaStore.getAllDataSources()).thenReturn(sources);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"Refusing to delete topic. "
+ "Found other data sources (OTHER1, OTHER2) using topic something");

// When:
deleteInjector.inject(dropStatement);
}

private DataSource<?> givenSource(final String name, final String topicName) {
final DataSource source = mock(DataSource.class);
when(source.getName()).thenReturn(name);
when(source.getKafkaTopicName()).thenReturn(topicName);
return source;
}

private static <T extends Statement> ConfiguredStatement<T> givenStatement(
final String text,
final T statement
Expand Down

0 comments on commit 4da9686

Please sign in to comment.