From 57a35932131adc526633d68256bc3c8235aa01e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Tue, 20 Oct 2020 11:19:00 -0500 Subject: [PATCH 1/3] feat: add syntax to DEFINE/UNDEFINE session variables --- .../main/java/io/confluent/ksql/cli/Cli.java | 33 ++++ .../java/io/confluent/ksql/cli/CliTest.java | 7 + .../io/confluent/ksql/parser/SqlBase.g4 | 17 ++ .../io/confluent/ksql/parser/AstBuilder.java | 15 ++ .../confluent/ksql/parser/SqlFormatter.java | 21 +++ .../ksql/parser/tree/AstVisitor.java | 8 + .../ksql/parser/tree/DefineVariable.java | 79 +++++++++ .../ksql/parser/tree/UndefineVariable.java | 67 ++++++++ .../ksql/parser/SqlFormatterTest.java | 20 +++ .../server/execution/CustomExecutors.java | 4 + .../server/execution/VariableExecutor.java | 67 ++++++++ .../server/validation/CustomValidators.java | 7 +- .../execution/VariableExecutorTest.java | 153 ++++++++++++++++++ .../ksql/rest/SessionProperties.java | 16 ++ 14 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DefineVariable.java create mode 100644 ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/UndefineVariable.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/VariableExecutor.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java index 89cff4d8db13..2ad5e8e77afa 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java @@ -25,10 +25,12 @@ import io.confluent.ksql.parser.KsqlParser; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.SqlBaseParser; +import io.confluent.ksql.parser.SqlBaseParser.DefineVariableContext; import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext; import io.confluent.ksql.parser.SqlBaseParser.QueryStatementContext; import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext; import io.confluent.ksql.parser.SqlBaseParser.StatementContext; +import io.confluent.ksql.parser.SqlBaseParser.UndefineVariableContext; import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext; import io.confluent.ksql.reactive.BaseSubscriber; import io.confluent.ksql.rest.Errors; @@ -54,8 +56,10 @@ import java.io.Closeable; import java.io.PrintWriter; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -84,6 +88,8 @@ public class Cli implements KsqlRequestExecutor, Closeable { .put(PrintTopicContext.class, Cli::handlePrintedTopic) .put(SetPropertyContext.class, Cli::setPropertyFromCtxt) .put(UnsetPropertyContext.class, Cli::unsetPropertyFromCtxt) + .put(DefineVariableContext.class, Cli::defineVariableFromCtxt) + .put(UndefineVariableContext.class, Cli::undefineVariableFromCtxt) .build(); private final Long streamedQueryRowLimit; @@ -93,6 +99,8 @@ public class Cli implements KsqlRequestExecutor, Closeable { private final Console terminal; private final RemoteServerState remoteServerState; + private final Map sessionVariables; + public static Cli build( final Long streamedQueryRowLimit, final Long streamedQueryTimeoutMs, @@ -117,6 +125,7 @@ public static Cli build( this.restClient = restClient; this.terminal = terminal; this.remoteServerState = new RemoteServerState(); + this.sessionVariables = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); final Supplier versionSuppler = () -> restClient.getServerInfo().getResponse().getVersion(); @@ -480,6 +489,30 @@ private void unsetProperty(final String property) { terminal.flush(); } + @SuppressWarnings("unused") + private void defineVariableFromCtxt( + final String ignored, + final DefineVariableContext context + ) { + final String variableName = context.variableName().getText(); + final String variableValue = ParserUtil.unquote(context.variableValue().getText(), "'"); + sessionVariables.put(variableName, variableValue); + } + + @SuppressWarnings("unused") + private void undefineVariableFromCtxt( + final String ignored, + final UndefineVariableContext context + ) { + final String variableName = context.variableName().getText(); + if (sessionVariables.remove(variableName) == null) { + // Print only (no throws exception) to keep it as a warning message (like VariableExecutor) + terminal.writer() + .printf("Cannot undefine variable '%s' which was never defined", variableName); + terminal.flush(); + } + } + private static boolean isSequenceNumberTimeout(final RestResponse response) { return response.isErroneous() && (response.getErrorMessage().getErrorCode() diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index e88796e10aad..b92cc351ed61 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -431,6 +431,13 @@ public void shouldPrintTopicWithDelimitedValue() { containsString(", key: ITEM_1, value: home cinema")); } + @Test + public void testVariableDefineUndefine() { + assertRunCommand("define var1 = '1';", is(EMPTY_RESULT)); + assertRunCommand("define var2 = '2';", is(EMPTY_RESULT)); + assertRunCommand("undefine var1;", is(EMPTY_RESULT)); + } + @Test public void testPropertySetUnset() { assertRunCommand("set 'auto.offset.reset' = 'latest';", is(EMPTY_RESULT)); diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index c8f1d3470ff8..2c3613c502c6 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -62,6 +62,8 @@ statement | TERMINATE ALL #terminateQuery | SET STRING EQ STRING #setProperty | UNSET STRING #unsetProperty + | DEFINE variableName EQ variableValue #defineVariable + | UNDEFINE variableName #undefineVariable | CREATE (OR REPLACE)? STREAM (IF NOT EXISTS)? sourceName (tableElements)? (WITH tableProperties)? #createStream @@ -337,6 +339,15 @@ identifier | DIGIT_IDENTIFIER #digitIdentifier ; +variableName + : IDENTIFIER + | nonReserved + ; + +variableValue + : STRING + ; + sourceName : identifier ; @@ -480,6 +491,8 @@ RENAME: 'RENAME'; ARRAY: 'ARRAY'; MAP: 'MAP'; SET: 'SET'; +DEFINE: 'DEFINE'; +UNDEFINE: 'UNDEFINE'; RESET: 'RESET'; SESSION: 'SESSION'; SAMPLE: 'SAMPLE'; @@ -566,6 +579,10 @@ TIMESTAMP_WITH_TIME_ZONE : 'TIMESTAMP' WS 'WITH' WS 'TIME' WS 'ZONE' ; +VARIABLE + : '${' IDENTIFIER '}' + ; + fragment EXPONENT : 'E' [+-]? DIGIT+ ; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 0e57d5eef2cb..8f5dca983577 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -108,6 +108,7 @@ import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.parser.tree.CreateTableAsSelect; +import io.confluent.ksql.parser.tree.DefineVariable; import io.confluent.ksql.parser.tree.DescribeConnector; import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.DropConnector; @@ -147,6 +148,7 @@ import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.parser.tree.UndefineVariable; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.parser.tree.WindowExpression; import io.confluent.ksql.parser.tree.WithinExpression; @@ -743,6 +745,19 @@ public Node visitUnsetProperty(final SqlBaseParser.UnsetPropertyContext context) return new UnsetProperty(getLocation(context), propertyName); } + @Override + public Node visitDefineVariable(final SqlBaseParser.DefineVariableContext context) { + final String variableName = context.variableName().getText(); + final String variableValue = ParserUtil.unquote(context.variableValue().getText(), "'"); + return new DefineVariable(getLocation(context), variableName, variableValue); + } + + @Override + public Node visitUndefineVariable(final SqlBaseParser.UndefineVariableContext context) { + final String variableName = context.variableName().getText(); + return new UndefineVariable(getLocation(context), variableName); + } + @Override public Node visitPrintTopic(final SqlBaseParser.PrintTopicContext context) { final boolean fromBeginning = context.printClause().FROM() != null; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 237e05e1b173..e47b420801c8 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -38,6 +38,7 @@ import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.parser.tree.CreateTableAsSelect; +import io.confluent.ksql.parser.tree.DefineVariable; import io.confluent.ksql.parser.tree.DropStatement; import io.confluent.ksql.parser.tree.DropStream; import io.confluent.ksql.parser.tree.DropTable; @@ -65,6 +66,7 @@ import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.parser.tree.UndefineVariable; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.utils.FormatOptions; @@ -443,6 +445,25 @@ protected Void visitSetProperty(final SetProperty node, final Integer context) { return null; } + @Override + protected Void visitDefineVariable(final DefineVariable node, final Integer context) { + builder.append("DEFINE "); + builder.append(node.getVariableName()); + builder.append("='"); + builder.append(node.getVariableValue()); + builder.append("'"); + + return null; + } + + @Override + protected Void visitUndefineVariable(final UndefineVariable node, final Integer context) { + builder.append("UNDEFINE "); + builder.append(node.getVariableName()); + + return null; + } + private void visitExtended() { builder.append(" EXTENDED"); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java index d3cb030c539e..487f697fddc7 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java @@ -174,6 +174,14 @@ protected R visitSetProperty(final SetProperty node, final C context) { return visitStatement(node, context); } + protected R visitDefineVariable(final DefineVariable node, final C context) { + return visitStatement(node, context); + } + + protected R visitUndefineVariable(final UndefineVariable node, final C context) { + return visitStatement(node, context); + } + public R visitRegisterType(final RegisterType node, final C context) { return visitStatement(node, context); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DefineVariable.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DefineVariable.java new file mode 100644 index 000000000000..ea5c9e0b044c --- /dev/null +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/DefineVariable.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.parser.NodeLocation; + +import java.util.Objects; +import java.util.Optional; + +@Immutable +public class DefineVariable extends Statement { + private final String variableName; + private final String variableValue; + + public DefineVariable( + final Optional location, + final String variableName, + final String variableValue + ) { + super(location); + this.variableName = requireNonNull(variableName, "variableName"); + this.variableValue = requireNonNull(variableValue, "variableValue"); + } + + public String getVariableName() { + return variableName; + } + + public String getVariableValue() { + return variableValue; + } + + @Override + public R accept(final AstVisitor visitor, final C context) { + return visitor.visitDefineVariable(this, context); + } + + @Override + public int hashCode() { + return Objects.hash(variableName, variableValue); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DefineVariable that = (DefineVariable) o; + return Objects.equals(variableName, that.variableName) + && Objects.equals(variableValue, that.variableValue); + } + + @Override + public String toString() { + return "DefineVariable{" + + "name=" + variableName + + ", value=" + variableValue + + '}'; + } +} diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/UndefineVariable.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/UndefineVariable.java new file mode 100644 index 000000000000..842246696916 --- /dev/null +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/UndefineVariable.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.parser.NodeLocation; + +import java.util.Objects; +import java.util.Optional; + +@Immutable +public class UndefineVariable extends Statement { + private final String variableName; + + public UndefineVariable(final Optional location, final String variableName) { + super(location); + this.variableName = requireNonNull(variableName, "variableName"); + } + + public String getVariableName() { + return variableName; + } + + @Override + public R accept(final AstVisitor visitor, final C context) { + return visitor.visitUndefineVariable(this, context); + } + + @Override + public int hashCode() { + return Objects.hash(variableName); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final UndefineVariable that = (UndefineVariable) o; + return Objects.equals(variableName, that.variableName); + } + + @Override + public String toString() { + return "UndefineVariable{" + + "name=" + variableName + + '}'; + } +} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index be7d98f35c6c..78b8af4c8009 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -668,6 +668,26 @@ public void shouldFormatInsertIntoPartitionBy() { )); } + @Test + public void shouldFormatDefineStatement() { + final String statementString = "DEFINE _topic='t1';"; + final Statement statement = parseSingle(statementString); + + final String result = SqlFormatter.formatSql(statement); + + assertThat(result, is("DEFINE _topic='t1'")); + } + + @Test + public void shouldFormatUndefineStatement() { + final String statementString = "UNDEFINE _topic;"; + final Statement statement = parseSingle(statementString); + + final String result = SqlFormatter.formatSql(statement); + + assertThat(result, is("UNDEFINE _topic")); + } + @Test public void shouldFormatExplainQuery() { final String statementString = "EXPLAIN foo;"; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index f7d3df747f6e..1594869c2d9e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -19,6 +19,7 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.InsertValuesExecutor; import io.confluent.ksql.parser.tree.CreateConnector; +import io.confluent.ksql.parser.tree.DefineVariable; import io.confluent.ksql.parser.tree.DescribeConnector; import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.DropConnector; @@ -35,6 +36,7 @@ import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.ShowColumns; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.UndefineVariable; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.KsqlEntity; @@ -69,6 +71,8 @@ public enum CustomExecutors { DESCRIBE_FUNCTION(DescribeFunction.class, DescribeFunctionExecutor::execute), SET_PROPERTY(SetProperty.class, PropertyExecutor::set), UNSET_PROPERTY(UnsetProperty.class, PropertyExecutor::unset), + DEFINE_VARIABLE(DefineVariable.class, VariableExecutor::set), + UNDEFINE_VARIABLE(UndefineVariable.class, VariableExecutor::unset), INSERT_VALUES(InsertValues.class, insertValuesExecutor()), CREATE_CONNECTOR(CreateConnector.class, ConnectExecutor::execute), DROP_CONNECTOR(DropConnector.class, DropConnectorExecutor::execute), diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/VariableExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/VariableExecutor.java new file mode 100644 index 000000000000..5806aa44920e --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/VariableExecutor.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.parser.tree.DefineVariable; +import io.confluent.ksql.parser.tree.UndefineVariable; +import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.WarningEntity; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; + +import java.util.Optional; + +public final class VariableExecutor { + private VariableExecutor() { + } + + public static Optional set( + final ConfiguredStatement statement, + final SessionProperties sessionProperties, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext + ) { + final DefineVariable defineVariable = statement.getStatement(); + sessionProperties.setVariable( + defineVariable.getVariableName(), + defineVariable.getVariableValue() + ); + + return Optional.empty(); + } + + public static Optional unset( + final ConfiguredStatement statement, + final SessionProperties sessionProperties, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext + ) { + final String variableName = statement.getStatement().getVariableName(); + + if (!sessionProperties.getSessionVariables().containsKey(variableName)) { + return Optional.of(new WarningEntity( + statement.getStatementText(), + String.format("Cannot undefine variable '%s' which was never defined", variableName) + )); + } + + sessionProperties.unsetVariable(variableName); + + return Optional.empty(); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index b6e1e113ffcd..58e7e2a79882 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -19,6 +19,7 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.InsertValuesExecutor; import io.confluent.ksql.parser.tree.CreateConnector; +import io.confluent.ksql.parser.tree.DefineVariable; import io.confluent.ksql.parser.tree.DescribeConnector; import io.confluent.ksql.parser.tree.DescribeFunction; import io.confluent.ksql.parser.tree.DropConnector; @@ -37,6 +38,7 @@ import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.ShowColumns; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.UndefineVariable; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.server.execution.DescribeConnectorExecutor; @@ -45,6 +47,7 @@ import io.confluent.ksql.rest.server.execution.ListSourceExecutor; import io.confluent.ksql.rest.server.execution.PropertyExecutor; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.VariableExecutor; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; @@ -81,7 +84,9 @@ public enum CustomValidators { DESCRIBE_FUNCTION(DescribeFunction.class, DescribeFunctionExecutor::execute), DESCRIBE_CONNECTOR(DescribeConnector.class, new DescribeConnectorExecutor()::execute), SET_PROPERTY(SetProperty.class, PropertyExecutor::set), - UNSET_PROPERTY(UnsetProperty.class, PropertyExecutor::unset); + UNSET_PROPERTY(UnsetProperty.class, PropertyExecutor::unset), + DEFINE_VARIABLE(DefineVariable.class, VariableExecutor::set), + UNDEFINE_VARIABLE(UndefineVariable.class, VariableExecutor::unset); public static final Map, StatementValidator> VALIDATOR_MAP = ImmutableMap.copyOf( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java new file mode 100644 index 000000000000..656db245a203 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java @@ -0,0 +1,153 @@ +package io.confluent.ksql.rest.server.execution; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; + +import io.confluent.ksql.parser.exception.ParseFailedException; +import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.WarningEntity; +import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.util.KsqlHostInfo; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class VariableExecutorTest { + @Rule + public final TemporaryEngine engine = new TemporaryEngine(); + + private SessionProperties sessionProperties; + + @Before + public void setup() { + sessionProperties = new SessionProperties( + new HashMap<>(), mock(KsqlHostInfo.class), mock(URL.class), false); + } + + private void executeDefineVariable(final String sql) { + final Optional response = CustomExecutors.DEFINE_VARIABLE.execute( + engine.configure(sql), + sessionProperties, + engine.getEngine(), + engine.getServiceContext() + ); + assertThat(response, is(Optional.empty())); + } + + private Optional executeUndefineVariable(final String sql) { + return CustomExecutors.UNDEFINE_VARIABLE.execute( + engine.configure(sql), + sessionProperties, + engine.getEngine(), + engine.getServiceContext() + ); + } + + @Test + public void shouldSetVariables() { + // When: + executeDefineVariable("DEFINE var1 = 'John Peter';"); + executeDefineVariable("DEFINE var2 = '''John Peter''';"); + + // Then: + final Map variablesMap = sessionProperties.getSessionVariables(); + assertThat(variablesMap.size(), is(2)); + assertThat(variablesMap, hasEntry("var1", "John Peter")); + assertThat(variablesMap, hasEntry("var2", "'John Peter'")); + } + + @Test + public void shouldSetCaseInsensitiveVariables() { + // When: + executeDefineVariable("DEFINE A = 'val1';"); + executeDefineVariable("DEFINE b = 'val2';"); + + // Then: + final Map variablesMap = sessionProperties.getSessionVariables(); + assertThat(variablesMap.containsKey("a"), is(true)); + assertThat(variablesMap.get("a"), is("val1")); + assertThat(variablesMap.containsKey("A"), is(true)); + assertThat(variablesMap.get("A"), is("val1")); + assertThat(variablesMap.containsKey("b"), is(true)); + assertThat(variablesMap.get("b"), is("val2")); + assertThat(variablesMap.containsKey("B"), is(true)); + assertThat(variablesMap.get("B"), is("val2")); + } + + @Test + public void shouldUnsetVariables() { + // Given: + sessionProperties.setVariable("var1", "1"); + sessionProperties.setVariable("var2", "2"); + + // When: + final Optional response = executeUndefineVariable("UNDEFINE var1;"); + assertThat(response, is(Optional.empty())); + + // Then: + final Map variablesMap = sessionProperties.getSessionVariables(); + assertThat(variablesMap.size(), is(1)); + assertThat(variablesMap, hasEntry("var2", "2")); + } + + @Test + public void shouldUnsetCaseInsensitiveVariables() { + // Given: + sessionProperties.setVariable("VAR1", "1"); + + // When: + final Optional response = executeUndefineVariable("UNDEFINE var1;"); + assertThat(response, is(Optional.empty())); + + // Then: + final Map variablesMap = sessionProperties.getSessionVariables(); + assertThat(variablesMap.size(), is(0)); + } + + @Test + public void shouldReturnWarningWhenUndefineAnUnknownVariable() { + // When: + final KsqlEntity response = executeUndefineVariable("UNDEFINE var1;").get(); + + // Then: + assertThat(((WarningEntity)response).getMessage(), + containsString("Cannot undefine variable 'var1' which was never defined")); + } + + @Test + public void shouldThrowOnInvalidValues() { + // Given: + final List invalidValues = Arrays.asList( + "\"3\"", // double-quotes + "`3`", // back-quotes + "3" // no quotes + ); + + for (final String invalidValue : invalidValues) { + // When: + final Exception e = assertThrows( + ParseFailedException.class, + () -> executeDefineVariable(String.format("DEFINE var1=%s;", invalidValue)) + ); + + // Then: + assertThat(e.getMessage(), containsString( + String.format("mismatched input '%s'", invalidValue))); + } + } +} diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/SessionProperties.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/SessionProperties.java index b4bc0f3263de..e9e95afaaa33 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/SessionProperties.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/SessionProperties.java @@ -17,9 +17,11 @@ import io.confluent.ksql.util.KsqlHostInfo; import java.net.URL; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.TreeMap; /** * Wraps the incoming {@link io.confluent.ksql.rest.entity.KsqlRequest} streamsProperties @@ -32,6 +34,7 @@ public class SessionProperties { private final KsqlHostInfo ksqlHostInfo; private final URL localUrl; private final boolean internalRequest; + private final Map sessionVariables; /** * @param mutableScopedProperties The streamsProperties of the incoming request @@ -50,6 +53,7 @@ public SessionProperties( this.ksqlHostInfo = Objects.requireNonNull(ksqlHostInfo, "ksqlHostInfo"); this.localUrl = Objects.requireNonNull(localUrl, "localUrl"); this.internalRequest = internalRequest; + this.sessionVariables = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); } public Map getMutableScopedProperties() { @@ -67,4 +71,16 @@ public URL getLocalUrl() { public boolean getInternalRequest() { return internalRequest; } + + public Map getSessionVariables() { + return Collections.unmodifiableMap(sessionVariables); + } + + public void setVariable(final String name, final String value) { + sessionVariables.put(name, value); + } + + public void unsetVariable(final String name) { + sessionVariables.remove(name); + } } From 19433b185281d4c6e4fb7826a17103dbe4c5a45c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Fri, 16 Oct 2020 13:41:59 -0500 Subject: [PATCH 2/3] feat: syntax SHOW VARIABLES to print session variables --- .../main/java/io/confluent/ksql/cli/Cli.java | 22 +++- .../confluent/ksql/cli/console/Console.java | 4 + .../builder/ListVariablesTableBuilder.java | 45 ++++++++ .../java/io/confluent/ksql/cli/CliTest.java | 13 +++ .../io/confluent/ksql/parser/SqlBase.g4 | 2 + .../io/confluent/ksql/parser/AstBuilder.java | 6 + .../confluent/ksql/parser/SqlFormatter.java | 8 ++ .../ksql/parser/tree/AstVisitor.java | 4 + .../ksql/parser/tree/ListVariables.java | 57 ++++++++++ .../ksql/parser/SqlFormatterTest.java | 13 +++ .../server/execution/CustomExecutors.java | 2 + .../execution/ListVariablesExecutor.java | 48 ++++++++ .../server/validation/CustomValidators.java | 3 + .../execution/ListVariablesExecutorTest.java | 85 +++++++++++++++ .../execution/VariableExecutorTest.java | 15 +++ .../ksql/rest/entity/VariablesList.java | 103 ++++++++++++++++++ 16 files changed, 429 insertions(+), 1 deletion(-) create mode 100644 ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ListVariablesTableBuilder.java create mode 100644 ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/ListVariables.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutor.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java create mode 100644 ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/VariablesList.java diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java index 2ad5e8e77afa..ae5499ee9490 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java @@ -26,6 +26,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.SqlBaseParser; import io.confluent.ksql.parser.SqlBaseParser.DefineVariableContext; +import io.confluent.ksql.parser.SqlBaseParser.ListVariablesContext; import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext; import io.confluent.ksql.parser.SqlBaseParser.QueryStatementContext; import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext; @@ -44,6 +45,7 @@ import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.ServerInfo; import io.confluent.ksql.rest.entity.StreamedRow; +import io.confluent.ksql.rest.entity.VariablesList; import io.confluent.ksql.util.AppInfo; import io.confluent.ksql.util.ErrorMessageUtil; import io.confluent.ksql.util.HandlerMaps; @@ -55,6 +57,7 @@ import io.vertx.core.VertxException; import java.io.Closeable; import java.io.PrintWriter; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -65,6 +68,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Supplier; +import java.util.stream.Collectors; + import org.jline.reader.EndOfFileException; import org.jline.reader.UserInterruptException; import org.jline.terminal.Terminal; @@ -90,6 +95,7 @@ public class Cli implements KsqlRequestExecutor, Closeable { .put(UnsetPropertyContext.class, Cli::unsetPropertyFromCtxt) .put(DefineVariableContext.class, Cli::defineVariableFromCtxt) .put(UndefineVariableContext.class, Cli::undefineVariableFromCtxt) + .put(ListVariablesContext.class, Cli::listVariablesFromCtxt) .build(); private final Long streamedQueryRowLimit; @@ -508,11 +514,25 @@ private void undefineVariableFromCtxt( if (sessionVariables.remove(variableName) == null) { // Print only (no throws exception) to keep it as a warning message (like VariableExecutor) terminal.writer() - .printf("Cannot undefine variable '%s' which was never defined", variableName); + .printf("Cannot undefine variable '%s' which was never defined.%n", variableName); terminal.flush(); } } + @SuppressWarnings("unused") + private void listVariablesFromCtxt( + final String ignored, + final ListVariablesContext listVariablesContext + ) { + final List variables = sessionVariables.entrySet().stream() + .map(e -> new VariablesList.Variable(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + + terminal.printKsqlEntityList(Collections.singletonList( + new VariablesList(listVariablesContext.getText(),variables) + )); + } + private static boolean isSequenceNumberTimeout(final RestResponse response) { return response.isErroneous() && (response.getErrorMessage().getErrorCode() diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index 2de4eefec422..206c945adab5 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -36,6 +36,7 @@ import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder; import io.confluent.ksql.cli.console.table.builder.FunctionNameListTableBuilder; import io.confluent.ksql.cli.console.table.builder.KafkaTopicsListTableBuilder; +import io.confluent.ksql.cli.console.table.builder.ListVariablesTableBuilder; import io.confluent.ksql.cli.console.table.builder.PropertiesListTableBuilder; import io.confluent.ksql.cli.console.table.builder.QueriesTableBuilder; import io.confluent.ksql.cli.console.table.builder.StreamsListTableBuilder; @@ -83,6 +84,7 @@ import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.entity.TopicDescription; import io.confluent.ksql.rest.entity.TypeList; +import io.confluent.ksql.rest.entity.VariablesList; import io.confluent.ksql.rest.entity.WarningEntity; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.util.CmdLineUtil; @@ -178,6 +180,8 @@ public class Console implements Closeable { tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new)) .put(WarningEntity.class, tablePrinter(WarningEntity.class, WarningEntityTableBuilder::new)) + .put(VariablesList.class, + tablePrinter(VariablesList.class, ListVariablesTableBuilder::new)) .build(); private static Handler1 tablePrinter( diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ListVariablesTableBuilder.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ListVariablesTableBuilder.java new file mode 100644 index 000000000000..7c4585ba6696 --- /dev/null +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/ListVariablesTableBuilder.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.cli.console.table.builder; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.cli.console.table.Table; +import io.confluent.ksql.rest.entity.VariablesList; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +public class ListVariablesTableBuilder implements TableBuilder { + private static final List HEADERS = + ImmutableList.of("Variable Name", "Value"); + + @Override + public Table buildTable(final VariablesList entity) { + return new Table.Builder() + .withColumnHeaders(HEADERS) + .withRows(defRowValues(entity.getVariables())) + .build(); + } + + private static List> defRowValues(final List variables) { + return variables.stream() + .sorted(Comparator.comparing(var -> var.getName())) + .map(var -> ImmutableList.of( + var.getName(), var.getValue())) + .collect(Collectors.toList()); + } +} diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index b92cc351ed61..4df9d07d8bc8 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -435,7 +435,20 @@ public void shouldPrintTopicWithDelimitedValue() { public void testVariableDefineUndefine() { assertRunCommand("define var1 = '1';", is(EMPTY_RESULT)); assertRunCommand("define var2 = '2';", is(EMPTY_RESULT)); + assertRunCommand("define var3 = '3';", is(EMPTY_RESULT)); + assertRunCommand("undefine var1;", is(EMPTY_RESULT)); + + assertRunListCommand("variables", hasRows( + row( + "var2", + "2" + ), + row( + "var3", + "3" + ) + )); } @Test diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 2c3613c502c6..34aedc71fc5d 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -53,6 +53,7 @@ statement | (LIST | SHOW) FUNCTIONS #listFunctions | (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors | (LIST | SHOW) TYPES #listTypes + | (LIST | SHOW) VARIABLES #listVariables | DESCRIBE EXTENDED? sourceName #showColumns | DESCRIBE FUNCTION identifier #describeFunction | DESCRIBE CONNECTOR identifier #describeConnector @@ -517,6 +518,7 @@ REPLACE: 'REPLACE'; ASSERT: 'ASSERT'; ADD: 'ADD'; ALTER: 'ALTER'; +VARIABLES: 'VARIABLES'; IF: 'IF'; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 8f5dca983577..e69d4c27ca7b 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -131,6 +131,7 @@ import io.confluent.ksql.parser.tree.ListTables; import io.confluent.ksql.parser.tree.ListTopics; import io.confluent.ksql.parser.tree.ListTypes; +import io.confluent.ksql.parser.tree.ListVariables; import io.confluent.ksql.parser.tree.PartitionBy; import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; @@ -732,6 +733,11 @@ public Node visitListProperties(final SqlBaseParser.ListPropertiesContext contex return new ListProperties(getLocation(context)); } + @Override + public Node visitListVariables(final SqlBaseParser.ListVariablesContext context) { + return new ListVariables(getLocation(context)); + } + @Override public Node visitSetProperty(final SqlBaseParser.SetPropertyContext context) { final String propertyName = ParserUtil.unquote(context.STRING(0).getText(), "'"); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index e47b420801c8..071b96f5b77f 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -53,6 +53,7 @@ import io.confluent.ksql.parser.tree.ListFunctions; import io.confluent.ksql.parser.tree.ListStreams; import io.confluent.ksql.parser.tree.ListTables; +import io.confluent.ksql.parser.tree.ListVariables; import io.confluent.ksql.parser.tree.PartitionBy; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.RegisterType; @@ -456,6 +457,13 @@ protected Void visitDefineVariable(final DefineVariable node, final Integer cont return null; } + @Override + public Void visitListVariables(final ListVariables node, final Integer context) { + builder.append("SHOW VARIABLES"); + + return null; + } + @Override protected Void visitUndefineVariable(final UndefineVariable node, final Integer context) { builder.append("UNDEFINE "); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java index 487f697fddc7..b0efdb5eb09d 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java @@ -182,6 +182,10 @@ protected R visitUndefineVariable(final UndefineVariable node, final C context) return visitStatement(node, context); } + protected R visitListVariables(final ListVariables node, final C context) { + return visitStatement(node, context); + } + public R visitRegisterType(final RegisterType node, final C context) { return visitStatement(node, context); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/ListVariables.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/ListVariables.java new file mode 100644 index 000000000000..9df833de009d --- /dev/null +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/ListVariables.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import static com.google.common.base.MoreObjects.toStringHelper; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.parser.NodeLocation; + +import java.util.Objects; +import java.util.Optional; + +@Immutable +public class ListVariables extends Statement { + public ListVariables(final Optional location) { + super(location); + } + + + @Override + public R accept(final AstVisitor visitor, final C context) { + return visitor.visitListVariables(this, context); + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + + return obj != null && obj.getClass().equals(getClass()); + } + + @Override + public String toString() { + return toStringHelper(this) + .toString(); + } +} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index 78b8af4c8009..27a3005b9026 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -48,6 +48,7 @@ import io.confluent.ksql.parser.tree.JoinedSource; import io.confluent.ksql.parser.tree.ListStreams; import io.confluent.ksql.parser.tree.ListTables; +import io.confluent.ksql.parser.tree.ListVariables; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; @@ -804,6 +805,18 @@ public void shouldFormatShowTablesExtended() { assertThat(formatted, is("SHOW TABLES EXTENDED")); } + @Test + public void shouldFormatShowVariables() { + // Given: + final ListVariables listVariables = new ListVariables(Optional.empty()); + + // When: + final String formatted = SqlFormatter.formatSql(listVariables); + + // Then: + assertThat(formatted, is("SHOW VARIABLES")); + } + @Test public void shouldFormatShowStreams() { // Given: diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index 1594869c2d9e..b038ea26650f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -33,6 +33,7 @@ import io.confluent.ksql.parser.tree.ListTables; import io.confluent.ksql.parser.tree.ListTopics; import io.confluent.ksql.parser.tree.ListTypes; +import io.confluent.ksql.parser.tree.ListVariables; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.ShowColumns; import io.confluent.ksql.parser.tree.Statement; @@ -65,6 +66,7 @@ public enum CustomExecutors { LIST_PROPERTIES(ListProperties.class, ListPropertiesExecutor::execute), LIST_CONNECTORS(ListConnectors.class, ListConnectorsExecutor::execute), LIST_TYPES(ListTypes.class, ListTypesExecutor::execute), + LIST_VARIABLES(ListVariables.class, ListVariablesExecutor::execute), SHOW_COLUMNS(ShowColumns.class, ListSourceExecutor::columns), EXPLAIN(Explain.class, ExplainExecutor::execute), diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutor.java new file mode 100644 index 000000000000..54db03672091 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutor.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.parser.tree.ListVariables; +import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.VariablesList; +import io.confluent.ksql.rest.entity.VariablesList.Variable; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public final class ListVariablesExecutor { + private ListVariablesExecutor() { + } + + public static Optional execute( + final ConfiguredStatement statement, + final SessionProperties sessionProperties, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext + ) { + final List sessionVariables = sessionProperties.getSessionVariables().entrySet() + .stream() + .map(e -> new Variable(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + + return Optional.of(new VariablesList(statement.getStatementText(), sessionVariables)); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index 58e7e2a79882..9a2b2d7ced80 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -33,6 +33,7 @@ import io.confluent.ksql.parser.tree.ListTables; import io.confluent.ksql.parser.tree.ListTopics; import io.confluent.ksql.parser.tree.ListTypes; +import io.confluent.ksql.parser.tree.ListVariables; import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.SetProperty; @@ -45,6 +46,7 @@ import io.confluent.ksql.rest.server.execution.DescribeFunctionExecutor; import io.confluent.ksql.rest.server.execution.ExplainExecutor; import io.confluent.ksql.rest.server.execution.ListSourceExecutor; +import io.confluent.ksql.rest.server.execution.ListVariablesExecutor; import io.confluent.ksql.rest.server.execution.PropertyExecutor; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; import io.confluent.ksql.rest.server.execution.VariableExecutor; @@ -77,6 +79,7 @@ public enum CustomValidators { LIST_TYPES(ListTypes.class, StatementValidator.NO_VALIDATION), CREATE_CONNECTOR(CreateConnector.class, StatementValidator.NO_VALIDATION), DROP_CONNECTOR(DropConnector.class, StatementValidator.NO_VALIDATION), + LIST_VARIABLES(ListVariables.class, ListVariablesExecutor::execute), INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute), SHOW_COLUMNS(ShowColumns.class, ListSourceExecutor::columns), diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java new file mode 100644 index 000000000000..9f117f0363bd --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.VariablesList; +import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.util.KsqlHostInfo; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ListVariablesExecutorTest { + @Rule + public final TemporaryEngine engine = new TemporaryEngine(); + + private SessionProperties sessionProperties; + + @Before + public void setup() { + sessionProperties = new SessionProperties( + new HashMap<>(), mock(KsqlHostInfo.class), mock(URL.class), false); + } + + private Optional executeListVariables(final String sql) { + return CustomExecutors.LIST_VARIABLES.execute( + engine.configure(sql), + sessionProperties, + engine.getEngine(), + engine.getServiceContext() + ); + } + + @Test + public void shouldListEmptyVariables() { + // When: + final KsqlEntity response = executeListVariables("LIST VARIABLES;").get(); + + // Then: + assertThat(((VariablesList)response).getVariables(), is(Collections.emptyList())); + } + + @Test + public void shouldListVariables() { + // Given: + sessionProperties.setVariable("var1", "1"); + sessionProperties.setVariable("var2", "2"); + + // When: + final KsqlEntity response = executeListVariables("LIST VARIABLES;").get(); + + // Then: + final VariablesList variablesList = (VariablesList) response; + assertThat(variablesList.getStatementText(), is("LIST VARIABLES;")); + assertThat(variablesList.getVariables().get(0), + is(new VariablesList.Variable("var1", "1"))); + assertThat(variablesList.getVariables().get(1), + is(new VariablesList.Variable("var2", "2"))); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java index 656db245a203..a3bf8c5128cd 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.rest.server.execution; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/VariablesList.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/VariablesList.java new file mode 100644 index 000000000000..020cfabdfe25 --- /dev/null +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/VariablesList.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class VariablesList extends KsqlEntity { + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Variable { + private final String name; + private final String value; + + @JsonCreator + public Variable( + @JsonProperty("name") final String name, + @JsonProperty("value") final String value + ) { + this.name = name; + this.value = value; + } + + public String getName() { + return name; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final Variable that = (Variable) object; + return Objects.equals(name, that.name) + && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(name, value); + } + + @Override + public String toString() { + return "Variable{" + + "name='" + name + '\'' + + ", value='" + value + '\'' + + '}'; + } + } + + private final List variables; + + @JsonCreator + public VariablesList( + @JsonProperty("statementText") final String statementText, + @JsonProperty("properties") final List variables + ) { + super(statementText); + this.variables = variables == null ? Collections.emptyList() : ImmutableList.copyOf(variables); + } + + public List getVariables() { + return variables; + } + + @Override + public boolean equals(final Object o) { + return o instanceof VariablesList + && Objects.equals(variables, ((VariablesList)o).variables); + } + + @Override + public int hashCode() { + return Objects.hash(variables); + } +} From a9a9ba8f618e004c58e3512a6a0f06d2865bc7aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Wed, 21 Oct 2020 21:08:10 -0500 Subject: [PATCH 3/3] fix: address Almog's feedback --- .../execution/ListVariablesExecutorTest.java | 16 ++++++++-------- .../server/execution/VariableExecutorTest.java | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java index 9f117f0363bd..07040c2de163 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListVariablesExecutorTest.java @@ -18,27 +18,24 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.VariablesList; -import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlHostInfo; import java.net.URL; import java.util.Collections; import java.util.HashMap; import java.util.Optional; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class ListVariablesExecutorTest { - @Rule - public final TemporaryEngine engine = new TemporaryEngine(); - private SessionProperties sessionProperties; @Before @@ -48,11 +45,14 @@ public void setup() { } private Optional executeListVariables(final String sql) { + final ConfiguredStatement configuredStatement = mock(ConfiguredStatement.class); + when(configuredStatement.getStatementText()).thenReturn(sql); + return CustomExecutors.LIST_VARIABLES.execute( - engine.configure(sql), + configuredStatement, sessionProperties, - engine.getEngine(), - engine.getServiceContext() + null, + null ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java index a3bf8c5128cd..f473d8cb8eea 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/VariableExecutorTest.java @@ -134,6 +134,20 @@ public void shouldUnsetCaseInsensitiveVariables() { assertThat(variablesMap.size(), is(0)); } + @Test + public void shouldOverrideCaseInsensitiveVariables() { + // When: + executeDefineVariable("DEFINE var1 = '1';"); + executeDefineVariable("DEFINE VAR1 = '2';"); + executeDefineVariable("DEFINE vAr1 = '3';"); // latest update + + // Then: + final Map variablesMap = sessionProperties.getSessionVariables(); + assertThat(variablesMap.size(), is(1)); + assertThat(variablesMap.containsKey("var1"), is(true)); + assertThat(variablesMap.get("var1"), is("3")); + } + @Test public void shouldReturnWarningWhenUndefineAnUnknownVariable() { // When: