diff --git a/docs-md/img/ksqldb-pull-query.svg b/docs-md/img/ksqldb-pull-query.svg index 1bd02e6f4b12..aaa36e8ecab0 100644 --- a/docs-md/img/ksqldb-pull-query.svg +++ b/docs-md/img/ksqldb-pull-query.svg @@ -1,5 +1,6 @@ - + pull Created with Sketch. diff --git a/docs-md/img/ksqldb-push-query.svg b/docs-md/img/ksqldb-push-query.svg index 438705089adf..a3f230ebcc0d 100644 --- a/docs-md/img/ksqldb-push-query.svg +++ b/docs-md/img/ksqldb-push-query.svg @@ -1,5 +1,6 @@ - + push Created with Sketch. diff --git a/docs-md/img/ksqldb-stream-processing.svg b/docs-md/img/ksqldb-stream-processing.svg index 00d4b3c61c65..fe2a61ae8db3 100644 --- a/docs-md/img/ksqldb-stream-processing.svg +++ b/docs-md/img/ksqldb-stream-processing.svg @@ -1,5 +1,6 @@ - + stream-processing Created with Sketch. diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java index 3072e040eb85..3607c17d54b8 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java @@ -23,7 +23,6 @@ import io.confluent.ksql.execution.streams.KsqlValueTransformerWithKey; import io.confluent.ksql.execution.streams.SelectValueMapperFactory; import io.confluent.ksql.execution.util.StructKeyUtil; -import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.function.TestFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.MetaStore; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java index 25b224dbccdf..52a26828b920 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java @@ -31,9 +31,7 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; public class TopicPropertiesTest { diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 2c5c0ad3a0de..4a0ae292bbf4 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -45,7 +45,8 @@ statement | DESCRIBE CONNECTOR identifier #describeConnector | PRINT (identifier| STRING) printClause #printTopic | (LIST | SHOW) QUERIES EXTENDED? #listQueries - | TERMINATE QUERY? identifier #terminateQuery + | TERMINATE identifier #terminateQuery + | TERMINATE ALL #terminateQuery | SET STRING EQ STRING #setProperty | UNSET STRING #unsetProperty | CREATE STREAM (IF NOT EXISTS)? sourceName @@ -339,6 +340,7 @@ CHANGES: 'CHANGES'; SELECT: 'SELECT'; FROM: 'FROM'; AS: 'AS'; +ALL: 'ALL'; DISTINCT: 'DISTINCT'; WHERE: 'WHERE'; WITHIN: 'WITHIN'; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index b46c23562a75..40bb3ed4a140 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -126,6 +126,7 @@ import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.parser.tree.WindowExpression; import io.confluent.ksql.parser.tree.WithinExpression; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.Operator; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.SqlTypeParser; @@ -157,23 +158,21 @@ public AstBuilder(final TypeRegistry typeRegistry) { this.typeRegistry = requireNonNull(typeRegistry, "typeRegistry"); } - @SuppressWarnings("unchecked") public Statement buildStatement(final ParserRuleContext parseTree) { - return build(Optional.of(getSources(parseTree)), typeRegistry, parseTree); + return build(Optional.of(getSources(parseTree)), parseTree); } public Expression buildExpression(final ParserRuleContext parseTree) { - return build(Optional.empty(), typeRegistry, parseTree); + return build(Optional.empty(), parseTree); } public WindowExpression buildWindowExpression(final ParserRuleContext parseTree) { - return build(Optional.empty(), typeRegistry, parseTree); + return build(Optional.empty(), parseTree); } @SuppressWarnings("unchecked") private T build( final Optional> sources, - final TypeRegistry typeRegistry, final ParserRuleContext parseTree) { return (T) new Visitor(sources, typeRegistry).visit(parseTree); } @@ -632,11 +631,15 @@ public Node visitListTypes(final ListTypesContext ctx) { @Override public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) { - return new TerminateQuery( - getLocation(context), - // use case sensitive parsing here to maintain backwards compatibility - ParserUtil.getIdentifierText(true, context.identifier()) - ); + final Optional location = getLocation(context); + + return context.ALL() != null + ? TerminateQuery.all(location) + : TerminateQuery.query( + location, + // use case sensitive parsing here to maintain backwards compatibility + new QueryId(ParserUtil.getIdentifierText(true, context.identifier())) + ); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index b846b07544b1..dddde01cb1d8 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -56,19 +56,18 @@ import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.UnsetProperty; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.util.IdentifierUtil; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; public final class SqlFormatter { private static final String INDENT = " "; - private static final Pattern NAME_PATTERN = Pattern.compile("[a-z_][a-z0-9_]*"); private static final FormatOptions FORMAT_OPTIONS = FormatOptions.of(IdentifierUtil::needsQuotes); private SqlFormatter() { @@ -287,13 +286,6 @@ protected Void visitCreateTableAsSelect( return null; } - private static String formatName(final String name) { - if (NAME_PATTERN.matcher(name).matches()) { - return name; - } - return "\"" + name + "\""; - } - @Override protected Void visitInsertInto(final InsertInto node, final Integer indent) { builder.append("INSERT INTO "); @@ -329,7 +321,7 @@ protected Void visitInsertValues(final InsertValues node, final Integer context) builder.append( node.getValues() .stream() - .map(exp -> ExpressionFormatterUtil.formatExpression(exp)) + .map(ExpressionFormatterUtil::formatExpression) .collect(Collectors.joining(", "))); builder.append(")"); @@ -345,7 +337,7 @@ protected Void visitDropTable(final DropTable node, final Integer context) { @Override protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) { builder.append("TERMINATE "); - builder.append(node.getQueryId().getId()); + builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL")); return null; } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java index 52bf610733b0..4664072056e9 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java @@ -22,20 +22,27 @@ import java.util.Optional; @Immutable -public class TerminateQuery extends Statement { +public final class TerminateQuery extends Statement { - private final QueryId queryId; + private final Optional queryId; - public TerminateQuery(final String queryId) { - this(Optional.empty(), queryId); + public static TerminateQuery all(final Optional location) { + return new TerminateQuery(location, Optional.empty()); } - public TerminateQuery(final Optional location, final String queryId) { + public static TerminateQuery query(final Optional location, final QueryId queryId) { + return new TerminateQuery(location, Optional.of(queryId)); + } + + private TerminateQuery(final Optional location, final Optional queryId) { super(location); - this.queryId = new QueryId(queryId); + this.queryId = Objects.requireNonNull(queryId, "queryId"); } - public QueryId getQueryId() { + /** + * @return the id of the query to terminate or {@code empty()} if all should be terminated. + */ + public Optional getQueryId() { return queryId; } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 7b55e258d3a8..1757b4a4228a 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -34,7 +34,6 @@ import com.google.common.collect.Iterables; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.DereferenceExpression; import io.confluent.ksql.execution.expression.tree.Expression; @@ -54,7 +53,6 @@ import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.AliasedRelation; import io.confluent.ksql.parser.tree.AllColumns; -import io.confluent.ksql.parser.tree.AstNode; import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.CreateSource; import io.confluent.ksql.parser.tree.CreateStream; @@ -64,7 +62,6 @@ import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Join; -import io.confluent.ksql.parser.tree.JoinMatchers; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.ListQueries; import io.confluent.ksql.parser.tree.ListStreams; @@ -77,11 +74,11 @@ import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.WithinExpression; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; @@ -94,7 +91,6 @@ import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; import java.util.List; @@ -103,8 +99,6 @@ import java.util.concurrent.TimeUnit; import org.hamcrest.Description; import org.hamcrest.Matcher; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; import org.junit.Before; @@ -478,10 +472,10 @@ public void shouldThrowOnNonAlphanumericSourceName() { public void shouldAllowEscapedTerminateQuery() { // When: final PreparedStatement statement = KsqlParserTestUtil - .buildSingleAst("TERMINATE QUERY `CSAS-foo_2`;", metaStore); + .buildSingleAst("TERMINATE `CSAS-foo_2`;", metaStore); // Then: - assertThat(statement.getStatement().getQueryId().getId(), is("CSAS-foo_2")); + assertThat(statement.getStatement().getQueryId().map(QueryId::toString), is(Optional.of("CSAS-foo_2"))); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index 0f826dabda8d..fa2fca8ace41 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -55,6 +55,7 @@ import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.properties.with.CreateConfigs; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlStruct; @@ -74,7 +75,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -@SuppressWarnings("OptionalGetWithoutIsPresent") public class SqlFormatterTest { @Rule @@ -585,7 +585,7 @@ public void shouldFormatDropTableStatement() { @Test public void shouldFormatTerminateQuery() { // Given: - final TerminateQuery terminateQuery = new TerminateQuery(Optional.empty(), "FOO"); + final TerminateQuery terminateQuery = TerminateQuery.query(Optional.empty(), new QueryId("FOO")); // When: final String formatted = SqlFormatter.formatSql(terminateQuery); @@ -594,6 +594,18 @@ public void shouldFormatTerminateQuery() { assertThat(formatted, is("TERMINATE FOO")); } + @Test + public void shouldFormatTerminateAllQueries() { + // Given: + final TerminateQuery terminateQuery = TerminateQuery.all(Optional.empty()); + + // When: + final String formatted = SqlFormatter.formatSql(terminateQuery); + + // Then: + assertThat(formatted, is("TERMINATE ALL")); + } + @Test public void shouldFormatShowTables() { // Given: diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java index 74ac92955ef7..5abe48d33cc0 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java @@ -17,6 +17,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.parser.NodeLocation; +import io.confluent.ksql.query.QueryId; import java.util.Optional; import org.junit.Test; @@ -24,20 +25,19 @@ public class TerminateQueryTest { public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0); public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); - private static final String SOME_QUERY_ID = "query0"; + private static final QueryId SOME_QUERY_ID = new QueryId("query0"); + @SuppressWarnings("UnstableApiUsage") @Test public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new TerminateQuery(SOME_QUERY_ID), - new TerminateQuery(SOME_QUERY_ID), - new TerminateQuery(Optional.of(SOME_LOCATION), SOME_QUERY_ID), - new TerminateQuery(Optional.of(OTHER_LOCATION), SOME_QUERY_ID) + TerminateQuery.query(Optional.of(SOME_LOCATION), SOME_QUERY_ID), + TerminateQuery.query(Optional.of(OTHER_LOCATION), SOME_QUERY_ID) ) .addEqualityGroup( - new TerminateQuery("diff") + TerminateQuery.query(Optional.empty(), new QueryId("diff")) ) .testEquals(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java index 80aadda0cdfc..d478cc14b3de 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java @@ -27,6 +27,7 @@ import io.confluent.ksql.parser.tree.RegisterType; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; @@ -65,9 +66,6 @@ interface CommandIdSupplier { command -> new CommandId(Type.CLUSTER, "TerminateCluster", Action.TERMINATE)) .build(); - public CommandIdAssigner() { - } - public CommandId getCommandId(final Statement command) { final CommandIdSupplier supplier = SUPPLIERS.get(command.getClass()); if (supplier == null) { @@ -112,7 +110,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) { private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) { return new CommandId( CommandId.Type.TERMINATE, - terminateQuery.getQueryId().toString(), + terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"), CommandId.Action.EXECUTE ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index 241a87e57e43..6519ab7cd795 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -226,7 +226,7 @@ private void handleStatementWithTerminatedQueries( } } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") private void executeStatement( final PreparedStatement statement, final Command command, @@ -369,9 +369,14 @@ private KsqlConfig buildMergedConfig(final Command command) { } private void terminateQuery(final PreparedStatement terminateQuery) { - final QueryId queryId = terminateQuery.getStatement().getQueryId(); + final Optional queryId = terminateQuery.getStatement().getQueryId(); - ksqlEngine.getPersistentQuery(queryId) + if (!queryId.isPresent()) { + ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::close); + return; + } + + ksqlEngine.getPersistentQuery(queryId.get()) .orElseThrow(() -> new KsqlException(String.format("No running query with id %s was found", queryId))) .close(); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java index e1f7c7ca11fc..33ab62e5843e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java @@ -21,7 +21,9 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlStatementException; +import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Map; +import java.util.Optional; public final class TerminateQueryValidator { @@ -34,13 +36,17 @@ public static void validate( final ServiceContext serviceContext ) { final TerminateQuery terminateQuery = (TerminateQuery) statement.getStatement(); - final QueryId queryId = terminateQuery.getQueryId(); + final Optional queryId = terminateQuery.getQueryId(); - context.getPersistentQuery(queryId) + if (!queryId.isPresent()) { + context.getPersistentQueries().forEach(PersistentQueryMetadata::close); + return; + } + + context.getPersistentQuery(queryId.get()) .orElseThrow(() -> new KsqlStatementException( - "Unknown queryId: " + queryId, + "Unknown queryId: " + queryId.get(), statement.getStatementText())) .close(); } - } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index 2a0675503ba9..41262bc3e73d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -361,7 +361,7 @@ public void shouldDeleteTopic() { // Given: makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); final String query = REST_APP.getPersistentQueries().iterator().next(); - makeKsqlRequest("TERMINATE QUERY " + query + ";"); + makeKsqlRequest("TERMINATE " + query + ";"); assertThat("Expected topic X to be created", topicExists("X")); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ServerUtilTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ServerUtilTest.java index d0e2cdd480bc..ff19fd76622c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ServerUtilTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/ServerUtilTest.java @@ -15,16 +15,11 @@ package io.confluent.ksql.rest.server; -import java.util.Collections; import io.confluent.rest.RestConfig; -import org.apache.kafka.clients.consumer.ConsumerConfig; +import java.util.Collections; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.test.TestUtils; - import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index 7f6d2167179d..5994f7d5aa53 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -16,19 +16,20 @@ package io.confluent.ksql.rest.server.computation; import static java.util.Collections.emptyMap; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.eq; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -52,6 +53,7 @@ import io.confluent.ksql.parser.tree.DropStream; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.HybridQueryIdGenerator; @@ -282,7 +284,7 @@ public void shouldBuildQueriesWithPersistedConfig() { // When: handleStatement(statementExecutorWithMocks, csasCommand, csasCommandId, Optional.empty(), 1); - + // Then: verify(mockQueryMetadata, times(1)).start(); } @@ -423,7 +425,7 @@ public void shouldEnforceReferentialIntegrity() { // Now try to drop streams/tables to test referential integrity tryDropThatViolatesReferentialIntegrity(); - + // Terminate the queries using the stream/table terminateQueries(); @@ -523,7 +525,7 @@ public void shouldCascade4Dot1DropStreamCommand() { final KsqlPlan plan = Mockito.mock(KsqlPlan.class); when(mockEngine.plan(eq(serviceContext), eqConfiguredStatement(PreparedStatement.of("DROP", mockDropStream)))) .thenReturn(plan); - + when(mockEngine .execute( eq(serviceContext), @@ -547,7 +549,7 @@ public void shouldCascade4Dot1DropStreamCommand() { private ConfiguredStatement eqConfiguredStatement(PreparedStatement preparedStatement) { return argThat(new ConfiguredStatementMatcher<>(preparedStatement)); } - + private ConfiguredKsqlPlan eqConfiguredPlan(final KsqlPlan plan) { return argThat(new ConfiguredKsqlPlanMatcher(plan)); } @@ -565,7 +567,7 @@ public boolean matches(ConfiguredKsqlPlan configuredKsqlPlan) { return plan.getPlan().equals(configuredKsqlPlan.getPlan()); } } - + private class ConfiguredStatementMatcher implements ArgumentMatcher> { private ConfiguredStatement statement; @@ -652,6 +654,37 @@ public void shouldRestoreLegacyRunScriptCommand() { verify(mockQuery, times(0)).start(); } + @Test + public void shouldTerminateAll() { + // Given: + final String queryStatement = "a persistent query"; + + final TerminateQuery terminateAll = mock(TerminateQuery.class); + when(terminateAll.getQueryId()).thenReturn(Optional.empty()); + + when(mockParser.parseSingleStatement(any())) + .thenReturn(PreparedStatement.of(queryStatement, terminateAll)); + + final PersistentQueryMetadata query0 = mock(PersistentQueryMetadata.class); + PersistentQueryMetadata query1 = mock(PersistentQueryMetadata.class); + + when(mockEngine.getPersistentQueries()).thenReturn(ImmutableList.of(query0, query1)); + + // When: + statementExecutorWithMocks.handleStatement( + new QueuedCommand( + new CommandId(Type.TERMINATE, "-", Action.EXECUTE), + new Command("terminate all", true, emptyMap(), emptyMap()), + Optional.empty(), + 0L + ) + ); + + // Then: + verify(query0).close(); + verify(query1).close(); + } + private void createStreamsAndStartTwoPersistentQueries() { final Command csCommand = new Command( "CREATE STREAM pageview (" diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 60f2746acce5..f43b087f2dbe 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1160,6 +1160,30 @@ public void shouldDistributeTerminateQuery() { assertThat(result.getStatementText(), is(terminateSql)); } + @Test + public void shouldDistributeTerminateAllQueries() { + // Given: + createQuery( + "CREATE STREAM test_explain AS SELECT * FROM test_stream;", + emptyMap()); + + final String terminateSql = "TERMINATE ALL;"; + + // When: + final CommandStatusEntity result = makeSingleRequest(terminateSql, CommandStatusEntity.class); + + // Then: + verify(commandStore) + .enqueueCommand( + argThat(is(configured(preparedStatement( + is(terminateSql), + is(TerminateQuery.all(Optional.empty())))) + )), + any(Producer.class)); + + assertThat(result.getStatementText(), is(terminateSql)); + } + @Test public void shouldThrowOnTerminateUnknownQuery() { // Then: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java index 91ec449d3614..8963ef4d6a83 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java @@ -16,15 +16,18 @@ package io.confluent.ksql.rest.server.validation; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.TerminateQuery; -import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Optional; @@ -32,13 +35,25 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class TerminateQueryValidatorTest { - @Rule public final TemporaryEngine engine = new TemporaryEngine(); - @Rule public final ExpectedException expectedException = ExpectedException.none(); + private static final KsqlConfig KSQL_CONFIG = new KsqlConfig(ImmutableMap.of()); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private PersistentQueryMetadata query0; + @Mock + private PersistentQueryMetadata query1; + @Mock + private KsqlEngine engine; + @Mock + private ServiceContext serviceContext; @Test public void shouldFailOnTerminateUnknownQueryId() { @@ -48,34 +63,56 @@ public void shouldFailOnTerminateUnknownQueryId() { // When: CustomValidators.TERMINATE_QUERY.validate( - ConfiguredStatement.of( - PreparedStatement.of("", new TerminateQuery("id")), - ImmutableMap.of(), - engine.getKsqlConfig() - ), + configuredStmt(TerminateQuery.query(Optional.empty(), new QueryId("id"))), ImmutableMap.of(), - engine.getEngine(), - engine.getServiceContext() + engine, + serviceContext ); } @Test public void shouldValidateKnownQueryId() { // Given: - final PersistentQueryMetadata metadata = mock(PersistentQueryMetadata.class); - final KsqlEngine mockEngine = mock(KsqlEngine.class); - when(mockEngine.getPersistentQuery(any())).thenReturn(Optional.ofNullable(metadata)); + when(engine.getPersistentQuery(any())).thenReturn(Optional.of(query0)); - // Expect nothing when: + // When: CustomValidators.TERMINATE_QUERY.validate( - ConfiguredStatement.of( - PreparedStatement.of("", new TerminateQuery("id")), - ImmutableMap.of(), - engine.getKsqlConfig() - ), + configuredStmt(TerminateQuery.query(Optional.empty(), new QueryId("id"))), ImmutableMap.of(), - mockEngine, - engine.getServiceContext() + engine, + serviceContext + ); + + // Then: + verify(query0).close(); + } + + @Test + public void shouldValidateTerminateAllQueries() { + // Given: + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(query0, query1)); + + // When: + CustomValidators.TERMINATE_QUERY.validate( + configuredStmt(TerminateQuery.all(Optional.empty())), + ImmutableMap.of(), + engine, + serviceContext + ); + + // Then: + verify(query0).close(); + verify(query1).close(); + } + + private static ConfiguredStatement configuredStmt( + final TerminateQuery terminateQuery + ) { + return ConfiguredStatement.of( + PreparedStatement.of("meh", terminateQuery), + ImmutableMap.of(), + KSQL_CONFIG ); } } + diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java index a9963a161d9b..6c9ba752cf7d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ProcessingLogServerUtilsTest.java @@ -41,13 +41,11 @@ import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; -import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.serde.Format; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; -import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import java.util.List; import java.util.Optional; diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java index 037fb334724b..5af84c1730f1 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java @@ -56,7 +56,6 @@ import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; -import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.Optional;