diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 2d2923d42d17..03b1afcd89c8 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -1279,6 +1279,12 @@ Aggregate functions +------------------------+---------------------------+------------+---------------------------------------------------------------------+ | TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window | +------------------------+---------------------------+------------+---------------------------------------------------------------------+ +| WindowStart | ``WindowStart()`` | Stream | Extract the start time of the current window, in milliseconds. | +| | | Table | If the query is not windowed the function will return null. | ++------------------------+---------------------------+------------+---------------------------------------------------------------------+ +| WindowEnd | ``WindowEnd()`` | Stream | Extract the end time of the current window, in milliseconds. | +| | | Table | If the query is not windowed the function will return null. | ++------------------------+---------------------------+------------+---------------------------------------------------------------------+ .. _ksql_key_requirements: diff --git a/docs/tutorials/examples.rst b/docs/tutorials/examples.rst index efcfefc15c0c..b3fa38fe6ee8 100644 --- a/docs/tutorials/examples.rst +++ b/docs/tutorials/examples.rst @@ -272,6 +272,21 @@ counting/aggregation step per region. WINDOW SESSION (60 SECONDS) \ GROUP BY regionid; +Sometimes you may want to include the bounds of the current window in the result so that it is +more easily accessible to consumers of the data. The statement below extracts the start and +end time of the current session window into fields within output rows. + +.. 
code:: sql + + CREATE TABLE pageviews_per_region_per_session AS + SELECT regionid, + windowStart(), + windowEnd(), + count(*) + FROM pageviews_enriched + WINDOW SESSION (60 SECONDS) + GROUP BY regionid; + Working with arrays and maps ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java index aeaf11b050fc..93ef84906f05 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java @@ -28,9 +28,9 @@ public class AggregateAnalyzer extends DefaultTraversalVisitor { - private AggregateAnalysis aggregateAnalysis; - private Analysis analysis; - private FunctionRegistry functionRegistry; + private final AggregateAnalysis aggregateAnalysis; + private final Analysis analysis; + private final FunctionRegistry functionRegistry; private boolean hasAggregateFunction = false; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdaf.java new file mode 100644 index 000000000000..e8d35637ab17 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdaf.java @@ -0,0 +1,50 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.ksql.function.udaf.placeholder;
+
+import io.confluent.ksql.function.udaf.TableUdaf;
+
+/**
+ * A no-op {@link TableUdaf} that is used as a placeholder for some later hardcoded computation.
+ *
+ * <p>Every callback returns {@code null}, so the aggregate column this UDAF occupies carries no
+ * value of its own. The class holds no state and is exposed as a singleton, {@link #INSTANCE}.
+ */
+public final class PlaceholderTableUdaf implements TableUdaf<Long, Long> {
+
+  /** The single shared instance; the class is stateless, so one instance is sufficient. */
+  public static final PlaceholderTableUdaf INSTANCE = new PlaceholderTableUdaf();
+
+  private PlaceholderTableUdaf() {
+  }
+
+  /**
+   * No-op.
+   *
+   * @param valueToUndo ignored
+   * @param aggregateValue ignored
+   * @return always {@code null}
+   */
+  @Override
+  public Long undo(final Long valueToUndo, final Long aggregateValue) {
+    return null;
+  }
+
+  /**
+   * No-op.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public Long initialize() {
+    return null;
+  }
+
+  /**
+   * No-op.
+   *
+   * @param value ignored
+   * @param aggregate ignored
+   * @return always {@code null}
+   */
+  @Override
+  public Long aggregate(final Long value, final Long aggregate) {
+    return null;
+  }
+
+  /**
+   * No-op.
+   *
+   * @param aggOne ignored
+   * @param aggTwo ignored
+   * @return always {@code null}
+   */
+  @Override
+  public Long merge(final Long aggOne, final Long aggTwo) {
+    return null;
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java
new file mode 100644
index 000000000000..749fbd73e42a
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package io.confluent.ksql.function.udaf.window;
+
+import io.confluent.ksql.function.udaf.TableUdaf;
+import io.confluent.ksql.function.udaf.UdafDescription;
+import io.confluent.ksql.function.udaf.UdafFactory;
+import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf;
+
+/**
+ * A placeholder KUDAF for extracting window end times.
+ *
+ * <p>The KUDAF itself does nothing. It's just a placeholder. {@link WindowSelectMapper} looks
+ * for this function by name and writes the window's end time into the column it occupies.
+ *
+ * @see WindowSelectMapper
+ */
+@SuppressWarnings("WeakerAccess") // Invoked via reflection.
+@UdafDescription(name = "WindowEnd", author = "Confluent",
+    description = "Returns the window end time, in milliseconds, for the given record. "
+        + "If the given record is not part of a window the function will return NULL.")
+public final class WindowEndKudaf {
+
+  private WindowEndKudaf() {
+  }
+
+  /**
+   * Returns the name {@link WindowSelectMapper} matches on (case-insensitively) to recognise
+   * this function. Presumably it must stay equal to the {@code name} given in
+   * {@link UdafDescription} above - confirm before changing either.
+   *
+   * @return the function name, {@code "WindowEnd"}
+   */
+  static String getFunctionName() {
+    return "WindowEnd";
+  }
+
+  /**
+   * Factory for the function's implementation.
+   *
+   * @return the shared no-op {@link PlaceholderTableUdaf}; it computes nothing itself
+   */
+  @UdafFactory(description = "Extracts the window end time")
+  public static TableUdaf<Long, Long> createWindowEnd() {
+    return PlaceholderTableUdaf.INSTANCE;
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowSelectMapper.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowSelectMapper.java
new file mode 100644
index 000000000000..eada151d3eb1
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowSelectMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.ksql.function.udaf.window;
+
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.function.KsqlAggregateFunction;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+/**
+ * Used to handle the special cased {@link WindowStartKudaf} and {@link WindowEndKudaf}.
+ *
+ * <p>Those two functions are placeholders that compute nothing. This mapper overwrites the
+ * column each of them occupies with the start or end time of the window in the record's key.
+ */
+public final class WindowSelectMapper
+    implements ValueMapperWithKey<Windowed<?>, GenericRow, GenericRow> {
+
+  // Keyed by upper-cased function name so the lookup is case-insensitive.
+  private static final Map<String, Type> WINDOW_FUNCTION_NAMES = ImmutableMap.of(
+      upperCase(WindowStartKudaf.getFunctionName()), Type.START_TIME,
+      upperCase(WindowEndKudaf.getFunctionName()), Type.END_TIME
+  );
+
+  /** Column index mapped to the window bound that is written into that column. */
+  private final Map<Integer, Type> windowSelects;
+
+  /**
+   * Picks out the window start/end functions from the supplied aggregate functions.
+   *
+   * @param aggFunctionsByIndex aggregate functions keyed by the index of the row column they
+   *     occupy; entries that are not a window start/end function are ignored
+   */
+  public WindowSelectMapper(
+      final Map<Integer, KsqlAggregateFunction> aggFunctionsByIndex) {
+    this.windowSelects = aggFunctionsByIndex.entrySet().stream()
+        .filter(e -> typeOf(e.getValue()) != null)
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> typeOf(e.getValue())));
+  }
+
+  /**
+   * Tells the caller whether applying this mapper would change anything.
+   *
+   * @return {@code true} if at least one window start/end function was found
+   */
+  public boolean hasSelects() {
+    return !windowSelects.isEmpty();
+  }
+
+  /**
+   * Writes the bounds of the key's window into the window-select columns of {@code row}.
+   *
+   * <p>The row is modified in place and the same instance is returned.
+   *
+   * @param readOnlyKey the windowed key of the record
+   * @param row the aggregated row to update
+   * @return {@code row}, after the update
+   */
+  @Override
+  public GenericRow apply(final Windowed<?> readOnlyKey, final GenericRow row) {
+    final Window window = readOnlyKey.window();
+
+    windowSelects.forEach((index, type) ->
+        row.getColumns().set(index, type.mapper.apply(window)));
+
+    return row;
+  }
+
+  /** Returns the window bound the function selects, or {@code null} if it is not one. */
+  private static Type typeOf(final KsqlAggregateFunction function) {
+    return WINDOW_FUNCTION_NAMES.get(upperCase(function.getFunctionName()));
+  }
+
+  /** Upper-cases with {@link Locale#ROOT} so the result does not depend on the JVM locale. */
+  private static String upperCase(final String functionName) {
+    return functionName.toUpperCase(Locale.ROOT);
+  }
+
+  /** The window bound to extract, paired with the accessor that reads it. */
+  private enum Type {
+    START_TIME(Window::start), END_TIME(Window::end);
+
+    private final Function<Window, Object> mapper;
+
+    Type(final Function<Window, Object> mapper) {
+      this.mapper = mapper;
+    }
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java
new file mode 100644
index
000000000000..29bc37a08cdd
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.ksql.function.udaf.window;
+
+import io.confluent.ksql.function.udaf.TableUdaf;
+import io.confluent.ksql.function.udaf.UdafDescription;
+import io.confluent.ksql.function.udaf.UdafFactory;
+import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf;
+
+/**
+ * A placeholder KUDAF for extracting window start times.
+ *
+ * <p>The KUDAF itself does nothing. It's just a placeholder. {@link WindowSelectMapper} looks
+ * for this function by name and writes the window's start time into the column it occupies.
+ *
+ * @see WindowSelectMapper
+ */
+@SuppressWarnings("WeakerAccess") // Invoked via reflection.
+@UdafDescription(name = "WindowStart", author = "Confluent",
+    description = "Returns the window start time, in milliseconds, for the given record. "
+        + "If the given record is not part of a window the function will return NULL.")
+public final class WindowStartKudaf {
+
+  private WindowStartKudaf() {
+  }
+
+  /**
+   * Returns the name {@link WindowSelectMapper} matches on (case-insensitively) to recognise
+   * this function. Presumably it must stay equal to the {@code name} given in
+   * {@link UdafDescription} above - confirm before changing either.
+   *
+   * @return the function name, {@code "WindowStart"}
+   */
+  static String getFunctionName() {
+    return "WindowStart";
+  }
+
+  /**
+   * Factory for the function's implementation.
+   *
+   * @return the shared no-op {@link PlaceholderTableUdaf}; it computes nothing itself
+   */
+  @UdafFactory(description = "Extracts the window start time")
+  public static TableUdaf<Long, Long> createWindowStart() {
+    return PlaceholderTableUdaf.INSTANCE;
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index 76f0f375745f..8ea7ffebef52 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -41,8 +41,8 @@ import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.SchemaUtil; +import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -67,12 +67,9 @@ public class AggregateNode extends PlanNode { private final List groupByExpressions; private final WindowExpression windowExpression; private final List aggregateFunctionArguments; - private final List functionList; private final List requiredColumnList; - private final List finalSelectExpressions; - private final Expression havingExpressions; @JsonCreator @@ -140,8 +137,8 @@ public List getRequiredColumnList() { return requiredColumnList; } - private List> getFinalSelectExpressions() { - final List> finalSelectExpressionList = new ArrayList<>(); + 
private List getFinalSelectExpressions() { + final List finalSelectExpressionList = new ArrayList<>(); if (finalSelectExpressions.size() != schema.fields().size()) { throw new RuntimeException( "Incompatible aggregate schema, field count must match, " @@ -151,7 +148,7 @@ private List> getFinalSelectExpressions() { + schema.fields().size()); } for (int i = 0; i < finalSelectExpressions.size(); i++) { - finalSelectExpressionList.add(new Pair<>( + finalSelectExpressionList.add(SelectExpression.of( schema.fields().get(i).name(), finalSelectExpressions.get(i) )); @@ -233,17 +230,13 @@ public SchemaKStream buildStream( ); final KudafInitializer initializer = new KudafInitializer(aggValToValColumnMap.size()); + + final Map aggValToFunctionMap = createAggValToFunctionMap( + aggregateArgExpanded, aggregateSchema, initializer, aggValToValColumnMap.size(), + functionRegistry, internalSchema); + final SchemaKTable schemaKTable = schemaKGroupedStream.aggregate( - initializer, - createAggValToFunctionMap( - aggregateArgExpanded, - aggregateSchema, - initializer, - aggValToValColumnMap.size(), - functionRegistry, - internalSchema), - aggValToValColumnMap, - getWindowExpression(), + initializer, aggValToFunctionMap, aggValToValColumnMap, getWindowExpression(), aggValueGenericRowSerde); SchemaKTable result = new SchemaKTable( @@ -369,7 +362,7 @@ private Schema buildAggregateSchema( } private static class InternalSchema { - private final List> aggArgExpansionList = new ArrayList<>(); + private final List aggArgExpansionList = new ArrayList<>(); private final Map internalNameToIndexMap = new HashMap<>(); private final Map expressionToInternalColumnNameMap = new HashMap<>(); @@ -390,15 +383,14 @@ private void collectAggregateArgExpressions( final String internalColumnName = INTERNAL_COLUMN_NAME_PREFIX + aggArgExpansionList.size(); internalNameToIndexMap.put(internalColumnName, aggArgExpansionList.size()); - aggArgExpansionList.add(new Pair<>(internalColumnName, expression)); - if 
(!expressionToInternalColumnNameMap.containsKey(expression.toString())) { - expressionToInternalColumnNameMap.put(expression.toString(), internalColumnName); - } + aggArgExpansionList.add(SelectExpression.of(internalColumnName, expression)); + expressionToInternalColumnNameMap + .putIfAbsent(expression.toString(), internalColumnName); }); } - List getInternalExpressionList(final List argExpressionList) { - return argExpressionList.stream() + List getInternalExpressionList(final List expressionList) { + return expressionList.stream() .map(argExpression -> new QualifiedNameReference( QualifiedName.of(getExpressionToInternalColumnNameMap() .get(argExpression.toString())))) @@ -435,19 +427,23 @@ List getInternalArgsExpressionList(final List argExpress } - List> updateFinalSelectExpressions( - final List> finalSelectExpressions) { + List updateFinalSelectExpressions( + final List finalSelectExpressions + ) { return finalSelectExpressions.stream() - .map(finalSelectExpression -> - expressionToInternalColumnNameMap - .containsKey(finalSelectExpression.getRight().toString()) - ? 
new Pair<>(finalSelectExpression.getLeft(), - (Expression) - new QualifiedNameReference( - QualifiedName.of( - expressionToInternalColumnNameMap - .get(finalSelectExpression.getRight().toString())))) - : new Pair<>(finalSelectExpression.getLeft(), finalSelectExpression.getRight())) + .map(finalSelectExpression -> { + final String internal = expressionToInternalColumnNameMap + .get(finalSelectExpression.getExpression().toString()); + + if (internal == null) { + return finalSelectExpression; + } + + return SelectExpression.of( + finalSelectExpression.getName(), + new QualifiedNameReference(QualifiedName.of(internal))); + + }) .collect(Collectors.toList()); } @@ -455,7 +451,7 @@ String getInternalColumnForExpression(final Expression expression) { return expressionToInternalColumnNameMap.get(expression.toString()); } - List> getAggArgExpansionList() { + List getAggArgExpansionList() { return aggArgExpansionList; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java index c25a2de33caa..a9ab4de5894b 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java @@ -28,7 +28,7 @@ import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -93,16 +93,17 @@ public Field getKeyField() { return keyField; } - public List> getProjectNameExpressionPairList() { + public List getProjectSelectExpressions() { if (schema.fields().size() != projectExpressions.size()) { throw new KsqlException("Error in projection. 
Schema fields and expression list are not " + "compatible."); } - final List> expressionPairs = new ArrayList<>(); + + final List selects = new ArrayList<>(); for (int i = 0; i < projectExpressions.size(); i++) { - expressionPairs.add(new Pair<>(schema.fields().get(i).name(), projectExpressions.get(i))); + selects.add(SelectExpression.of(schema.fields().get(i).name(), projectExpressions.get(i))); } - return expressionPairs; + return selects; } @Override @@ -120,6 +121,6 @@ public SchemaKStream buildStream( final Supplier schemaRegistryClientFactory) { return getSource().buildStream(builder, ksqlConfig, kafkaTopicClient, functionRegistry, props, schemaRegistryClientFactory) - .select(getProjectNameExpressionPairList()); + .select(getProjectSelectExpressions()); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java index 1ffec4579ab7..066063c17033 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java @@ -20,7 +20,7 @@ import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.planner.plan.OutputNode; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SelectExpression; import java.util.List; import java.util.Objects; import java.util.Set; @@ -76,7 +76,7 @@ public SchemaKStream filter(final Expression filterExpression) { } @Override - public SchemaKStream select(final List> expressions) { + public SchemaKStream select(final List expressions) { throw new UnsupportedOperationException(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java index 88b3e63d790d..b832342c1b08 100644 --- 
a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,6 +22,7 @@ import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.function.UdafAggregator; import io.confluent.ksql.function.udaf.KudafAggregator; +import io.confluent.ksql.function.udaf.window.WindowSelectMapper; import io.confluent.ksql.parser.tree.KsqlWindowExpression; import io.confluent.ksql.parser.tree.WindowExpression; import io.confluent.ksql.util.KsqlConfig; @@ -30,14 +31,13 @@ import java.util.Objects; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.kstream.Windowed; public class SchemaKGroupedStream { @@ -78,40 +78,59 @@ public SchemaKTable aggregate( final Map aggValToValColumnMap, final WindowExpression windowExpression, final Serde topicValueSerDe) { - final KTable aggKtable; - final UdafAggregator aggregator = new KudafAggregator( - aggValToFunctionMap, aggValToValColumnMap); + + final KTable table; if (windowExpression != null) { - final Materialized materialized - = Materialized.>with( - Serdes.String(), topicValueSerDe); - - final KsqlWindowExpression ksqlWindowExpression = windowExpression.getKsqlWindowExpression(); - aggKtable = ksqlWindowExpression.applyAggregate( - kgroupedStream, - initializer, - aggregator, - materialized - ); + table = 
aggregateWindowed( + initializer, aggValToFunctionMap, aggValToValColumnMap, windowExpression, + topicValueSerDe); } else { - aggKtable = kgroupedStream.aggregate( - initializer, - aggregator, - Materialized.with(Serdes.String(), topicValueSerDe) - ); + table = aggregateNonWindowed( + initializer, aggValToFunctionMap, aggValToValColumnMap, topicValueSerDe); } + return new SchemaKTable( - schema, - aggKtable, - keyField, - sourceSchemaKStreams, - windowExpression != null, - SchemaKStream.Type.AGGREGATE, - ksqlConfig, - functionRegistry, - schemaRegistryClient - ); + schema, table, keyField, sourceSchemaKStreams, windowExpression != null, + SchemaKStream.Type.AGGREGATE, ksqlConfig, functionRegistry, schemaRegistryClient); + } + @SuppressWarnings("unchecked") + private KTable aggregateNonWindowed( + final Initializer initializer, + final Map indexToFunctionMap, + final Map indexToValueMap, + final Serde topicValueSerDe) { + + final UdafAggregator aggregator = new KudafAggregator( + indexToFunctionMap, indexToValueMap); + + return kgroupedStream.aggregate( + initializer, aggregator, Materialized.with(Serdes.String(), topicValueSerDe)); } + @SuppressWarnings("unchecked") + private KTable aggregateWindowed( + final Initializer initializer, + final Map indexToFunctionMap, + final Map indexToValueMap, + final WindowExpression windowExpression, + final Serde topicValueSerDe) { + + final UdafAggregator aggregator = new KudafAggregator( + indexToFunctionMap, indexToValueMap); + + final KsqlWindowExpression ksqlWindowExpression = windowExpression.getKsqlWindowExpression(); + + final KTable aggKtable = ksqlWindowExpression.applyAggregate( + kgroupedStream, initializer, aggregator, + Materialized.with(Serdes.String(), topicValueSerDe)); + + final WindowSelectMapper windowSelectMapper = new WindowSelectMapper(indexToFunctionMap); + if (!windowSelectMapper.hasSelects()) { + return aggKtable; + } + + return aggKtable.mapValues((readOnlyKey, value) -> + 
windowSelectMapper.apply((Windowed) readOnlyKey, (GenericRow) value)); + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index ee36d42877f8..0bfff67d9748 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -29,8 +29,8 @@ import io.confluent.ksql.util.GenericRowValueTypeEnforcer; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.SchemaUtil; +import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -133,8 +133,8 @@ public SchemaKStream filter(final Expression filterExpression) { ); } - public SchemaKStream select(final List> expressionPairList) { - final Selection selection = new Selection(expressionPairList, functionRegistry, this); + public SchemaKStream select(final List selectExpressions) { + final Selection selection = new Selection(selectExpressions, functionRegistry, this); return new SchemaKStream( selection.getSchema(), kstream.mapValues(selection.getSelectValueMapper()), @@ -148,25 +148,29 @@ public SchemaKStream select(final List> expressionPairL } static class Selection { - private Schema schema; - private Field key; - private SelectValueMapper selectValueMapper; + private final Schema schema; + private final Field key; + private final SelectValueMapper selectValueMapper; Selection( - final List> expressionPairList, + final List selectExpressions, final FunctionRegistry functionRegistry, final SchemaKStream fromStream) { - key = findKeyField(expressionPairList, fromStream); + key = findKeyField(selectExpressions, fromStream); final List expressionEvaluators = buildExpressions( - expressionPairList, functionRegistry, fromStream); - schema = 
buildSchema(expressionPairList, expressionEvaluators); + selectExpressions, functionRegistry, fromStream); + schema = buildSchema(selectExpressions, expressionEvaluators); + final GenericRowValueTypeEnforcer typeEnforcer = + new GenericRowValueTypeEnforcer(fromStream.getSchema()); + final List selectFieldNames = selectExpressions.stream() + .map(SelectExpression::getName) + .collect(Collectors.toList()); selectValueMapper = new SelectValueMapper( - new GenericRowValueTypeEnforcer( - fromStream.getSchema()), expressionPairList, expressionEvaluators); + typeEnforcer, selectFieldNames, expressionEvaluators); } private Field findKeyField( - final List> expressionPairList, final SchemaKStream fromStream) { + final List selectExpressions, final SchemaKStream fromStream) { if (fromStream.getKeyField() == null) { return null; } @@ -174,9 +178,9 @@ private Field findKeyField( // The key "field" isn't an actual field in the schema return fromStream.getKeyField(); } - for (int i = 0; i < expressionPairList.size(); i++) { - final String toName = expressionPairList.get(i).left; - final Expression toExpression = expressionPairList.get(i).right; + for (int i = 0; i < selectExpressions.size(); i++) { + final String toName = selectExpressions.get(i).getName(); + final Expression toExpression = selectExpressions.get(i).getExpression(); /* * Sometimes a column reference is a DereferenceExpression, and sometimes its @@ -206,12 +210,12 @@ private Field findKeyField( } private Schema buildSchema( - final List> expressionPairList, + final List selectExpressions, final List expressionEvaluators) { final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); - IntStream.range(0, expressionPairList.size()).forEach( + IntStream.range(0, selectExpressions.size()).forEach( i -> schemaBuilder.field( - expressionPairList.get(i).getLeft(), + selectExpressions.get(i).getName(), expressionEvaluators.get(i).getExpressionType())); return schemaBuilder.build(); } @@ -229,13 +233,13 @@ private 
ExpressionMetadata buildExpression( } private List buildExpressions( - final List> expressionPairList, + final List selectExpressions, final FunctionRegistry functionRegistry, final SchemaKStream fromStream) { final CodeGenRunner codeGenRunner = new CodeGenRunner( fromStream.getSchema(), fromStream.ksqlConfig, functionRegistry); - return expressionPairList.stream() - .map(Pair::getRight) + return selectExpressions.stream() + .map(SelectExpression::getExpression) .map(e -> buildExpression(codeGenRunner, e)) .collect(Collectors.toList()); } @@ -248,7 +252,7 @@ public Field getKey() { return key; } - public SelectValueMapper getSelectValueMapper() { + SelectValueMapper getSelectValueMapper() { return selectValueMapper; } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 38930ba10d69..a7d89218fd05 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -22,7 +22,7 @@ import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -146,8 +146,8 @@ public SchemaKTable filter(final Expression filterExpression) { @SuppressWarnings("unchecked") @Override - public SchemaKTable select(final List> expressionPairList) { - final Selection selection = new Selection(expressionPairList, functionRegistry, this); + public SchemaKTable select(final List selectExpressions) { + final Selection selection = new Selection(selectExpressions, functionRegistry, this); return new SchemaKTable( selection.getSchema(), ktable.mapValues(selection.getSelectValueMapper()), diff --git 
a/ksql-engine/src/main/java/io/confluent/ksql/structured/SelectValueMapper.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SelectValueMapper.java index a920827ab55a..73a3cae64318 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SelectValueMapper.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SelectValueMapper.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,10 +18,8 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.function.udf.Kudf; -import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.util.ExpressionMetadata; import io.confluent.ksql.util.GenericRowValueTypeEnforcer; -import io.confluent.ksql.util.Pair; import java.util.ArrayList; import java.util.List; import org.apache.kafka.streams.kstream.ValueMapper; @@ -29,56 +27,63 @@ import org.slf4j.LoggerFactory; class SelectValueMapper implements ValueMapper { - private static Logger log = LoggerFactory.getLogger(SelectValueMapper.class); + + private static final Logger LOG = LoggerFactory.getLogger(SelectValueMapper.class); private final GenericRowValueTypeEnforcer typeEnforcer; - private final List> expressionPairList; + private final List selectFieldNames; private final List expressionEvaluators; SelectValueMapper( final GenericRowValueTypeEnforcer typeEnforcer, - final List> expressionPairList, + final List selectFieldNames, final List expressionEvaluators ) { this.typeEnforcer = typeEnforcer; - this.expressionPairList = expressionPairList; + this.selectFieldNames = selectFieldNames; this.expressionEvaluators = expressionEvaluators; + + if (selectFieldNames.size() != expressionEvaluators.size()) { + throw new IllegalArgumentException("must have field names for all expressions"); + } } @Override public GenericRow apply(final GenericRow row) { if (row == null) { - return row; + return null; } + final List newColumns = new ArrayList<>(); - for 
(int i = 0; i < expressionPairList.size(); i++) { - try { - final int[] parameterIndexes = expressionEvaluators.get(i).getIndexes(); - final Kudf[] kudfs = expressionEvaluators.get(i).getUdfs(); - final Object[] parameterObjects = new Object[parameterIndexes.length]; - for (int j = 0; j < parameterIndexes.length; j++) { - if (parameterIndexes[j] < 0) { - parameterObjects[j] = kudfs[j]; - } else { - parameterObjects[j] = - typeEnforcer.enforceFieldType(parameterIndexes[j], - row.getColumns() - .get(parameterIndexes[j])); - } + for (int i = 0; i < selectFieldNames.size(); i++) { + newColumns.add(processColumn(i, row)); + } + return new GenericRow(newColumns); + } + + private Object processColumn(final int column, final GenericRow row) { + try { + final ExpressionMetadata expression = expressionEvaluators.get(column); + + final int[] parameterIndexes = expressionEvaluators.get(column).getIndexes(); + final Kudf[] kudfs = expressionEvaluators.get(column).getUdfs(); + final Object[] parameterObjects = new Object[parameterIndexes.length]; + for (int j = 0; j < parameterIndexes.length; j++) { + final Integer paramIndex = parameterIndexes[j]; + if (paramIndex < 0) { + parameterObjects[j] = kudfs[j]; + } else { + parameterObjects[j] = typeEnforcer + .enforceFieldType(paramIndex, row.getColumns().get(paramIndex)); } - newColumns.add( - expressionEvaluators.get(i).getExpressionEvaluator().evaluate(parameterObjects) - ); - } catch (final Exception e) { - log.error( - "Error calculating column with index {} : {}", - i, - expressionPairList.get(i).getLeft(), - e - ); - newColumns.add(null); } + + return expression.evaluate(parameterObjects); + + } catch (final Exception e) { + LOG.error(String.format("Error calculating column with index %d : %s", + column, selectFieldNames.get(column)), e); + return null; } - return new GenericRow(newColumns); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/ExpressionMetadata.java 
b/ksql-engine/src/main/java/io/confluent/ksql/util/ExpressionMetadata.java index ff39f1007677..09db62717e0f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/ExpressionMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/ExpressionMetadata.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,6 +17,7 @@ package io.confluent.ksql.util; import io.confluent.ksql.function.udf.Kudf; +import java.lang.reflect.InvocationTargetException; import org.apache.kafka.connect.data.Schema; import org.codehaus.commons.compiler.IExpressionEvaluator; @@ -38,10 +39,6 @@ public ExpressionMetadata( this.expressionType = expressionType; } - public IExpressionEvaluator getExpressionEvaluator() { - return expressionEvaluator; - } - public int[] getIndexes() { final int [] result = new int[indexes.length]; System.arraycopy(indexes, 0, result, 0, indexes.length); @@ -57,4 +54,12 @@ public Kudf[] getUdfs() { public Schema getExpressionType() { return expressionType; } + + public Object evaluate(final Object[] parameterObjects) { + try { + return expressionEvaluator.evaluate(parameterObjects); + } catch (final InvocationTargetException e) { + throw new KsqlException(e.getMessage(), e); + } + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/SelectExpression.java b/ksql-engine/src/main/java/io/confluent/ksql/util/SelectExpression.java new file mode 100644 index 000000000000..149d06866dfd --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/SelectExpression.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.util; + +import io.confluent.ksql.parser.tree.Expression; +import java.util.Objects; +import javax.annotation.concurrent.Immutable; + +/** + * Pojo holding field name and expression of a select item. + */ +@Immutable +public final class SelectExpression { + + private final String name; + private final Expression expression; + + private SelectExpression(final String name, final Expression expression) { + this.name = Objects.requireNonNull(name, "name"); + this.expression = Objects.requireNonNull(expression, "expression"); + } + + public static SelectExpression of(final String name, final Expression expression) { + return new SelectExpression(name, expression); + } + + public String getName() { + return name; + } + + public Expression getExpression() { + return expression; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SelectExpression that = (SelectExpression) o; + return Objects.equals(name, that.name) + && Objects.equals(expression, that.expression); + } + + @Override + public int hashCode() { + return Objects.hash(name, expression); + } + + @Override + public String toString() { + return "SelectExpression{" + + "name='" + name + '\'' + + ", expression=" + expression + + '}'; + } +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java index bc25ea64b11d..ad6ae1c1dae7 100644 --- 
a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java @@ -43,7 +43,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.MetaStoreFixture; -import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -168,11 +167,11 @@ public void testIsNull() throws Exception { assertThat(idx0, equalTo(0)); assertThat(expressionEvaluatorMetadata0.getUdfs().length, equalTo(1)); - Object result0 = expressionEvaluatorMetadata0.getExpressionEvaluator().evaluate(new Object[]{null}); + Object result0 = expressionEvaluatorMetadata0.evaluate(new Object[]{null}); assertThat(result0, instanceOf(Boolean.class)); assertThat(result0, is(true)); - result0 = expressionEvaluatorMetadata0.getExpressionEvaluator().evaluate(new Object[]{12345L}); + result0 = expressionEvaluatorMetadata0.evaluate(new Object[]{12345L}); assertThat(result0, instanceOf(Boolean.class)); assertThat(result0, is(false)); } @@ -186,7 +185,7 @@ public void shouldHandleMultiDimensionalArray() throws Exception { final List innerArray1 = Arrays.asList("item_11", "item_12"); final List innerArray2 = Arrays.asList("item_21", "item_22"); final Object[] args = new Object[]{Arrays.asList(innerArray1, innerArray2)}; - final Object result = expressionEvaluatorMetadata.getExpressionEvaluator().evaluate(args); + final Object result = expressionEvaluatorMetadata.evaluate(args); assertThat(result, instanceOf(String.class)); assertThat(result, equalTo("item_11")); } @@ -203,11 +202,11 @@ public void testIsNotNull() throws Exception { assertThat(idx0, equalTo(0)); assertThat(expressionEvaluatorMetadata0.getUdfs().length, equalTo(1)); - Object result0 = expressionEvaluatorMetadata0.getExpressionEvaluator().evaluate(new Object[]{null}); + Object result0 = expressionEvaluatorMetadata0.evaluate(new 
Object[]{null}); assertThat(result0, instanceOf(Boolean.class)); assertThat(result0, is(false)); - result0 = expressionEvaluatorMetadata0.getExpressionEvaluator().evaluate(new Object[]{12345L}); + result0 = expressionEvaluatorMetadata0.evaluate(new Object[]{12345L}); assertThat(result0, instanceOf(Boolean.class)); assertThat(result0, is(true)); } @@ -438,9 +437,7 @@ public void shouldHandleNestedUdfs() { final Map inputValues = ImmutableMap.of(1, "{\"name\":\"fred\",\"value\":1}"); // When: - final List columns = executeExpression(query, inputValues); - - // Then: + executeExpression(query, inputValues); } @Test @@ -455,7 +452,7 @@ public void shouldHandleMaps() throws Exception { final ExpressionMetadata expressionMetadata = codeGenRunner.buildCodeGenFromParseTree(analysis.getSelectExpressions().get(0)); - assertThat(expressionMetadata.getExpressionEvaluator().evaluate(new Object[]{inputs}), + assertThat(expressionMetadata.evaluate(new Object[]{inputs}), equalTo("{\"city\":\"adelaide\",\"country\":\"oz\"}")); } @@ -479,9 +476,7 @@ public void shouldHandleUdfsExtractingFromMaps() throws Exception { params[i] = inputs; } } - assertThat(metadata.getExpressionEvaluator() - .evaluate(params), - equalTo("adelaide")); + assertThat(metadata.evaluate(params), equalTo("adelaide")); } @Test @@ -521,7 +516,7 @@ private List executeExpression(final String query, return analysis.getSelectExpressions().stream() .map(buildCodeGenFromParseTree) - .map(md -> evaluate(md, inputValues)) + .map(md -> md.evaluate(buildParams(md, inputValues))) .collect(Collectors.toList()); } @@ -573,20 +568,11 @@ private boolean evalBooleanExpr( values[1] = tmp; } assertThat(expressionEvaluatorMetadata0.getUdfs().length, equalTo(2)); - final Object result0 = expressionEvaluatorMetadata0.getExpressionEvaluator().evaluate(values); + final Object result0 = expressionEvaluatorMetadata0.evaluate(values); assertThat(result0, instanceOf(Boolean.class)); return (Boolean)result0; } - private Object 
evaluate(final ExpressionMetadata md, - final Map inputValues) { - try { - return md.getExpressionEvaluator().evaluate(buildParams(md, inputValues)); - } catch (final InvocationTargetException e) { - throw new RuntimeException(e); - } - } - private Object[] buildParams(final ExpressionMetadata metadata, final Map inputValues) { final Kudf[] udfs = metadata.getUdfs(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderUtil.java index bf7333bc8162..6e385f549d54 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderUtil.java @@ -23,11 +23,13 @@ public final class UdfLoaderUtil { private UdfLoaderUtil() {} - public static void load(final MetaStore metaStore) { + public static MetaStore load(final MetaStore metaStore) { new UdfLoader(metaStore, TestUtils.tempDirectory(), UdfLoaderUtil.class.getClassLoader(), value -> false, new UdfCompiler(Optional.empty()), Optional.empty(), true) .load(); + + return metaStore; } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdafTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdafTest.java new file mode 100644 index 000000000000..ce3c6ed72a4d --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdafTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udaf.placeholder; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import io.confluent.ksql.function.udaf.TableUdaf; +import org.junit.Test; + +public class PlaceholderTableUdafTest { + + private final TableUdaf udaf = PlaceholderTableUdaf.INSTANCE; + + @Test + public void shouldInitializeAsNull() { + assertThat(udaf.initialize(), is(nullValue())); + } + + @Test + public void shouldAggregateToNull() { + assertThat(udaf.aggregate(1L, 2L), is(nullValue())); + } + + @Test + public void shouldUndoToNull() { + assertThat(udaf.undo(1L, 2L), is(nullValue())); + } + + @Test + public void shouldMergeToNull() { + assertThat(udaf.merge(1L, 2L), is(nullValue())); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java new file mode 100644 index 000000000000..5241b67936e1 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udaf.window; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.function.udaf.UdafDescription; +import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; +import org.junit.Test; + +public class WindowEndKudafTest { + + @Test + public void shouldReturnCorrectFunctionName() { + assertThat(WindowEndKudaf.getFunctionName(), + is(WindowEndKudaf.class.getAnnotation(UdafDescription.class).name())); + } + + @Test + public void shouldCreatePlaceholderTableUdaf() { + assertThat(WindowEndKudaf.createWindowEnd(), is(instanceOf(PlaceholderTableUdaf.class))); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowSelectMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowSelectMapperTest.java new file mode 100644 index 000000000000..dc025164cd03 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowSelectMapperTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udaf.window; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.function.KsqlAggregateFunction; +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(EasyMockRunner.class) +public class WindowSelectMapperTest { + + @Mock(MockType.NICE) + private KsqlAggregateFunction windowStartFunc; + @Mock(MockType.NICE) + private KsqlAggregateFunction windowEndFunc; + @Mock(MockType.NICE) + private KsqlAggregateFunction otherFunc; + + @Before + public void setUp() { + EasyMock.expect(windowStartFunc.getFunctionName()).andReturn("WinDowStarT").anyTimes(); + EasyMock.expect(windowEndFunc.getFunctionName()).andReturn("WinDowEnD").anyTimes(); + EasyMock.expect(otherFunc.getFunctionName()).andReturn("NotWindowStartOrWindowEnd").anyTimes(); + EasyMock.replay(windowStartFunc, windowEndFunc, otherFunc); + } + + @Test + public void shouldNotDetectNonWindowBoundsSelects() { + assertThat(new 
WindowSelectMapper(ImmutableMap.of(5, otherFunc)).hasSelects(), + is(false)); + } + + @Test + public void shouldDetectWindowStartSelects() { + assertThat(new WindowSelectMapper(ImmutableMap.of(5, windowStartFunc)).hasSelects(), + is(true)); + } + + @Test + public void shouldDetectWindowEndSelects() { + assertThat(new WindowSelectMapper(ImmutableMap.of(5, windowEndFunc)).hasSelects(), + is(true)); + } + + @Test + public void shouldUpdateRowWithWindowBounds() { + // Given: + final WindowSelectMapper mapper = new WindowSelectMapper(ImmutableMap.of( + 0, otherFunc, 2, windowStartFunc, 3, windowEndFunc, 4, windowStartFunc)); + + final Window window = new SessionWindow(12345L, 54321L); + final GenericRow row = new GenericRow(Arrays.asList(0, 1, 2, 3, 4, 5)); + + // When: + final GenericRow result = mapper.apply(new Windowed<>("k", window), row); + + // Then: + assertThat(result, is(sameInstance(row))); + assertThat(row.getColumns(), is(ImmutableList.of(0, 1, 12345L, 54321L, 12345L, 5))); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void shouldThrowIfRowNotBigEnough() { + // Given: + final WindowSelectMapper mapper = new WindowSelectMapper(ImmutableMap.of( + 0, windowStartFunc)); + + final Window window = new SessionWindow(12345L, 54321L); + final GenericRow row = new GenericRow(new ArrayList<>()); + + // When: + mapper.apply(new Windowed<>("k", window), row); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java new file mode 100644 index 000000000000..03793e04a63f --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udaf.window; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +import io.confluent.ksql.function.udaf.UdafDescription; +import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; +import org.junit.Test; + +public class WindowStartKudafTest { + + @Test + public void shouldReturnCorrectFunctionName() { + assertThat(WindowStartKudaf.getFunctionName(), + is(WindowStartKudaf.class.getAnnotation(UdafDescription.class).name())); + } + + @Test + public void shouldCreatePlaceholderTableUdaf() { + assertThat(WindowStartKudaf.createWindowStart(), is(instanceOf(PlaceholderTableUdaf.class))); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java index 01d8df07c8b1..08bf8252a56b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java @@ -23,10 +23,14 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import 
com.google.common.collect.ImmutableSet; import io.confluent.ksql.function.InternalFunctionRegistry; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.schema.registry.MockSchemaRegistryClientFactory; import io.confluent.ksql.structured.LogicalPlanBuilder; import io.confluent.ksql.structured.SchemaKStream; @@ -34,7 +38,6 @@ import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -53,9 +56,13 @@ public class AggregateNodeTest { private final StreamsBuilder builder = new StreamsBuilder(); @Test - public void shouldBuildSourceNode() { - build(); + // When: + buildQuery("SELECT col0, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "WHERE col0 > 100 GROUP BY col0;"); + + // Then: final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE); final List successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList()); assertThat(node.predecessors(), equalTo(Collections.emptySet())); @@ -65,22 +72,34 @@ public void shouldBuildSourceNode() { @Test public void shouldHaveOneSubTopologyIfGroupByKey() { - build(); - final TopologyDescription description = builder.build().describe(); - assertThat(description.subtopologies().size(), equalTo(1)); + // When: + buildQuery("SELECT col0, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "WHERE col0 > 100 GROUP BY col0;"); + + // Then: + assertThat(builder.build().describe().subtopologies(), hasSize(1)); } @Test public void shouldHaveTwoSubTopologies() { - // We always require rekey at the moment. 
- buildRequireRekey(); - final TopologyDescription description = builder.build().describe(); - assertThat(description.subtopologies().size(), equalTo(2)); + // When: + buildQuery("SELECT col1, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "GROUP BY col1;"); + + // Then: + assertThat(builder.build().describe().subtopologies(), hasSize(2)); } @Test public void shouldHaveSourceNodeForSecondSubtopolgy() { - buildRequireRekey(); + // When: + buildQuery("SELECT col1, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "GROUP BY col1;"); + + // Then: final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010"); final List successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList()); assertThat(node.predecessors(), equalTo(Collections.emptySet())); @@ -91,7 +110,12 @@ public void shouldHaveSourceNodeForSecondSubtopolgy() { @Test public void shouldHaveSinkNodeWithSameTopicAsSecondSource() { - buildRequireRekey(); + // When: + buildQuery("SELECT col1, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "GROUP BY col1;"); + + // Then: final TopologyDescription.Sink sink = (TopologyDescription.Sink) getNodeByName(builder.build(), "KSTREAM-SINK-0000000008"); final TopologyDescription.Source source = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010"); assertThat(sink.successors(), equalTo(Collections.emptySet())); @@ -100,57 +124,54 @@ public void shouldHaveSinkNodeWithSameTopicAsSecondSource() { @Test public void shouldBuildCorrectAggregateSchema() { - final SchemaKStream stream = build(); - final List expected = Arrays.asList( + // When: + final SchemaKStream stream = buildQuery("SELECT col0, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "WHERE col0 > 100 GROUP BY col0;"); + + // Then: + 
assertThat(stream.getSchema().fields(), contains( new Field("COL0", 0, Schema.OPTIONAL_INT64_SCHEMA), new Field("KSQL_COL_1", 1, Schema.OPTIONAL_FLOAT64_SCHEMA), - new Field("KSQL_COL_2", 2, Schema.OPTIONAL_INT64_SCHEMA)); - assertThat(stream.getSchema().fields(), equalTo(expected)); + new Field("KSQL_COL_2", 2, Schema.OPTIONAL_INT64_SCHEMA))); } @Test public void shouldBeSchemaKTableResult() { - final SchemaKStream stream = build(); - assertThat(stream.getClass(), equalTo(SchemaKTable.class)); - } - - @Test - public void shouldBeWindowedWhenStatementSpecifiesWindowing() { - final SchemaKStream stream = build(); - assertTrue(((SchemaKTable)stream).isWindowed()); - } - - private SchemaKStream build() { - return buildQuery("SELECT col0, sum(col3), count(col3) FROM test1 window TUMBLING ( " - + "size 2 " - + "second) " + // When: + final SchemaKStream stream = buildQuery("SELECT col0, sum(col3), count(col3) FROM test1 " + "WHERE col0 > 100 GROUP BY col0;"); + + // Then: + assertThat(stream, is(instanceOf(SchemaKTable.class))); } - @SuppressWarnings("UnusedReturnValue") - private SchemaKStream buildRequireRekey() { - return buildQuery("SELECT col1, sum(col3), count(col3) FROM test1 window TUMBLING ( " - + "size 2 " - + "second) " - + "GROUP BY col1;"); + @Test + public void shouldBeWindowedTableWhenStatementSpecifiesWindowing() { + // Given: + final SchemaKStream stream = buildQuery("SELECT col0, sum(col3), count(col3) FROM test1 " + + "window TUMBLING (size 2 second) " + + "GROUP BY col0;"); + + // Then: + assertThat(stream, is(instanceOf(SchemaKTable.class))); + assertThat(((SchemaKTable)stream).isWindowed(), is(true)); } private SchemaKStream buildQuery(final String queryString) { - final AggregateNode aggregateNode = buildAggregateNode(queryString); - return buildStream(aggregateNode); + return buildAggregateNode(queryString) + .buildStream(builder, + ksqlConfig, + topicClient, + new InternalFunctionRegistry(), + new HashMap<>(), new 
MockSchemaRegistryClientFactory()::get); } - private AggregateNode buildAggregateNode(final String queryString) { - final KsqlBareOutputNode planNode = (KsqlBareOutputNode) new LogicalPlanBuilder(MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry())).buildLogicalPlan(queryString); - return (AggregateNode) planNode.getSource(); - } + private static AggregateNode buildAggregateNode(final String queryString) { + final MetaStore newMetaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); + final KsqlBareOutputNode planNode = (KsqlBareOutputNode) new LogicalPlanBuilder(newMetaStore) + .buildLogicalPlan(queryString); - private SchemaKStream buildStream(final AggregateNode aggregateNode) { - return aggregateNode.buildStream(builder, - ksqlConfig, - topicClient, - new InternalFunctionRegistry(), - new HashMap<>(), new MockSchemaRegistryClientFactory()::get); + return (AggregateNode) planNode.getSource(); } - } \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java index 3dab8dd9ee39..d9ba2c66c680 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java @@ -18,26 +18,21 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.parser.tree.BooleanLiteral; import io.confluent.ksql.serde.DataSource.DataSourceType; -import io.confluent.ksql.function.InternalFunctionRegistry; -import io.confluent.ksql.parser.tree.BooleanLiteral; import io.confluent.ksql.schema.registry.MockSchemaRegistryClientFactory; import 
io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.FakeKafkaTopicClient; import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SelectExpression; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.function.Supplier; - import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -89,8 +84,8 @@ public void shouldCreateProjectionWithFieldNameExpressionPairs() { final BooleanLiteral trueExpression = new BooleanLiteral("true"); final BooleanLiteral falseExpression = new BooleanLiteral("false"); EasyMock.expect(stream.select( - Arrays.asList(new Pair<>("field1", trueExpression), - new Pair<>("field2", falseExpression)))) + Arrays.asList(SelectExpression.of("field1", trueExpression), + SelectExpression.of("field2", falseExpression)))) .andReturn(stream); EasyMock.replay(source, stream); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java new file mode 100644 index 000000000000..c7f374a7faad --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -0,0 +1,198 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.structured; + +import static java.util.Collections.emptyMap; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.verify; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; + +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.function.KsqlAggregateFunction; +import io.confluent.ksql.parser.tree.KsqlWindowExpression; +import io.confluent.ksql.parser.tree.WindowExpression; +import io.confluent.ksql.util.KsqlConfig; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.ValueMapperWithKey; +import org.apache.kafka.streams.kstream.Windowed; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@SuppressWarnings("unchecked") +@RunWith(EasyMockRunner.class) +public class SchemaKGroupedStreamTest { + + @Mock(MockType.NICE) + private Schema schema; + @Mock(MockType.NICE) + private KGroupedStream groupedStream; + @Mock(MockType.NICE) + private Field keyField; + @Mock(MockType.NICE) + private List sourceStreams; + @Mock(MockType.NICE) + private KsqlConfig config; + @Mock(MockType.NICE) + 
private FunctionRegistry funcRegistry; + @Mock(MockType.NICE) + private SchemaRegistryClient srClient; + @Mock(MockType.NICE) + private Initializer initializer; + @Mock(MockType.NICE) + private Serde topicValueSerDe; + @Mock(MockType.NICE) + private KsqlAggregateFunction windowStartFunc; + @Mock(MockType.NICE) + private KsqlAggregateFunction windowEndFunc; + @Mock(MockType.NICE) + private KsqlAggregateFunction otherFunc; + @Mock(MockType.NICE) + private KTable table; + @Mock(MockType.NICE) + private KTable table2; + @Mock(MockType.NICE) + private WindowExpression windowExp; + @Mock(MockType.NICE) + private KsqlWindowExpression ksqlWindowExp; + @Mock(MockType.NICE) + private Serde> windowedKeySerde; + private SchemaKGroupedStream schemaGroupedStream; + + @Before + public void setUp() { + schemaGroupedStream = new SchemaKGroupedStream( + schema, groupedStream, keyField, sourceStreams, config, funcRegistry, srClient); + + EasyMock.expect(windowStartFunc.getFunctionName()).andReturn("WindowStart").anyTimes(); + EasyMock.expect(windowEndFunc.getFunctionName()).andReturn("WindowEnd").anyTimes(); + EasyMock.expect(otherFunc.getFunctionName()).andReturn("NotWindowStartFunc").anyTimes(); + EasyMock.expect(windowExp.getKsqlWindowExpression()).andReturn(ksqlWindowExp).anyTimes(); + EasyMock.replay(windowStartFunc, windowEndFunc, otherFunc, windowExp); + } + + @Test + public void shouldNoUseSelectMapperForNonWindowed() { + // Given: + final Map invalidWindowFuncs = ImmutableMap.of( + 2, windowStartFunc, 4, windowEndFunc); + + // When: + assertDoesNotInstallWindowSelectMapper(null, invalidWindowFuncs); + } + + @Test + public void shouldNotUseSelectMapperForWindowedWithoutWindowSelects() { + // Given: + final Map nonWindowFuncs = ImmutableMap.of(2, otherFunc); + + // When: + assertDoesNotInstallWindowSelectMapper(windowExp, nonWindowFuncs); + } + + @Test + public void shouldUseSelectMapperForWindowedWithWindowStart() { + // Given: + Map funcMapWithWindowStart = ImmutableMap.of( 
+ 0, otherFunc, 1, windowStartFunc); + + // Then: + assertDoesInstallWindowSelectMapper(funcMapWithWindowStart); + } + + @Test + public void shouldUseSelectMapperForWindowedWithWindowEnd() { + // Given: + Map funcMapWithWindowEnd = ImmutableMap.of( + 0, windowEndFunc, 1, otherFunc); + + // Then: + assertDoesInstallWindowSelectMapper(funcMapWithWindowEnd); + } + + private void assertDoesNotInstallWindowSelectMapper( + final WindowExpression windowExp, + final Map funcMap) { + + // Given: + if (windowExp != null) { + EasyMock + .expect(ksqlWindowExp.applyAggregate(anyObject(), anyObject(), anyObject(), anyObject())) + .andReturn(table); + } else { + EasyMock + .expect(groupedStream.aggregate(anyObject(), anyObject(), anyObject())) + .andReturn(table); + } + + EasyMock.expect(table.mapValues(anyObject(ValueMapper.class))) + .andThrow(new AssertionError("Should not be called")) + .anyTimes(); + EasyMock.expect(table.mapValues(anyObject(ValueMapperWithKey.class))) + .andThrow(new AssertionError("Should not be called")) + .anyTimes(); + + EasyMock.replay(ksqlWindowExp, groupedStream, table); + + // When: + final SchemaKTable result = schemaGroupedStream + .aggregate(initializer, funcMap, emptyMap(), windowExp, topicValueSerDe); + + // Then: + assertThat(result.getKtable(), is(sameInstance(table))); + verify(table); + } + + private void assertDoesInstallWindowSelectMapper( + final Map funcMap) { + // Given: + EasyMock + .expect(ksqlWindowExp.applyAggregate(anyObject(), anyObject(), anyObject(), anyObject())) + .andReturn(table); + + EasyMock.expect(table.mapValues(anyObject(ValueMapperWithKey.class))) + .andReturn(table2) + .once(); + + EasyMock.replay(ksqlWindowExp, table); + + // When: + final SchemaKTable result = schemaGroupedStream + .aggregate(initializer, funcMap, emptyMap(), windowExp, topicValueSerDe); + + // Then: + assertThat(result.getKtable(), is(sameInstance(table2))); + verify(table); + } +} \ No newline at end of file diff --git 
a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index c365824e5f40..75348150952a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -47,7 +47,7 @@ import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; -import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SelectExpression; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -159,8 +159,8 @@ public void testSelectSchemaKStream() { SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, schemaRegistryClient); - final List> projectNameExpressionPairList = projectNode.getProjectNameExpressionPairList(); - final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNameExpressionPairList); + final List selectExpressions = projectNode.getProjectSelectExpressions(); + final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(selectExpressions); Assert.assertTrue(projectedSchemaKStream.getSchema().fields().size() == 3); Assert.assertTrue(projectedSchemaKStream.getSchema().field("COL0") == projectedSchemaKStream.getSchema().fields().get(0)); @@ -187,8 +187,8 @@ public void shouldUpdateKeyIfRenamed() { SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, schemaRegistryClient); - final List> projectNameExpressionPairList = projectNode.getProjectNameExpressionPairList(); - final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNameExpressionPairList); + final List selectExpressions = projectNode.getProjectSelectExpressions(); + final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(selectExpressions); assertThat( projectedSchemaKStream.getKeyField(), equalTo(new Field("NEWKEY", 
0, Schema.OPTIONAL_INT64_SCHEMA))); @@ -210,8 +210,8 @@ public void shouldPreserveKeyOnSelectStar() { functionRegistry, new MockSchemaRegistryClient()); - final List> projectNameExpressionPairList = projectNode.getProjectNameExpressionPairList(); - final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNameExpressionPairList); + final List selectExpressions = projectNode.getProjectSelectExpressions(); + final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(selectExpressions); assertThat( projectedSchemaKStream.getKeyField(), equalTo(initialSchemaKStream.getKeyField())); @@ -227,8 +227,8 @@ public void shouldUpdateKeyIfMovedToDifferentIndex() { SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, schemaRegistryClient); - final List> projectNameExpressionPairList = projectNode.getProjectNameExpressionPairList(); - final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNameExpressionPairList); + final List selectExpressions = projectNode.getProjectSelectExpressions(); + final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(selectExpressions); assertThat( projectedSchemaKStream.getKeyField(), equalTo(new Field("COL0", 1, Schema.OPTIONAL_INT64_SCHEMA))); @@ -244,8 +244,8 @@ public void shouldDropKeyIfNotSelected() { SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, schemaRegistryClient); - final List> projectNameExpressionPairList = projectNode.getProjectNameExpressionPairList(); - final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNameExpressionPairList); + final List selectExpressions = projectNode.getProjectSelectExpressions(); + final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(selectExpressions); assertThat(projectedSchemaKStream.getKeyField(), nullValue()); } @@ -258,7 +258,7 @@ public void testSelectWithExpression() { ksqlStream.getKeyField(), new ArrayList<>(), SchemaKStream.Type.SOURCE, ksqlConfig, 
functionRegistry, schemaRegistryClient); - final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNode.getProjectNameExpressionPairList()); + final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNode.getProjectSelectExpressions()); Assert.assertTrue(projectedSchemaKStream.getSchema().fields().size() == 3); Assert.assertTrue(projectedSchemaKStream.getSchema().field("COL0") == projectedSchemaKStream.getSchema().fields().get(0)); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index b2c81c4dd7d0..b3a63d96896a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -141,7 +141,7 @@ public void testSelectSchemaKStream() throws Exception { SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, schemaRegistryClient); final SchemaKTable projectedSchemaKStream = initialSchemaKTable - .select(projectNode.getProjectNameExpressionPairList()); + .select(projectNode.getProjectSelectExpressions()); Assert.assertTrue(projectedSchemaKStream.getSchema().fields().size() == 3); Assert.assertTrue(projectedSchemaKStream.getSchema().field("COL0") == projectedSchemaKStream.getSchema().fields().get(0)); @@ -174,7 +174,7 @@ public void testSelectWithExpression() throws Exception { SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, schemaRegistryClient); final SchemaKTable projectedSchemaKStream = initialSchemaKTable - .select(projectNode.getProjectNameExpressionPairList()); + .select(projectNode.getProjectSelectExpressions()); Assert.assertTrue(projectedSchemaKStream.getSchema().fields().size() == 3); Assert.assertTrue(projectedSchemaKStream.getSchema().field("COL0") == projectedSchemaKStream.getSchema().fields().get(0)); diff --git 
a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java index 78d5b5e5c981..125e5bb4d19e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java @@ -16,73 +16,110 @@ package io.confluent.ksql.structured; -import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import io.confluent.ksql.GenericRow; import io.confluent.ksql.codegen.CodeGenRunner; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.ProjectNode; import io.confluent.ksql.util.ExpressionMetadata; import io.confluent.ksql.util.GenericRowValueTypeEnforcer; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; -import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SelectExpression; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.apache.kafka.connect.data.Schema; import org.junit.Test; public class SelectValueMapperTest { - private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); + private final MetaStore metaStore = + MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); private final LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(metaStore); private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap()); + @Test + public void shouldSelectChosenColumns() { + // Given: + final SelectValueMapper selectMapper = 
givenSelectMapperFor( + "SELECT col0, col2, col3 FROM test1 WHERE col0 > 100;"); + + // When: + final GenericRow transformed = selectMapper.apply( + genericRow(1521834663L, "key1", 1L, "hi", "bye", 2.0F, "blah")); + + // Then: + assertThat(transformed, is(genericRow(1L, "bye", 2.0F))); + } @Test - public void shouldSelectChosenColumns() throws Exception { - final SelectValueMapper mapper = createMapper("SELECT col0, col2, col3 FROM test1 WHERE col0 > 100;"); - final GenericRow transformed = mapper.apply(new GenericRow(Arrays.asList(1521834663L, - "key1", 1L, "hi", "bye", 2.0F, "blah"))); - assertThat(transformed, equalTo(new GenericRow(Arrays.asList(1L, "bye", 2.0F)))); + public void shouldApplyUdfsToColumns() { + // Given: + final SelectValueMapper selectMapper = givenSelectMapperFor( + "SELECT col0, col1, col2, CEIL(col3) FROM test1 WHERE col0 > 100;"); + + // When: + final GenericRow row = selectMapper.apply( + genericRow(1521834663L, "key1", 2L, "foo", "whatever", 6.9F, "boo", "hoo")); + + // Then: + assertThat(row, is(genericRow(2L, "foo", "whatever", 7.0F))); } @Test - public void shouldApplyUdfsToColumns() throws Exception { - final SelectValueMapper mapper = createMapper("SELECT col0, col1, col2, CEIL(col3) FROM test1 WHERE col0 > 100;"); - final GenericRow row = mapper.apply(new GenericRow(Arrays.asList(1521834663L, "key1", 2L, - "foo", - "whatever", 6.9F, "boo", "hoo"))); - assertThat(row, equalTo(new GenericRow(Arrays.asList(2L, "foo", "whatever", - 7.0F)))); + public void shouldHandleNullRows() { + // Given: + final SelectValueMapper selectMapper = givenSelectMapperFor( + "SELECT col0, col1, col2, CEIL(col3) FROM test1 WHERE col0 > 100;"); + + // When: + final GenericRow row = selectMapper.apply(null); + + // Then: + assertThat(row, is(nullValue())); } - private SelectValueMapper createMapper(final String query) throws Exception { + private SelectValueMapper givenSelectMapperFor(final String query) { final PlanNode planNode = 
planBuilder.buildLogicalPlan(query); final ProjectNode projectNode = (ProjectNode) planNode.getSources().get(0); final Schema schema = planNode.getTheSourceNode().getSchema(); - final List> expressionPairList = projectNode.getProjectNameExpressionPairList(); - final List metadata = createExpressionMetadata(expressionPairList, schema); - return new SelectValueMapper(new GenericRowValueTypeEnforcer(schema), expressionPairList, metadata); + final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List metadata = createExpressionMetadata(selectExpressions, schema); + final List selectFieldNames = selectExpressions.stream() + .map(SelectExpression::getName) + .collect(Collectors.toList()); + final GenericRowValueTypeEnforcer typeEnforcer = new GenericRowValueTypeEnforcer(schema); + return new SelectValueMapper(typeEnforcer, selectFieldNames, metadata); } + private List createExpressionMetadata( + final List selectExpressions, + final Schema schema + ) { + try { + final CodeGenRunner codeGenRunner = new CodeGenRunner( + schema, ksqlConfig, new InternalFunctionRegistry()); - private List createExpressionMetadata(final List> expressionPairList, - final Schema schema) throws Exception { - final CodeGenRunner codeGenRunner = new CodeGenRunner(schema, ksqlConfig, new InternalFunctionRegistry()); - final List expressionEvaluators = new ArrayList<>(); - for (final Pair expressionPair : expressionPairList) { - final ExpressionMetadata - expressionEvaluator = - codeGenRunner.buildCodeGenFromParseTree(expressionPair.getRight()); - expressionEvaluators.add(expressionEvaluator); + final List expressionEvaluators = new ArrayList<>(); + for (final SelectExpression expressionPair : selectExpressions) { + expressionEvaluators + .add(codeGenRunner.buildCodeGenFromParseTree(expressionPair.getExpression())); + } + return expressionEvaluators; + } catch (final Exception e) { + throw new AssertionError("Invalid test", e); } - return expressionEvaluators; } -} \ No 
newline at end of file + + private static GenericRow genericRow(final Object... columns) { + return new GenericRow(Arrays.asList(columns)); + } +} diff --git a/ksql-engine/src/test/resources/query-validation-tests/window-bounds.json b/ksql-engine/src/test/resources/query-validation-tests/window-bounds.json new file mode 100644 index 000000000000..31847de41bed --- /dev/null +++ b/ksql-engine/src/test/resources/query-validation-tests/window-bounds.json @@ -0,0 +1,180 @@ +{ + "comments": [ + "Test cases covering WindowStart and WindowEnd UDAFs" + ], + "tests": [ + { + "name": "table session", + "comment": "This version of KSQL reports the end time of the session window incorrectly.", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW SESSION (30 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000}, + {"topic": "test_topic", "key": 1, "value": "1", "timestamp": 10000}, + {"topic": "test_topic", "key": 1, "value": "1", "timestamp": 40000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "session"}}, + {"topic": "S2", "key": 0, "value": null, "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "session"}}, + {"topic": "S2", "key": 0, "value": "0,0,10000", "timestamp": 10000, "window": {"start": 0, "end": 40000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": "1,10000,10000", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": null, "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": "1,10000,40000", "timestamp": 40000, "window": {"start": 10000, "end": 70000, "type": 
"session"}} + ] + }, + { + "name": "stream session", + "comment": "This version of KSQL reports the end time of the session window incorrectly", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE STREAM S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW SESSION (30 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000}, + {"topic": "test_topic", "key": 1, "value": "1", "timestamp": 10000}, + {"topic": "test_topic", "key": 1, "value": "1", "timestamp": 40000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "session"}}, + {"topic": "S2", "key": 0, "value": null, "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "session"}}, + {"topic": "S2", "key": 0, "value": "0,0,10000", "timestamp": 10000, "window": {"start": 0, "end": 40000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": "1,10000,10000", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": null, "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": "1,10000,40000", "timestamp": 40000, "window": {"start": 10000, "end": 70000, "type": "session"}} + ] + }, + { + "name": "table tumbling", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 30000}, + {"topic": 
"test_topic", "key": 100, "value": "100", "timestamp": 45000}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 50000}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 35000}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 70000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,30000,60000", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,30000,60000", "timestamp": 45000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,30000,60000", "timestamp": 50000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,30000,60000", "timestamp": 35000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,60000,90000", "timestamp": 70000, "window": {"start": 60000, "end": 90000, "type": "time"}} + ] + }, + { + "name": "stream tumbling", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE STREAM S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 30000}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 45000}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 50000}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 35000}, + {"topic": "test_topic", "key": 0, 
"value": "0", "timestamp": 70000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,30000,60000", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,30000,60000", "timestamp": 45000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,30000,60000", "timestamp": 50000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,30000,60000", "timestamp": 35000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,60000,90000", "timestamp": 70000, "window": {"start": 60000, "end": 90000, "type": "time"}} + ] + }, + { + "name": "table hopping", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 4999}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,0,30000", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, 
"value": "0,0,30000", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,5000,35000", "timestamp": 5000, "window": {"start": 5000, "end": 35000, "type": "time"}} + ] + }, + { + "name": "stream hopping", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE STREAM S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 4999}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,0,30000", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,30000", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,5000,35000", "timestamp": 5000, "window": {"start": 5000, "end": 35000, "type": "time"}} + ] + }, + { + "name": "none", + "comment" : "Without a WINDOW statement the methods will return NULL", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE STREAM S2 as SELECT id, WindowStart(), WindowEnd() FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000}, + {"topic": "test_topic", "key": 0, "value": "0", 
"timestamp": 4999}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,,", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100,,", "timestamp": 2000}, + {"topic": "S2", "key": 0, "value": "0,,", "timestamp": 4999}, + {"topic": "S2", "key": 0, "value": "0,,", "timestamp": 5000} + ] + }, + { + "name": "in expressions", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE STREAM S2 as SELECT id, WindowStart() / 2, WindowEnd() / 2 FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 4999}, + {"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,0,15000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "100,0,15000", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,15000", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "0,0,15000", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}} + ] + } + ] +} \ No newline at end of file