From 0c13bb0fb4b8baa72510e1f737f3d5eb8aaba8a1 Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Tue, 17 Mar 2020 16:25:08 +0000 Subject: [PATCH] feat: Implement latest_by_offset() UDAF (#4782) --- .../ksqldb-reference/aggregate-functions.md | 11 + .../confluent/ksql/function/UdafLoader.java | 2 +- .../function/udaf/latest/LatestByOffset.java | 161 +++++++++++++++ .../udaf/latest/LatestByOffsetUdafTest.java | 145 +++++++++++++ .../6.0.0_1584458743188/plan.json | 190 ++++++++++++++++++ .../6.0.0_1584458743188/spec.json | 100 +++++++++ .../6.0.0_1584458743188/topology | 25 +++ .../6.0.0_1584458743310/plan.json | 190 ++++++++++++++++++ .../6.0.0_1584458743310/spec.json | 40 ++++ .../6.0.0_1584458743310/topology | 25 +++ .../latest-offset-udaf.json | 41 ++++ 11 files changed, 929 insertions(+), 1 deletion(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/latest/LatestByOffset.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/topology create mode 100644 ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json diff --git a/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md b/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md index 0ff72fceb9b3..2c297b8ebc38 100644 --- a/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md +++ b/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md @@ -120,6 +120,17 @@ first considering all the records from the first window, then the late-arriving record, then the records from the second window in the order they were originally processed. +LATEST_BY_OFFSET +---------------- + +`LATEST_BY_OFFSET(col1)` + +Stream + +Return the latest value for a given column. Latest here is defined as the value in the partition +with the greatest offset. +Note: rows where `col1` is null will be ignored. + MAX --- diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java index e72819125906..adc9a6d28e13 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java @@ -52,7 +52,7 @@ class UdafLoader { void loadUdafFromClass(final Class theClass, final String path) { final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class); - + final List invokers = new ArrayList<>(); for (final Method method : theClass.getMethods()) { if (method.getAnnotation(UdafFactory.class) != null) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/latest/LatestByOffset.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/latest/LatestByOffset.java new file mode 100644 index 000000000000..5a8628cb4d06 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/latest/LatestByOffset.java @@ -0,0 +1,161 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udaf.latest; + +import io.confluent.ksql.function.udaf.Udaf; +import io.confluent.ksql.function.udaf.UdafDescription; +import io.confluent.ksql.function.udaf.UdafFactory; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +@UdafDescription( + name = "LATEST_BY_OFFSET", + description = LatestByOffset.DESCRIPTION +) +public final class LatestByOffset { + + static final String DESCRIPTION = + "This function returns the most recent value for the column, computed by offset."; + + private LatestByOffset() { + } + + static final String SEQ_FIELD = "SEQ"; + static final String VAL_FIELD = "VAL"; + + public static final Schema STRUCT_INTEGER = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_INT32_SCHEMA) + .build(); + + public static final Schema STRUCT_LONG = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .build(); + + public static final Schema STRUCT_DOUBLE = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_FLOAT64_SCHEMA) + .build(); + + public static final Schema STRUCT_BOOLEAN = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .build(); + + public static final Schema STRUCT_STRING = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + static AtomicLong sequence = new AtomicLong(); + + @UdafFactory(description = "return the latest value of an integer column", + aggregateSchema = "STRUCT") + public static Udaf latestInteger() { + return latest(STRUCT_INTEGER); + } + + @UdafFactory(description = "return the latest value of an big integer column", + aggregateSchema = "STRUCT") + public static Udaf latestLong() { + return latest(STRUCT_LONG); + } + + @UdafFactory(description = "return the latest value of a double column", + aggregateSchema = "STRUCT") + public static Udaf latestDouble() { + return latest(STRUCT_DOUBLE); + } + + @UdafFactory(description = "return the latest value of a boolean column", + aggregateSchema = "STRUCT") + public static Udaf latestBoolean() { + return latest(STRUCT_BOOLEAN); + } + + @UdafFactory(description = "return the latest value of a string column", + aggregateSchema = "STRUCT") + public static Udaf latestString() { + return latest(STRUCT_STRING); + } + + static Struct createStruct(final Schema schema, final T val) { + final Struct struct = new Struct(schema); + struct.put(SEQ_FIELD, generateSequence()); + struct.put(VAL_FIELD, val); + return struct; + } + + private static long generateSequence() { + return sequence.getAndIncrement(); + } + + private static int compareStructs(final Struct struct1, final Struct struct2) { + // Deal with overflow - we assume if one is positive and the other negative then the sequence + // has overflowed - in which case the latest is the one with the smallest sequence + final long sequence1 = struct1.getInt64(SEQ_FIELD); + final long sequence2 = struct2.getInt64(SEQ_FIELD); + if (sequence1 < 0 && sequence2 >= 0) { + return 1; + } else if (sequence2 < 0 && sequence1 >= 0) { + return -1; + } else { + return Long.compare(sequence1, sequence2); + } + } + + @UdafFactory(description = "Latest by offset") + static Udaf latest(final Schema structSchema) { + return new Udaf() { + + @Override + public Struct initialize() { + return null; + } + + @Override + public Struct aggregate(final T current, final Struct aggregate) { + if (current == null) { + return aggregate; + } else { + return createStruct(structSchema, current); + } + } + + @Override + public Struct merge(final Struct aggOne, final Struct aggTwo) { + // When merging we need some way of evaluating the "latest' one. + // We do this by keeping track of the sequence of when it was originally processed + if (compareStructs(aggOne, aggTwo) >= 0) { + return aggOne; + } else { + return aggTwo; + } + } + + @Override + @SuppressWarnings("unchecked") + public T map(final Struct agg) { + return (T) agg.get(VAL_FIELD); + } + }; + } + + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java new file mode 100644 index 000000000000..49c46d6c4c05 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java @@ -0,0 +1,145 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udaf.latest; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import io.confluent.ksql.function.udaf.Udaf; +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; + +public class LatestByOffsetUdafTest { + + @Test + public void shouldInitializeToNull() { + // Given: + final Udaf udaf = LatestByOffset + .latest(LatestByOffset.STRUCT_LONG); + + // When: + Struct init = udaf.initialize(); + + // Then: + assertThat(init, is(nullValue())); + } + + @Test + public void shouldComputeLatestInteger() { + // Given: + final Udaf udaf = LatestByOffset.latestInteger(); + + // When: + Struct res = udaf + .aggregate(123, LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321)); + + // Then: + assertThat(res.get(LatestByOffset.VAL_FIELD), is(123)); + } + + @Test + public void shouldMerge() { + // Given: + final Udaf udaf = LatestByOffset.latestInteger(); + + Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123); + Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321); + + // When: + Struct merged1 = udaf.merge(agg1, agg2); + Struct merged2 = udaf.merge(agg2, agg1); + + // Then: + assertThat(merged1, is(agg2)); + assertThat(merged2, is(agg2)); + } + + @Test + public void shouldMergeWithOverflow() { + // Given: + final Udaf udaf = LatestByOffset.latestInteger(); + + LatestByOffset.sequence.set(Long.MAX_VALUE); + + Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123); + Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321); + + // When: + Struct merged1 = udaf.merge(agg1, agg2); + Struct merged2 = udaf.merge(agg2, agg1); + + // Then: + assertThat(agg1.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MAX_VALUE)); + assertThat(agg2.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MIN_VALUE)); + assertThat(merged1, is(agg2)); + assertThat(merged2, is(agg2)); + } + + + @Test + public void shouldComputeLatestLong() { + // Given: + final Udaf udaf = LatestByOffset.latestLong(); + + // When: + Struct res = udaf + .aggregate(123L, LatestByOffset.createStruct(LatestByOffset.STRUCT_LONG, 321L)); + + // Then: + assertThat(res.getInt64(LatestByOffset.VAL_FIELD), is(123L)); + } + + @Test + public void shouldComputeLatestDouble() { + // Given: + final Udaf udaf = LatestByOffset.latestDouble(); + + // When: + Struct res = udaf + .aggregate(1.1d, LatestByOffset.createStruct(LatestByOffset.STRUCT_DOUBLE, 2.2d)); + + // Then: + assertThat(res.getFloat64(LatestByOffset.VAL_FIELD), is(1.1d)); + } + + @Test + public void shouldComputeLatestBoolean() { + // Given: + final Udaf udaf = LatestByOffset.latestBoolean(); + + // When: + Struct res = udaf + .aggregate(true, LatestByOffset.createStruct(LatestByOffset.STRUCT_BOOLEAN, false)); + + // Then: + assertThat(res.getBoolean(LatestByOffset.VAL_FIELD), is(true)); + } + + @Test + public void shouldComputeLatestString() { + // Given: + final Udaf udaf = LatestByOffset.latestString(); + + // When: + Struct res = udaf + .aggregate("foo", LatestByOffset.createStruct(LatestByOffset.STRUCT_STRING, "bar")); + + // Then: + assertThat(res.getString(LatestByOffset.VAL_FIELD), is("foo")); + } + +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/plan.json new file mode 100644 index 000000000000..c49d6b17fc80 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/plan.json @@ -0,0 +1,190 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ROWKEY BIGINT KEY, ID BIGINT, F0 INTEGER, F1 BIGINT, F2 DOUBLE, F3 BOOLEAN, F4 STRING) WITH (KAFKA_TOPIC='test_topic', KEY='ID', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `F0` INTEGER, `F1` BIGINT, `F2` DOUBLE, `F3` BOOLEAN, `F4` STRING", + "keyField" : "ID", + "timestampColumn" : null, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n INPUT.ID ID,\n LATEST_BY_OFFSET(INPUT.F0) L0,\n LATEST_BY_OFFSET(INPUT.F1) L1,\n LATEST_BY_OFFSET(INPUT.F2) L2,\n LATEST_BY_OFFSET(INPUT.F3) L3,\n LATEST_BY_OFFSET(INPUT.F4) L4\nFROM INPUT INPUT\nGROUP BY INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `L0` INTEGER, `L1` BIGINT, `L2` DOUBLE, `L3` BOOLEAN, `L4` STRING", + "keyField" : "ID", + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `F0` INTEGER, `F1` BIGINT, `F2` DOUBLE, `F3` BOOLEAN, `F4` STRING" + }, + "selectExpressions" : [ "ID AS KSQL_INTERNAL_COL_0", "F0 AS KSQL_INTERNAL_COL_1", "F1 AS KSQL_INTERNAL_COL_2", "F2 AS KSQL_INTERNAL_COL_3", "F3 AS KSQL_INTERNAL_COL_4", "F4 AS KSQL_INTERNAL_COL_5" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "nonAggregateColumns" : [ "KSQL_INTERNAL_COL_0", "KSQL_INTERNAL_COL_1", "KSQL_INTERNAL_COL_2", "KSQL_INTERNAL_COL_3", "KSQL_INTERNAL_COL_4", "KSQL_INTERNAL_COL_5" ], + "aggregationFunctions" : [ "LATEST_BY_OFFSET(KSQL_INTERNAL_COL_1)", "LATEST_BY_OFFSET(KSQL_INTERNAL_COL_2)", "LATEST_BY_OFFSET(KSQL_INTERNAL_COL_3)", "LATEST_BY_OFFSET(KSQL_INTERNAL_COL_4)", "LATEST_BY_OFFSET(KSQL_INTERNAL_COL_5)" ] + }, + "selectExpressions" : [ "KSQL_INTERNAL_COL_0 AS ID", "KSQL_AGG_VARIABLE_0 AS L0", "KSQL_AGG_VARIABLE_1 AS L1", "KSQL_AGG_VARIABLE_2 AS L2", "KSQL_AGG_VARIABLE_3 AS L3", "KSQL_AGG_VARIABLE_4 AS L4" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT", + "timestampColumn" : null + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.streams.state.dir" : "/var/folders/lr/p50h83hj6h98w8_rj05lkt680000gn/T/confluent8901927823887024542", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/spec.json new file mode 100644 index 000000000000..3c1e44f571f7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "6.0.0", + "timestamp" : 1584458743188, + "schemas" : { + "CTAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.Aggregate.GroupBy" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : "STRUCT, KSQL_AGG_VARIABLE_1 STRUCT, KSQL_AGG_VARIABLE_2 STRUCT, KSQL_AGG_VARIABLE_3 STRUCT, KSQL_AGG_VARIABLE_4 STRUCT> NOT NULL", + "CTAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "ID" : 0, + "F0" : 12, + "F1" : 1000, + "F2" : 1.23, + "F3" : true, + "F4" : "foo" + } + }, { + "topic" : "test_topic", + "key" : 1, + "value" : { + "ID" : 1, + "F0" : 12, + "F1" : 1000, + "F2" : 1.23, + "F3" : true, + "F4" : "foo" + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "ID" : 0, + "F0" : 21, + "F1" : 2000, + "F2" : 2.23, + "F3" : false, + "F4" : "bar" + } + }, { + "topic" : "test_topic", + "key" : 1, + "value" : { + "ID" : 1, + "F0" : 21, + "F1" : 2000, + "F2" : 2.23, + "F3" : false, + "F4" : "bar" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "ID" : 0, + "L0" : 12, + "L1" : 1000, + "L2" : 1.23, + "L3" : true, + "L4" : "foo" + } + }, { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "ID" : 1, + "L0" : 12, + "L1" : 1000, + "L2" : 1.23, + "L3" : true, + "L4" : "foo" + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "ID" : 0, + "L0" : 21, + "L1" : 2000, + "L2" : 2.23, + "L3" : false, + "L4" : "bar" + } + }, { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "ID" : 1, + "L0" : 21, + "L1" : 2000, + "L2" : 2.23, + "L3" : false, + "L4" : "bar" + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/topology new file mode 100644 index 000000000000..e000b160c4db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset/6.0.0_1584458743188/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/plan.json new file mode 100644 index 000000000000..4fea499309d7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/plan.json @@ -0,0 +1,190 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ROWKEY BIGINT KEY, ID BIGINT, F0 INTEGER) WITH (KAFKA_TOPIC='test_topic', KEY='ID', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `F0` INTEGER", + "keyField" : "ID", + "timestampColumn" : null, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n INPUT.ID ID,\n LATEST_BY_OFFSET(INPUT.F0) L0\nFROM INPUT INPUT\nGROUP BY INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `L0` INTEGER", + "keyField" : "ID", + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` BIGINT KEY, `ID` BIGINT, `F0` INTEGER" + }, + "selectExpressions" : [ "ID AS KSQL_INTERNAL_COL_0", "F0 AS KSQL_INTERNAL_COL_1" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "nonAggregateColumns" : [ "KSQL_INTERNAL_COL_0", "KSQL_INTERNAL_COL_1" ], + "aggregationFunctions" : [ "LATEST_BY_OFFSET(KSQL_INTERNAL_COL_1)" ] + }, + "selectExpressions" : [ "KSQL_INTERNAL_COL_0 AS ID", "KSQL_AGG_VARIABLE_0 AS L0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT", + "timestampColumn" : null + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.streams.state.dir" : "/var/folders/lr/p50h83hj6h98w8_rj05lkt680000gn/T/confluent8901927823887024542", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/spec.json new file mode 100644 index 000000000000..915527e7e740 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/spec.json @@ -0,0 +1,40 @@ +{ + "version" : "6.0.0", + "timestamp" : 1584458743310, + "schemas" : { + "CTAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.Aggregate.GroupBy" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : "STRUCT> NOT NULL", + "CTAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "ID" : 0, + "F0" : 12 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "ID" : 0, + "F0" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "ID" : 0, + "L0" : 12 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "ID" : 0, + "L0" : 12 + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/topology new file mode 100644 index 000000000000..e000b160c4db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/latest-offset-udaf_-_latest_by_offset_with_nulls/6.0.0_1584458743310/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json new file mode 100644 index 000000000000..82d42d7b2ac6 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json @@ -0,0 +1,41 @@ +{ + "comments": [ + "Tests covering the use of the LATEST_BY_OFFSET aggregate function" + ], + "tests": [ + { + "name": "latest by offset", + "statements": [ + "CREATE STREAM INPUT (ROWKEY BIGINT KEY, ID BIGINT, F0 INT, F1 BIGINT, F2 DOUBLE, F3 BOOLEAN, F4 STRING) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "CREATE TABLE OUTPUT AS SELECT ID, LATEST_BY_OFFSET(F0) AS L0, LATEST_BY_OFFSET(F1) AS L1, LATEST_BY_OFFSET(F2) AS L2, LATEST_BY_OFFSET(F3) AS L3, LATEST_BY_OFFSET(F4) AS L4 FROM INPUT GROUP BY ID;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": 12, "F1": 1000, "F2": 1.23, "F3": true, "F4": "foo"}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "F0": 12, "F1": 1000, "F2": 1.23, "F3": true, "F4": "foo"}}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": 21, "F1": 2000, "F2": 2.23, "F3": false, "F4": "bar"}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "F0": 21, "F1": 2000, "F2": 2.23, "F3": false, "F4": "bar"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 12, "L1": 1000, "L2": 1.23, "L3": true, "L4": "foo"}}, + {"topic": "OUTPUT", "key": 1, "value": {"ID": 1, "L0": 12, "L1": 1000, "L2": 1.23, "L3": true, "L4": "foo"}}, + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 21, "L1": 2000, "L2": 2.23, "L3": false, "L4": "bar"}}, + {"topic": "OUTPUT", "key": 1, "value": {"ID": 1, "L0": 21, "L1": 2000, "L2": 2.23, "L3": false, "L4": "bar"}} + ] + }, + { + "name": "latest by offset with nulls", + "statements": [ + "CREATE STREAM INPUT (ROWKEY BIGINT KEY, ID BIGINT, F0 INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "CREATE TABLE OUTPUT AS SELECT ID, LATEST_BY_OFFSET(F0) AS L0 FROM INPUT GROUP BY ID;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": 12}}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 12}}, + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 12}} + ] + } + ] +}