diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index c62751307bb7..e5435faab568 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -2093,70 +2093,74 @@ convention is followed. Aggregate functions =================== -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| Function | Example | Input Type | Description | -+========================+===========================+============+=====================================================================+ -| COLLECT_LIST | ``COLLECT_LIST(col1)`` | Stream, | Return an array containing all the values of ``col1`` from each | -| | | Table | input row (for the specified grouping and time window, if any). | -| | | | Currently only works for simple types (not Map, Array, or Struct). | -| | | | This version limits the size of the result Array to a maximum of | -| | | | 1000 entries and any values beyond this limit are silently ignored. | -| | | | When using with a window type of ``session``, it can sometimes | -| | | | happen that two session windows get merged together into one when a | -| | | | late-arriving record with a timestamp between the two windows is | -| | | | processed. In this case the 1000 record limit is calculated by | -| | | | first considering all the records from the first window, then the | -| | | | late-arriving record, then the records from the second window in | -| | | | the order they were originally processed. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| COLLECT_SET | ``COLLECT_SET(col1)`` | Stream | Return an array containing the distinct values of ``col1`` from | -| | | | each input row (for the specified grouping and time window, if any).| -| | | | Currently only works for simple types (not Map, Array, or Struct). | -| | | | This version limits the size of the result Array to a maximum of | -| | | | 1000 entries and any values beyond this limit are silently ignored. | -| | | | When using with a window type of ``session``, it can sometimes | -| | | | happen that two session windows get merged together into one when a | -| | | | late-arriving record with a timestamp between the two windows is | -| | | | processed. In this case the 1000 record limit is calculated by | -| | | | first considering all the records from the first window, then the | -| | | | late-arriving record, then the records from the second window in | -| | | | the order they were originally processed. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| COUNT | ``COUNT(col1)``, | Stream, | Count the number of rows. When ``col1`` is specified, the count | -| | ``COUNT(*)`` | Table | returned will be the number of rows where ``col1`` is non-null. | -| | | | When ``*`` is specified, the count returned will be the total | -| | | | number of rows. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| HISTOGRAM | ``HISTOGRAM(col1)`` | Stream, | Return a map containing the distinct String values of ``col1`` | -| | | Table | mapped to the number of times each one occurs for the given window. 
| -| | | | This version limits the number of distinct values which can be | -| | | | counted to 1000, beyond which any additional entries are ignored. | -| | | | When using with a window type of ``session``, it can sometimes | -| | | | happen that two session windows get merged together into one when a | -| | | | late-arriving record with a timestamp between the two windows is | -| | | | processed. In this case the 1000 record limit is calculated by | -| | | | first considering all the records from the first window, then the | -| | | | late-arriving record, then the records from the second window in | -| | | | the order they were originally processed. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| AVERAGE | ``AVG(col1)`` | Stream, | Return the average value for a given column. | -| | | Table | Note: rows where ``col1`` is null are ignored. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| MAX | ``MAX(col1)`` | Stream | Return the maximum value for a given column and window. | -| | | | Note: rows where ``col1`` is null will be ignored. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| MIN | ``MIN(col1)`` | Stream | Return the minimum value for a given column and window. | -| | | | Note: rows where ``col1`` is null will be ignored. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| SUM | ``SUM(col1)`` | Stream, | Sums the column values | -| | | Table | Note: rows where ``col1`` is null will be ignored. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| TOPK | ``TOPK(col1, k)`` | Stream | Return the Top *K* values for the given column and window | -| | | | Note: rows where ``col1`` is null will be ignored. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window | -| | | | Note: rows where ``col1`` is null will be ignored. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ ++------------------------+----------------------------+------------+---------------------------------------------------------------------+ +| Function | Example | Input Type | Description | ++========================+============================+============+=====================================================================+ +| COLLECT_LIST | ``COLLECT_LIST(col1)`` | Stream, | Return an array containing all the values of ``col1`` from each | +| | | Table | input row (for the specified grouping and time window, if any). | +| | | | Currently only works for simple types (not Map, Array, or Struct). | +| | | | This version limits the size of the result Array to a maximum of | +| | | | 1000 entries and any values beyond this limit are silently ignored. 
| +| | | | When using with a window type of ``session``, it can sometimes | +| | | | happen that two session windows get merged together into one when a | +| | | | late-arriving record with a timestamp between the two windows is | +| | | | processed. In this case the 1000 record limit is calculated by | +| | | | first considering all the records from the first window, then the | +| | | | late-arriving record, then the records from the second window in | +| | | | the order they were originally processed. | ++------------------------+----------------------------+------------+---------------------------------------------------------------------+ +| COLLECT_SET | ``COLLECT_SET(col1)`` | Stream | Return an array containing the distinct values of ``col1`` from | +| | | | each input row (for the specified grouping and time window, if any).| +| | | | Currently only works for simple types (not Map, Array, or Struct). | +| | | | This version limits the size of the result Array to a maximum of | +| | | | 1000 entries and any values beyond this limit are silently ignored. | +| | | | When using with a window type of ``session``, it can sometimes | +| | | | happen that two session windows get merged together into one when a | +| | | | late-arriving record with a timestamp between the two windows is | +| | | | processed. In this case the 1000 record limit is calculated by | +| | | | first considering all the records from the first window, then the | +| | | | late-arriving record, then the records from the second window in | +| | | | the order they were originally processed. | ++------------------------+----------------------------+------------+---------------------------------------------------------------------+ +| COUNT | ``COUNT(col1)``, | Stream, | Count the number of rows. When ``col1`` is specified, the count | +| | ``COUNT(*)`` | Table | returned will be the number of rows where ``col1`` is non-null. | +| | | | When ``*`` is specified, the count returned will be the total | +| | | | number of rows. | ++------------------------+----------------------------+------------+---------------------------------------------------------------------+ +| HISTOGRAM | ``HISTOGRAM(col1)`` | Stream, | Return a map containing the distinct String values of ``col1`` | +| | | Table | mapped to the number of times each one occurs for the given window. | +| | | | This version limits the number of distinct values which can be | +| | | | counted to 1000, beyond which any additional entries are ignored. | +| | | | When using with a window type of ``session``, it can sometimes | +| | | | happen that two session windows get merged together into one when a | +| | | | late-arriving record with a timestamp between the two windows is | +| | | | processed. In this case the 1000 record limit is calculated by | +| | | | first considering all the records from the first window, then the | +| | | | late-arriving record, then the records from the second window in | +| | | | the order they were originally processed. | ++------------------------+----------------------------+------------+---------------------------------------------------------------------+ +| AVERAGE | ``AVG(col1)`` | Stream, | Return the average value for a given column. | +| | | Table | Note: rows where ``col1`` is null are ignored. | ++------------------------+----------------------------+------------+---------------------------------------------------------------------+ +| MAX | ``MAX(col1)`` | Stream | Return the maximum value for a given column and window. 
|
+| | | | Note: rows where ``col1`` is null will be ignored. |
++------------------------+----------------------------+------------+---------------------------------------------------------------------+
+| MIN | ``MIN(col1)`` | Stream | Return the minimum value for a given column and window. |
+| | | | Note: rows where ``col1`` is null will be ignored. |
++------------------------+----------------------------+------------+---------------------------------------------------------------------+
+| SUM | ``SUM(col1)`` | Stream, | Return the sum of the column values. |
+| | | Table | Note: rows where ``col1`` is null will be ignored. |
++------------------------+----------------------------+------------+---------------------------------------------------------------------+
+| TOPK | ``TOPK(col1, k)`` | Stream | Return the Top *K* values for the given column and window. |
+| | | | Note: rows where ``col1`` is null will be ignored. |
++------------------------+----------------------------+------------+---------------------------------------------------------------------+
+| TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window. |
+| | | | Note: rows where ``col1`` is null will be ignored. |
++------------------------+----------------------------+------------+---------------------------------------------------------------------+
+| LATEST_BY_OFFSET | ``LATEST_BY_OFFSET(col1)`` | Stream | Return the latest value for a given column, computed by offset: |
+| | | | a row with a greater offset is considered later. |
+| | | | Note: rows where ``col1`` is null will be ignored. |
++------------------------+----------------------------+------------+---------------------------------------------------------------------+

For more information, see :ref:`aggregate-streaming-data-with-ksql`.

diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java
index e72819125906..adc9a6d28e13 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java
@@ -52,7 +52,7 @@ class UdafLoader {
   void loadUdafFromClass(final Class<?> theClass, final String path) {
     final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class);
-
+
     final List<UdafFactoryInvoker> invokers = new ArrayList<>();
     for (final Method method : theClass.getMethods()) {
       if (method.getAnnotation(UdafFactory.class) != null) {
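As an illustrative aside (not part of this patch): the hunk above is the discovery path, where the loader reads the class-level ``@UdafDescription`` and then collects every public method carrying ``@UdafFactory``. A minimal standalone sketch of that scan, assuming both annotations are retained at runtime (which the loader itself relies on); the sketch class name is invented for illustration:

    import io.confluent.ksql.function.udaf.UdafDescription;
    import io.confluent.ksql.function.udaf.UdafFactory;
    import java.lang.reflect.Method;

    public final class UdafDiscoverySketch {

      public static void main(final String[] args) {
        final Class<?> theClass = io.confluent.ksql.function.udaf.latest.LatestByOffset.class;

        // Class-level metadata: the function name registered with ksqlDB.
        final UdafDescription desc = theClass.getAnnotation(UdafDescription.class);
        System.out.println("name: " + desc.name()); // LATEST_BY_OFFSET

        // Each public @UdafFactory method is one way to create the aggregator.
        for (final Method method : theClass.getMethods()) {
          if (method.getAnnotation(UdafFactory.class) != null) {
            System.out.println("factory: " + method.getName());
          }
        }
      }
    }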
+ */ + +package io.confluent.ksql.function.udaf.latest; + +import io.confluent.ksql.function.udaf.Udaf; +import io.confluent.ksql.function.udaf.UdafDescription; +import io.confluent.ksql.function.udaf.UdafFactory; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +@UdafDescription( + name = "LATEST_BY_OFFSET", + description = LatestByOffset.DESCRIPTION +) +public final class LatestByOffset { + + static final String DESCRIPTION = + "This function returns the most recent value for the column, computed by offset."; + + private LatestByOffset() { + } + + static final String SEQ_FIELD = "SEQ"; + static final String VAL_FIELD = "VAL"; + + public static final Schema STRUCT_INTEGER = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_INT32_SCHEMA) + .build(); + + public static final Schema STRUCT_LONG = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .build(); + + public static final Schema STRUCT_DOUBLE = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_FLOAT64_SCHEMA) + .build(); + + public static final Schema STRUCT_BOOLEAN = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .build(); + + public static final Schema STRUCT_STRING = SchemaBuilder.struct().optional() + .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field(VAL_FIELD, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + static AtomicLong sequence = new AtomicLong(); + + @UdafFactory(description = "return the latest value of an integer column", + aggregateSchema = "STRUCT") + public static Udaf latestInteger() { + return latest(STRUCT_INTEGER); + } + + @UdafFactory(description = "return the latest value of an big integer column", + aggregateSchema = "STRUCT") + public static Udaf latestLong() { + return latest(STRUCT_LONG); + } + + @UdafFactory(description = "return the latest value of a double column", + aggregateSchema = "STRUCT") + public static Udaf latestDouble() { + return latest(STRUCT_DOUBLE); + } + + @UdafFactory(description = "return the latest value of a boolean column", + aggregateSchema = "STRUCT") + public static Udaf latestBoolean() { + return latest(STRUCT_BOOLEAN); + } + + @UdafFactory(description = "return the latest value of a string column", + aggregateSchema = "STRUCT") + public static Udaf latestString() { + return latest(STRUCT_STRING); + } + + static Struct createStruct(final Schema schema, final T val) { + final Struct struct = new Struct(schema); + struct.put(SEQ_FIELD, generateSequence()); + struct.put(VAL_FIELD, val); + return struct; + } + + private static long generateSequence() { + return sequence.getAndIncrement(); + } + + private static int compareStructs(final Struct struct1, final Struct struct2) { + // Deal with overflow - we assume if one is positive and the other negative then the sequence + // has overflowed - in which case the latest is the one with the smallest sequence + final long sequence1 = struct1.getInt64(SEQ_FIELD); + final long sequence2 = struct2.getInt64(SEQ_FIELD); + if (sequence1 < 0 && sequence2 >= 0) { + return 1; + } else if (sequence2 < 0 && sequence1 >= 0) { + return -1; + } else { + return Long.compare(sequence1, sequence2); + 
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java
new file mode 100644
index 000000000000..49c46d6c4c05
--- /dev/null
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/latest/LatestByOffsetUdafTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function.udaf.latest;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import io.confluent.ksql.function.udaf.Udaf;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Test;
+
+public class LatestByOffsetUdafTest {
+
+  @Test
+  public void shouldInitializeToNull() {
+    // Given:
+    final Udaf<Long, Struct, Long> udaf = LatestByOffset
+        .latest(LatestByOffset.STRUCT_LONG);
+
+    // When:
+    Struct init = udaf.initialize();
+
+    // Then:
+    assertThat(init, is(nullValue()));
+  }
+
+  @Test
+  public void shouldComputeLatestInteger() {
+    // Given:
+    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();
+
+    // When:
+    Struct res = udaf
+        .aggregate(123, LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321));
+
+    // Then:
+    assertThat(res.get(LatestByOffset.VAL_FIELD), is(123));
+  }
+
+  @Test
+  public void shouldMerge() {
+    // Given:
+    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();
+
+    Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123);
+    Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321);
+
+    // When:
+    Struct merged1 = udaf.merge(agg1, agg2);
+    Struct merged2 = udaf.merge(agg2, agg1);
+
+    // Then:
+    assertThat(merged1, is(agg2));
+    assertThat(merged2, is(agg2));
+  }
+
+  @Test
+  public void shouldMergeWithOverflow() {
+    // Given:
+    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();
+
+    LatestByOffset.sequence.set(Long.MAX_VALUE);
+
+    Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123);
+    Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321);
+
+    // When:
+    Struct merged1 = udaf.merge(agg1, agg2);
+    Struct merged2 = udaf.merge(agg2, agg1);
+
+    // Then:
+    assertThat(agg1.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MAX_VALUE));
+    assertThat(agg2.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MIN_VALUE));
+    assertThat(merged1, is(agg2));
+    assertThat(merged2, is(agg2));
+  }
+
+  @Test
+  public void shouldComputeLatestLong() {
+    // Given:
+    final Udaf<Long, Struct, Long> udaf = LatestByOffset.latestLong();
+
+    // When:
+    Struct res = udaf
+        .aggregate(123L, LatestByOffset.createStruct(LatestByOffset.STRUCT_LONG, 321L));
+
+    // Then:
+    assertThat(res.getInt64(LatestByOffset.VAL_FIELD), is(123L));
+  }
+
+  @Test
+  public void shouldComputeLatestDouble() {
+    // Given:
+    final Udaf<Double, Struct, Double> udaf = LatestByOffset.latestDouble();
+
+    // When:
+    Struct res = udaf
+        .aggregate(1.1d, LatestByOffset.createStruct(LatestByOffset.STRUCT_DOUBLE, 2.2d));
+
+    // Then:
+    assertThat(res.getFloat64(LatestByOffset.VAL_FIELD), is(1.1d));
+  }
+
+  @Test
+  public void shouldComputeLatestBoolean() {
+    // Given:
+    final Udaf<Boolean, Struct, Boolean> udaf = LatestByOffset.latestBoolean();
+
+    // When:
+    Struct res = udaf
+        .aggregate(true, LatestByOffset.createStruct(LatestByOffset.STRUCT_BOOLEAN, false));
+
+    // Then:
+    assertThat(res.getBoolean(LatestByOffset.VAL_FIELD), is(true));
+  }
+
+  @Test
+  public void shouldComputeLatestString() {
+    // Given:
+    final Udaf<String, Struct, String> udaf = LatestByOffset.latestString();
+
+    // When:
+    Struct res = udaf
+        .aggregate("foo", LatestByOffset.createStruct(LatestByOffset.STRUCT_STRING, "bar"));
+
+    // Then:
+    assertThat(res.getString(LatestByOffset.VAL_FIELD), is("foo"));
+  }
+
+}
\ No newline at end of file
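As an illustrative aside (not part of this patch), ``shouldMergeWithOverflow`` relies on two's-complement wrap-around: once the shared ``AtomicLong`` passes ``Long.MAX_VALUE``, ``getAndIncrement`` rolls over to ``Long.MIN_VALUE``, and the comparison logic treats the lone negative sequence as the later one. A standalone sketch of just that decision rule; the helper mirrors ``compareStructs`` and the class name is invented:

    public final class SequenceOverflowSketch {

      // Mirrors LatestByOffset#compareStructs: if exactly one sequence is
      // negative, the counter has wrapped, so the negative one is later.
      static int compareSequences(final long seq1, final long seq2) {
        if (seq1 < 0 && seq2 >= 0) {
          return 1;
        }
        if (seq2 < 0 && seq1 >= 0) {
          return -1;
        }
        return Long.compare(seq1, seq2);
      }

      public static void main(final String[] args) {
        final long before = Long.MAX_VALUE; // last sequence before overflow
        final long after = before + 1;      // wraps to Long.MIN_VALUE

        System.out.println(after == Long.MIN_VALUE);         // true
        System.out.println(compareSequences(after, before)); // 1: wrapped value is later
      }
    }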
+ */ + +package io.confluent.ksql.function.udaf.latest; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import io.confluent.ksql.function.udaf.Udaf; +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; + +public class LatestByOffsetUdafTest { + + @Test + public void shouldInitializeToNull() { + // Given: + final Udaf udaf = LatestByOffset + .latest(LatestByOffset.STRUCT_LONG); + + // When: + Struct init = udaf.initialize(); + + // Then: + assertThat(init, is(nullValue())); + } + + @Test + public void shouldComputeLatestInteger() { + // Given: + final Udaf udaf = LatestByOffset.latestInteger(); + + // When: + Struct res = udaf + .aggregate(123, LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321)); + + // Then: + assertThat(res.get(LatestByOffset.VAL_FIELD), is(123)); + } + + @Test + public void shouldMerge() { + // Given: + final Udaf udaf = LatestByOffset.latestInteger(); + + Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123); + Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321); + + // When: + Struct merged1 = udaf.merge(agg1, agg2); + Struct merged2 = udaf.merge(agg2, agg1); + + // Then: + assertThat(merged1, is(agg2)); + assertThat(merged2, is(agg2)); + } + + @Test + public void shouldMergeWithOverflow() { + // Given: + final Udaf udaf = LatestByOffset.latestInteger(); + + LatestByOffset.sequence.set(Long.MAX_VALUE); + + Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123); + Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321); + + // When: + Struct merged1 = udaf.merge(agg1, agg2); + Struct merged2 = udaf.merge(agg2, agg1); + + // Then: + assertThat(agg1.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MAX_VALUE)); + assertThat(agg2.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MIN_VALUE)); + assertThat(merged1, is(agg2)); + assertThat(merged2, is(agg2)); + } + + + @Test + public void shouldComputeLatestLong() { + // Given: + final Udaf udaf = LatestByOffset.latestLong(); + + // When: + Struct res = udaf + .aggregate(123L, LatestByOffset.createStruct(LatestByOffset.STRUCT_LONG, 321L)); + + // Then: + assertThat(res.getInt64(LatestByOffset.VAL_FIELD), is(123L)); + } + + @Test + public void shouldComputeLatestDouble() { + // Given: + final Udaf udaf = LatestByOffset.latestDouble(); + + // When: + Struct res = udaf + .aggregate(1.1d, LatestByOffset.createStruct(LatestByOffset.STRUCT_DOUBLE, 2.2d)); + + // Then: + assertThat(res.getFloat64(LatestByOffset.VAL_FIELD), is(1.1d)); + } + + @Test + public void shouldComputeLatestBoolean() { + // Given: + final Udaf udaf = LatestByOffset.latestBoolean(); + + // When: + Struct res = udaf + .aggregate(true, LatestByOffset.createStruct(LatestByOffset.STRUCT_BOOLEAN, false)); + + // Then: + assertThat(res.getBoolean(LatestByOffset.VAL_FIELD), is(true)); + } + + @Test + public void shouldComputeLatestString() { + // Given: + final Udaf udaf = LatestByOffset.latestString(); + + // When: + Struct res = udaf + .aggregate("foo", LatestByOffset.createStruct(LatestByOffset.STRUCT_STRING, "bar")); + + // Then: + assertThat(res.getString(LatestByOffset.VAL_FIELD), is("foo")); + } + +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json new file mode 100644 index 
000000000000..82d42d7b2ac6 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/latest-offset-udaf.json @@ -0,0 +1,41 @@ +{ + "comments": [ + "Tests covering the use of the LATEST_BY_OFFSET aggregate function" + ], + "tests": [ + { + "name": "latest by offset", + "statements": [ + "CREATE STREAM INPUT (ROWKEY BIGINT KEY, ID BIGINT, F0 INT, F1 BIGINT, F2 DOUBLE, F3 BOOLEAN, F4 STRING) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "CREATE TABLE OUTPUT AS SELECT ID, LATEST_BY_OFFSET(F0) AS L0, LATEST_BY_OFFSET(F1) AS L1, LATEST_BY_OFFSET(F2) AS L2, LATEST_BY_OFFSET(F3) AS L3, LATEST_BY_OFFSET(F4) AS L4 FROM INPUT GROUP BY ID;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": 12, "F1": 1000, "F2": 1.23, "F3": true, "F4": "foo"}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "F0": 12, "F1": 1000, "F2": 1.23, "F3": true, "F4": "foo"}}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": 21, "F1": 2000, "F2": 2.23, "F3": false, "F4": "bar"}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "F0": 21, "F1": 2000, "F2": 2.23, "F3": false, "F4": "bar"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 12, "L1": 1000, "L2": 1.23, "L3": true, "L4": "foo"}}, + {"topic": "OUTPUT", "key": 1, "value": {"ID": 1, "L0": 12, "L1": 1000, "L2": 1.23, "L3": true, "L4": "foo"}}, + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 21, "L1": 2000, "L2": 2.23, "L3": false, "L4": "bar"}}, + {"topic": "OUTPUT", "key": 1, "value": {"ID": 1, "L0": 21, "L1": 2000, "L2": 2.23, "L3": false, "L4": "bar"}} + ] + }, + { + "name": "latest by offset with nulls", + "statements": [ + "CREATE STREAM INPUT (ROWKEY BIGINT KEY, ID BIGINT, F0 INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "CREATE TABLE OUTPUT AS SELECT ID, LATEST_BY_OFFSET(F0) AS L0 FROM INPUT GROUP BY ID;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": 12}}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "F0": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 12}}, + {"topic": "OUTPUT", "key": 0, "value": {"ID": 0, "L0": 12}} + ] + } + ] +}
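As an illustrative aside (not part of this patch), the second case above pins down null handling end to end: because ``aggregate`` returns the previous aggregate when the incoming value is null, the table keeps emitting the last non-null value, which is why both output rows for key 0 show ``L0: 12``. A sketch of the same scenario driven directly against the UDAF; the class name is invented and the values are taken from the test above:

    import io.confluent.ksql.function.udaf.Udaf;
    import io.confluent.ksql.function.udaf.latest.LatestByOffset;
    import org.apache.kafka.connect.data.Struct;

    public final class LatestWithNullsSketch {

      public static void main(final String[] args) {
        final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();

        Struct agg = udaf.aggregate(12, udaf.initialize()); // F0 = 12 -> VAL = 12
        agg = udaf.aggregate(null, agg);                    // F0 = null -> aggregate unchanged

        System.out.println(udaf.map(agg)); // 12, matching both expected output rows
      }
    }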