Skip to content

Commit

Permalink
fix: Removes orphaned topics from transient queries (#6714)
Browse files Browse the repository at this point in the history
* fix: Removes orphaned topics from transient queries
  • Loading branch information
AlanConfluent authored Dec 10, 2020
1 parent aa7391d commit 06d6e3e
Show file tree
Hide file tree
Showing 20 changed files with 1,017 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class KsqlEngine implements KsqlExecutionContext, Closeable {
private final String serviceId;
private final EngineContext primaryContext;
private final QueryCleanupService cleanupService;
private final OrphanedTransientQueryCleaner orphanedTransientQueryCleaner;

public KsqlEngine(
final ServiceContext serviceContext,
Expand Down Expand Up @@ -101,6 +102,7 @@ public KsqlEngine(
final QueryIdGenerator queryIdGenerator
) {
this.cleanupService = new QueryCleanupService();
this.orphanedTransientQueryCleaner = new OrphanedTransientQueryCleaner(this.cleanupService);
this.primaryContext = EngineContext.create(
serviceContext,
processingLogContext,
Expand Down Expand Up @@ -312,6 +314,14 @@ public void close() {
close(false);
}

/**
 * Removes internal topics left behind by transient queries that are assumed to have
 * completed, by delegating to the engine's {@code OrphanedTransientQueryCleaner}.
 *
 * @param serviceContext the service context whose topic client is used to list topics
 * @param queryApplicationIds application ids of transient queries assumed completed;
 *        topics prefixed by one of these ids are scheduled for cleanup
 */
public void cleanupOrphanedInternalTopics(
final ServiceContext serviceContext,
final Set<String> queryApplicationIds
) {
orphanedTransientQueryCleaner
.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
}

/**
* Determines if a statement is executable by the engine.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.engine;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schedules cleanup of internal Kafka topics that were left behind by transient
 * queries which are no longer running.
 *
 * <p>An application id is considered orphaned when at least one existing topic name
 * still starts with that id; each such id is handed to the {@code QueryCleanupService}
 * as a transient-query cleanup task.
 */
public class OrphanedTransientQueryCleaner {

  private static final Logger LOG = LoggerFactory.getLogger(OrphanedTransientQueryCleaner.class);

  private final QueryCleanupService cleanupService;

  public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService) {
    this.cleanupService = requireNonNull(cleanupService);
  }

  /**
   * Cleans up any internal topics that may exist for the given set of query application
   * ids, since it's assumed that they are completed.
   *
   * <p>Best-effort: if the topic listing cannot be fetched, the failure is logged and
   * no tasks are scheduled.
   *
   * @param serviceContext The service context
   * @param queryApplicationIds The set of completed query application ids
   */
  public void cleanupOrphanedInternalTopics(
      final ServiceContext serviceContext,
      final Set<String> queryApplicationIds
  ) {
    final Set<String> existingTopics;
    try {
      existingTopics = serviceContext.getTopicClient().listTopicNames();
    } catch (final KafkaResponseGetFailedException e) {
      LOG.error("Couldn't fetch topic names", e);
      return;
    }
    // An application id is orphaned when some existing topic still carries it as a prefix.
    queryApplicationIds.stream()
        .filter(appId -> existingTopics.stream().anyMatch(topic -> topic.startsWith(appId)))
        .forEach(appId -> cleanupService.addCleanupTask(
            new QueryCleanupService.QueryCleanupTask(
                serviceContext,
                appId,
                true
            )));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ static class QueryCleanupTask implements Runnable {
this.isTransient = isTransient;
}

/** Returns the application id of the query this cleanup task is responsible for. */
public String getAppId() {
return appId;
}

@Override
public void run() {
tryRun(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.engine;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class OrphanedTransientQueryCleanerTest {
  // Two internal topics belonging to the same transient query (both prefixed by APP_ID_1).
  private static final String TOPIC1
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718"
      + "-Aggregate-GroupBy-repartition";
  private static final String TOPIC2
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718"
      + "-Aggregate-Aggregate-Materialize-changelog";
  // Internal topic for a different transient query (prefixed by APP_ID_2).
  private static final String TOPIC3
      = "_confluent-ksql-default_transient_123497300573686369_1606940012345"
      + "-Aggregate-Aggregate-Materialize-changelog";

  // A topic whose name is not prefixed by any known application id; it must never
  // trigger a cleanup task.
  private static final String BAD_TOPIC_NAME
      = "_confluent-ksql-default_node0_transient_bad";

  private static final String APP_ID_1
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718";
  private static final String APP_ID_2
      = "_confluent-ksql-default_transient_123497300573686369_1606940012345";

  @Mock
  private QueryCleanupService queryCleanupService;
  @Mock
  private ServiceContext serviceContext;
  @Mock
  private KafkaTopicClient topicClient;
  @Captor
  private ArgumentCaptor<QueryCleanupTask> taskCaptor;

  private OrphanedTransientQueryCleaner cleaner;

  @Before
  public void setUp() {
    when(serviceContext.getTopicClient()).thenReturn(topicClient);
    cleaner = new OrphanedTransientQueryCleaner(queryCleanupService);
  }

  @Test
  public void shouldCleanup_allApplicationIds() {
    // Given
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));

    // When
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

    // Then
    // The cleaner collects orphaned ids via Collectors.toSet() (a HashSet), so the
    // order in which tasks are added is unspecified — assert on the captured app ids
    // as an unordered collection rather than on positional captor values.
    verify(queryCleanupService, times(2)).addCleanupTask(taskCaptor.capture());
    assertThat(
        taskCaptor.getAllValues().stream()
            .map(QueryCleanupTask::getAppId)
            .collect(Collectors.toList()),
        containsInAnyOrder(APP_ID_1, APP_ID_2));
  }

  @Test
  public void shouldCleanup_someApplicationIds() {
    // Given: only topics for APP_ID_1 exist
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2));

    // When
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

    // Then: only APP_ID_1 is scheduled for cleanup
    verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
  }

  @Test
  public void skipNonMatchingTopics() {
    // Given: includes a badly-named topic that matches no application id
    when(topicClient.listTopicNames())
        .thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3, BAD_TOPIC_NAME));

    // When
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_2));

    // Then: only the topic prefixed by APP_ID_2 results in a task
    verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_2));
  }

  @Test
  public void shouldSkip_exception() {
    // Given: the topic listing cannot be fetched
    when(topicClient.listTopicNames())
        .thenThrow(new KafkaResponseGetFailedException("error!", new Exception()));

    // When
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of());

    // Then: the failure is swallowed and no cleanup is scheduled
    verify(queryCleanupService, never()).addCleanupTask(any());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
