feat: implement earliest_by_offset() UDAF
spena committed May 11, 2020
1 parent 1f0ca3e commit 8799c43
Showing 14 changed files with 1,268 additions and 1 deletion.
10 changes: 10 additions & 0 deletions docs/developer-guide/ksqldb-reference/aggregate-functions.md
@@ -100,6 +100,16 @@ Returns the _approximate_ number of unique values of `col1` in a group.
The function implementation uses [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog)
to estimate cardinalities of 10^9 with a typical standard error of 2%.

EARLIEST_BY_OFFSET
------------------

`EARLIEST_BY_OFFSET(col1)`

Stream

Returns the earliest value for a given column. Earliest here is defined as the value in the partition
with the lowest offset. Rows where `col1` is null are ignored.


HISTOGRAM
---------
EarliestByOffset.java
@@ -0,0 +1,158 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udaf.earliest;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

@UdafDescription(
name = "EARLIEST_BY_OFFSET",
description = EarliestByOffset.DESCRIPTION
)
public final class EarliestByOffset {
static final String DESCRIPTION =
"This function returns the oldest value for the column, computed by offset.";

private EarliestByOffset() {
}

static final String SEQ_FIELD = "SEQ";
static final String VAL_FIELD = "VAL";

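// Schemas for the intermediate aggregate: SEQ records the order in which the value was captured
// and VAL holds the candidate earliest value, with one schema per supported column type.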
public static final Schema STRUCT_INTEGER = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_INT32_SCHEMA)
.build();

public static final Schema STRUCT_LONG = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.build();

public static final Schema STRUCT_DOUBLE = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_FLOAT64_SCHEMA)
.build();

public static final Schema STRUCT_BOOLEAN = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();

public static final Schema STRUCT_STRING = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.build();

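// Monotonically increasing counter used to stamp each intermediate aggregate; merge() keeps the
// aggregate with the earlier sequence, with compareStructs() accounting for overflow.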
static AtomicLong sequence = new AtomicLong();

@UdafFactory(description = "return the earliest value of an integer column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL INT>")
public static Udaf<Integer, Struct, Integer> earliestInteger() {
return earliest(STRUCT_INTEGER);
}

@UdafFactory(description = "return the earliest value of a big integer column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL BIGINT>")
public static Udaf<Long, Struct, Long> earliestLong() {
return earliest(STRUCT_LONG);
}

@UdafFactory(description = "return the earliest value of a double column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL DOUBLE>")
public static Udaf<Double, Struct, Double> earliestDouble() {
return earliest(STRUCT_DOUBLE);
}

@UdafFactory(description = "return the earliest value of a boolean column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL BOOLEAN>")
public static Udaf<Boolean, Struct, Boolean> earliestBoolean() {
return earliest(STRUCT_BOOLEAN);
}

@UdafFactory(description = "return the earliest value of a string column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL STRING>")
public static Udaf<String, Struct, String> earliestString() {
return earliest(STRUCT_STRING);
}

static <T> Struct createStruct(final Schema schema, final T val) {
final Struct struct = new Struct(schema);
struct.put(SEQ_FIELD, generateSequence());
struct.put(VAL_FIELD, val);
return struct;
}

private static long generateSequence() {
return sequence.getAndIncrement();
}

private static int compareStructs(final Struct struct1, final Struct struct2) {
// Deal with overflow: if one sequence is negative and the other is not, we assume the sequence
// counter has wrapped around, in which case the aggregate with the negative (numerically smaller)
// sequence is actually the later one.
final long sequence1 = struct1.getInt64(SEQ_FIELD);
final long sequence2 = struct2.getInt64(SEQ_FIELD);
if (sequence1 < 0 && sequence2 >= 0) {
return 1;
} else if (sequence2 < 0 && sequence1 >= 0) {
return -1;
} else {
return Long.compare(sequence1, sequence2);
}
}

static <T> Udaf<T, Struct, T> earliest(final Schema structSchema) {
return new Udaf<T, Struct, T>() {

@Override
public Struct initialize() {
return createStruct(structSchema, null);
}

@Override
public Struct aggregate(final T current, final Struct aggregate) {
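// Null inputs never replace the aggregate, and once a non-null value is captured it is kept.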
if (current == null || aggregate.get(VAL_FIELD) != null) {
return aggregate;
} else {
return createStruct(structSchema, current);
}
}

@Override
public Struct merge(final Struct aggOne, final Struct aggTwo) {
// When merging we need some way of identifying the "earliest" aggregate.
// We do this by tracking the order in which each aggregate was originally created.
if (compareStructs(aggOne, aggTwo) < 0) {
return aggOne;
} else {
return aggTwo;
}
}

@Override
@SuppressWarnings("unchecked")
public T map(final Struct agg) {
return (T) agg.get(VAL_FIELD);
}
};
}
}
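
To make the aggregation contract above concrete, the following driver is a hypothetical sketch that is not part of this commit; it assumes the EarliestByOffset class above is on the classpath together with the ksqlDB UDAF and Kafka Connect APIs, and the class name EarliestByOffsetExample is invented for illustration.

package io.confluent.ksql.function.udaf.earliest;

import io.confluent.ksql.function.udaf.Udaf;
import org.apache.kafka.connect.data.Struct;

// Hypothetical driver, not part of this commit.
public final class EarliestByOffsetExample {

  public static void main(final String[] args) {
    final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliestInteger();

    Struct agg = udaf.initialize();      // VAL starts out null
    agg = udaf.aggregate(null, agg);     // null input is ignored
    agg = udaf.aggregate(5, agg);        // first non-null value is captured
    agg = udaf.aggregate(7, agg);        // later values do not replace it

    System.out.println(udaf.map(agg));   // prints 5
  }
}

A merge() of two such aggregates would likewise keep whichever struct carries the earlier sequence, as the unit tests below exercise.
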
EarliestByOffsetUdafTest.java
@@ -0,0 +1,143 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udaf.earliest;

import io.confluent.ksql.function.udaf.Udaf;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class EarliestByOffsetUdafTest {
@Test
public void shouldInitialize() {
// Given:
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset
.earliest(EarliestByOffset.STRUCT_LONG);

// When:
Struct init = udaf.initialize();

// Then:
assertThat(init, is(notNullValue()));
}

@Test
public void shouldComputeEarliestInteger() {
// Given:
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliestInteger();

// When:
Struct res = udaf
.aggregate(123, EarliestByOffset.createStruct(EarliestByOffset.STRUCT_INTEGER, 321));

// Then:
assertThat(res.get(EarliestByOffset.VAL_FIELD), is(321));
}

@Test
public void shouldMerge() {
// Given:
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliestInteger();

Struct agg1 = EarliestByOffset.createStruct(EarliestByOffset.STRUCT_INTEGER, 123);
Struct agg2 = EarliestByOffset.createStruct(EarliestByOffset.STRUCT_INTEGER, 321);

// When:
Struct merged1 = udaf.merge(agg1, agg2);
Struct merged2 = udaf.merge(agg2, agg1);

// Then:
assertThat(merged1, is(agg1));
assertThat(merged2, is(agg1));
}

@Test
public void shouldMergeWithOverflow() {
// Given:
final Udaf<Integer, Struct, Integer> udaf = EarliestByOffset.earliestInteger();

EarliestByOffset.sequence.set(Long.MAX_VALUE);

Struct agg1 = EarliestByOffset.createStruct(EarliestByOffset.STRUCT_INTEGER, 123);
Struct agg2 = EarliestByOffset.createStruct(EarliestByOffset.STRUCT_INTEGER, 321);

// When:
Struct merged1 = udaf.merge(agg1, agg2);
Struct merged2 = udaf.merge(agg2, agg1);

// Then:
assertThat(agg1.getInt64(EarliestByOffset.SEQ_FIELD), is(Long.MAX_VALUE));
assertThat(agg2.getInt64(EarliestByOffset.SEQ_FIELD), is(Long.MIN_VALUE));
assertThat(merged1, is(agg1));
assertThat(merged2, is(agg1));
}


@Test
public void shouldComputeEarliestLong() {
// Given:
final Udaf<Long, Struct, Long> udaf = EarliestByOffset.earliestLong();

// When:
Struct res = udaf
.aggregate(123L, EarliestByOffset.createStruct(EarliestByOffset.STRUCT_LONG, 321L));

// Then:
assertThat(res.getInt64(EarliestByOffset.VAL_FIELD), is(321L));
}

@Test
public void shouldComputeEarliestDouble() {
// Given:
final Udaf<Double, Struct, Double> udaf = EarliestByOffset.earliestDouble();

// When:
Struct res = udaf
.aggregate(1.1d, EarliestByOffset.createStruct(EarliestByOffset.STRUCT_DOUBLE, 2.2d));

// Then:
assertThat(res.getFloat64(EarliestByOffset.VAL_FIELD), is(2.2d));
}

@Test
public void shouldComputeEarliestBoolean() {
// Given:
final Udaf<Boolean, Struct, Boolean> udaf = EarliestByOffset.earliestBoolean();

// When:
Struct res = udaf
.aggregate(true, EarliestByOffset.createStruct(EarliestByOffset.STRUCT_BOOLEAN, false));

// Then:
assertThat(res.getBoolean(EarliestByOffset.VAL_FIELD), is(false));
}

@Test
public void shouldComputeEarliestString() {
// Given:
final Udaf<String, Struct, String> udaf = EarliestByOffset.earliestString();

// When:
Struct res = udaf
.aggregate("foo", EarliestByOffset.createStruct(EarliestByOffset.STRUCT_STRING, "bar"));

// Then:
assertThat(res.getString(EarliestByOffset.VAL_FIELD), is("bar"));
}
}
PlannedTestGeneratorTest.java
@@ -29,7 +29,6 @@ public class PlannedTestGeneratorTest {
* or changed query plans.
*/
@Test
@Ignore
public void manuallyGeneratePlans() {
PlannedTestGenerator.generatePlans(QueryTranslationTest.findTestCases()
.filter(PlannedTestUtils::isNotExcluded));