Skip to content

Commit

Permalink
feat: add support to terminate all running queries (#3944)
Browse files Browse the repository at this point in the history
* feat: add support to terminate all running queries

You can now issue `TERMINATE ALL` to terminate all running queries.

BREAKING CHANGE: `ALL` is now a reserved word and can not be used for identifiers without being quoted.
  • Loading branch information
big-andy-coates authored Nov 23, 2019
1 parent d9e11bd commit abbce84
Show file tree
Hide file tree
Showing 22 changed files with 208 additions and 103 deletions.
3 changes: 2 additions & 1 deletion docs-md/img/ksqldb-pull-query.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion docs-md/img/ksqldb-push-query.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion docs-md/img/ksqldb-stream-processing.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.confluent.ksql.execution.streams.KsqlValueTransformerWithKey;
import io.confluent.ksql.execution.streams.SelectValueMapperFactory;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.TestFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.MetaStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

public class TopicPropertiesTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ statement
| DESCRIBE CONNECTOR identifier #describeConnector
| PRINT (identifier| STRING) printClause #printTopic
| (LIST | SHOW) QUERIES EXTENDED? #listQueries
| TERMINATE QUERY? identifier #terminateQuery
| TERMINATE identifier #terminateQuery
| TERMINATE ALL #terminateQuery
| SET STRING EQ STRING #setProperty
| UNSET STRING #unsetProperty
| CREATE STREAM (IF NOT EXISTS)? sourceName
Expand Down Expand Up @@ -339,6 +340,7 @@ CHANGES: 'CHANGES';
SELECT: 'SELECT';
FROM: 'FROM';
AS: 'AS';
ALL: 'ALL';
DISTINCT: 'DISTINCT';
WHERE: 'WHERE';
WITHIN: 'WITHIN';
Expand Down
23 changes: 13 additions & 10 deletions ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.Operator;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.SqlTypeParser;
Expand Down Expand Up @@ -157,23 +158,21 @@ public AstBuilder(final TypeRegistry typeRegistry) {
this.typeRegistry = requireNonNull(typeRegistry, "typeRegistry");
}

@SuppressWarnings("unchecked")
public Statement buildStatement(final ParserRuleContext parseTree) {
return build(Optional.of(getSources(parseTree)), typeRegistry, parseTree);
return build(Optional.of(getSources(parseTree)), parseTree);
}

public Expression buildExpression(final ParserRuleContext parseTree) {
return build(Optional.empty(), typeRegistry, parseTree);
return build(Optional.empty(), parseTree);
}

public WindowExpression buildWindowExpression(final ParserRuleContext parseTree) {
return build(Optional.empty(), typeRegistry, parseTree);
return build(Optional.empty(), parseTree);
}

@SuppressWarnings("unchecked")
private <T extends Node> T build(
final Optional<Set<SourceName>> sources,
final TypeRegistry typeRegistry,
final ParserRuleContext parseTree) {
return (T) new Visitor(sources, typeRegistry).visit(parseTree);
}
Expand Down Expand Up @@ -632,11 +631,15 @@ public Node visitListTypes(final ListTypesContext ctx) {

@Override
public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) {
return new TerminateQuery(
getLocation(context),
// use case sensitive parsing here to maintain backwards compatibility
ParserUtil.getIdentifierText(true, context.identifier())
);
final Optional<NodeLocation> location = getLocation(context);

return context.ALL() != null
? TerminateQuery.all(location)
: TerminateQuery.query(
location,
// use case sensitive parsing here to maintain backwards compatibility
new QueryId(ParserUtil.getIdentifierText(true, context.identifier()))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,18 @@
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.util.IdentifierUtil;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public final class SqlFormatter {

private static final String INDENT = " ";
private static final Pattern NAME_PATTERN = Pattern.compile("[a-z_][a-z0-9_]*");
private static final FormatOptions FORMAT_OPTIONS = FormatOptions.of(IdentifierUtil::needsQuotes);

private SqlFormatter() {
Expand Down Expand Up @@ -287,13 +286,6 @@ protected Void visitCreateTableAsSelect(
return null;
}

private static String formatName(final String name) {
if (NAME_PATTERN.matcher(name).matches()) {
return name;
}
return "\"" + name + "\"";
}

@Override
protected Void visitInsertInto(final InsertInto node, final Integer indent) {
builder.append("INSERT INTO ");
Expand Down Expand Up @@ -329,7 +321,7 @@ protected Void visitInsertValues(final InsertValues node, final Integer context)
builder.append(
node.getValues()
.stream()
.map(exp -> ExpressionFormatterUtil.formatExpression(exp))
.map(ExpressionFormatterUtil::formatExpression)
.collect(Collectors.joining(", ")));
builder.append(")");

Expand All @@ -345,7 +337,7 @@ protected Void visitDropTable(final DropTable node, final Integer context) {
@Override
protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) {
builder.append("TERMINATE ");
builder.append(node.getQueryId().getId());
builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL"));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@
import java.util.Optional;

@Immutable
public class TerminateQuery extends Statement {
public final class TerminateQuery extends Statement {

private final QueryId queryId;
private final Optional<QueryId> queryId;

public TerminateQuery(final String queryId) {
this(Optional.empty(), queryId);
public static TerminateQuery all(final Optional<NodeLocation> location) {
return new TerminateQuery(location, Optional.empty());
}

public TerminateQuery(final Optional<NodeLocation> location, final String queryId) {
public static TerminateQuery query(final Optional<NodeLocation> location, final QueryId queryId) {
return new TerminateQuery(location, Optional.of(queryId));
}

private TerminateQuery(final Optional<NodeLocation> location, final Optional<QueryId> queryId) {
super(location);
this.queryId = new QueryId(queryId);
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

public QueryId getQueryId() {
/**
* @return the id of the query to terminate or {@code empty()} if all should be terminated.
*/
public Optional<QueryId> getQueryId() {
return queryId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.common.collect.Iterables;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.DereferenceExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
Expand All @@ -54,7 +53,6 @@
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.AstNode;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStream;
Expand All @@ -64,7 +62,6 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinMatchers;
import io.confluent.ksql.parser.tree.ListProperties;
import io.confluent.ksql.parser.tree.ListQueries;
import io.confluent.ksql.parser.tree.ListStreams;
Expand All @@ -77,11 +74,11 @@
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
Expand All @@ -94,7 +91,6 @@
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import java.util.List;
Expand All @@ -103,8 +99,6 @@
import java.util.concurrent.TimeUnit;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -478,10 +472,10 @@ public void shouldThrowOnNonAlphanumericSourceName() {
public void shouldAllowEscapedTerminateQuery() {
// When:
final PreparedStatement<TerminateQuery> statement = KsqlParserTestUtil
.buildSingleAst("TERMINATE QUERY `CSAS-foo_2`;", metaStore);
.buildSingleAst("TERMINATE `CSAS-foo_2`;", metaStore);

// Then:
assertThat(statement.getStatement().getQueryId().getId(), is("CSAS-foo_2"));
assertThat(statement.getStatement().getQueryId().map(QueryId::toString), is(Optional.of("CSAS-foo_2")));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
Expand All @@ -74,7 +75,6 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

@SuppressWarnings("OptionalGetWithoutIsPresent")
public class SqlFormatterTest {

@Rule
Expand Down Expand Up @@ -585,7 +585,7 @@ public void shouldFormatDropTableStatement() {
@Test
public void shouldFormatTerminateQuery() {
// Given:
final TerminateQuery terminateQuery = new TerminateQuery(Optional.empty(), "FOO");
final TerminateQuery terminateQuery = TerminateQuery.query(Optional.empty(), new QueryId("FOO"));

// When:
final String formatted = SqlFormatter.formatSql(terminateQuery);
Expand All @@ -594,6 +594,18 @@ public void shouldFormatTerminateQuery() {
assertThat(formatted, is("TERMINATE FOO"));
}

@Test
public void shouldFormatTerminateAllQueries() {
// Given:
final TerminateQuery terminateQuery = TerminateQuery.all(Optional.empty());

// When:
final String formatted = SqlFormatter.formatSql(terminateQuery);

// Then:
assertThat(formatted, is("TERMINATE ALL"));
}

@Test
public void shouldFormatShowTables() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.query.QueryId;
import java.util.Optional;
import org.junit.Test;

public class TerminateQueryTest {

public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0);
public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0);
private static final String SOME_QUERY_ID = "query0";
private static final QueryId SOME_QUERY_ID = new QueryId("query0");

@SuppressWarnings("UnstableApiUsage")
@Test
public void shouldImplementHashCodeAndEqualsProperty() {
new EqualsTester()
.addEqualityGroup(
// Note: At the moment location does not take part in equality testing
new TerminateQuery(SOME_QUERY_ID),
new TerminateQuery(SOME_QUERY_ID),
new TerminateQuery(Optional.of(SOME_LOCATION), SOME_QUERY_ID),
new TerminateQuery(Optional.of(OTHER_LOCATION), SOME_QUERY_ID)
TerminateQuery.query(Optional.of(SOME_LOCATION), SOME_QUERY_ID),
TerminateQuery.query(Optional.of(OTHER_LOCATION), SOME_QUERY_ID)
)
.addEqualityGroup(
new TerminateQuery("diff")
TerminateQuery.query(Optional.empty(), new QueryId("diff"))
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
import io.confluent.ksql.rest.entity.CommandId.Type;
Expand Down Expand Up @@ -65,9 +66,6 @@ interface CommandIdSupplier {
command -> new CommandId(Type.CLUSTER, "TerminateCluster", Action.TERMINATE))
.build();

public CommandIdAssigner() {
}

public CommandId getCommandId(final Statement command) {
final CommandIdSupplier supplier = SUPPLIERS.get(command.getClass());
if (supplier == null) {
Expand Down Expand Up @@ -112,7 +110,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) {
private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) {
return new CommandId(
CommandId.Type.TERMINATE,
terminateQuery.getQueryId().toString(),
terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"),
CommandId.Action.EXECUTE
);
}
Expand Down
Loading

0 comments on commit abbce84

Please sign in to comment.