Expand Down Expand Up @@ -61,21 +62,24 @@ public class QueryEndpoint {
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final HARouting routing;
private final Optional<LocalCommands> localCommands;

/**
 * Creates the endpoint with its collaborators; fields are simple assignments of the
 * given arguments (no validation or copying is performed here).
 *
 * @param ksqlEngine engine used to execute queries
 * @param ksqlConfig server configuration
 * @param routingFilterFactory factory for pull-query routing filters
 * @param pullQueryMetrics metrics recorder for pull queries, if metrics are enabled
 * @param rateLimiter rate limiter applied to queries
 * @param routing high-availability routing for pull queries
 * @param localCommands if present, used to record transient queries locally
 *        (written on push-query creation) — NOTE(review): purpose inferred from the
 *        {@code localCommands.ifPresent(lc -> lc.write(queryMetadata))} call site in
 *        createPushQueryPublisher; confirm against LocalCommands
 */
public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter,
final HARouting routing,
final Optional<LocalCommands> localCommands
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
this.routing = routing;
this.localCommands = localCommands;
}

public QueryPublisher createQueryPublisher(
Expand Down Expand Up @@ -110,6 +114,8 @@ private QueryPublisher createPushQueryPublisher(
final TransientQueryMetadata queryMetadata = ksqlEngine
.executeQuery(serviceContext, statement, true);

localCommands.ifPresent(lc -> lc.write(queryMetadata));

publisher.setQueryHandle(new KsqlQueryHandle(queryMetadata));

return publisher;
Expand Down
Loading

0 comments on commit 06d6e3e

Please sign in to comment.