Skip to content

Commit

Permalink
create latest_by_offset() udaf
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Fox committed Mar 16, 2020
1 parent 395626c commit c287561
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udaf.latest;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.NotNull;

/**
 * Factory holder for the {@code LATEST_BY_OFFSET} aggregate function.
 *
 * <p>The aggregate keeps the most recently seen non-null value. "Most recent" is decided by a
 * sequence number taken from a static counter at the moment each intermediate aggregate is
 * created (see {@link ValueWithSequence}), not by reading an offset from the record itself.
 */
@UdafDescription(
    name = "LATEST_BY_OFFSET",
    description = LatestOffset.DESCRIPTION
)
public final class LatestOffset {

  static final String DESCRIPTION =
      "This function returns the latest value for that key, based on offset";

  // Static factory holder: not instantiable.
  private LatestOffset() {
  }

  /**
   * Creates the aggregate function.
   *
   * @param <T> the type of the values being aggregated
   * @return a {@link Udaf} whose intermediate state is a {@link ValueWithSequence} (or
   *     {@code null} while no non-null value has been seen) and whose result is the wrapped value
   */
  @UdafFactory(description = "Latest by offset")
  public static <T> Udaf<T, ValueWithSequence<T>, T> latest() {
    return new Udaf<T, ValueWithSequence<T>, T>() {

      /**
       * Returns the initial aggregate, which is {@code null}: nothing has been seen yet.
       */
      @Override
      public ValueWithSequence<T> initialize() {
        return null;
      }

      /**
       * Folds one input value into the aggregate.
       *
       * @param current the incoming value; {@code null} inputs are ignored
       * @param aggregate the aggregate so far, possibly {@code null}
       * @return {@code aggregate} unchanged when {@code current} is {@code null}, otherwise a
       *     new aggregate wrapping {@code current} with a fresh sequence number
       */
      @Override
      public ValueWithSequence<T> aggregate(final T current,
          final ValueWithSequence<T> aggregate) {
        if (current == null) {
          return aggregate;
        }
        return new ValueWithSequence<>(current);
      }

      /**
       * Picks the later of two aggregates.
       *
       * <p>Either side may be {@code null}, because {@link #initialize()} returns {@code null}
       * and {@code aggregate} passes that through for {@code null} inputs. A {@code null} side
       * carries no value, so the other side wins; two {@code null}s merge to {@code null}.
       *
       * @param aggOne the first aggregate, possibly {@code null}
       * @param aggTwo the second aggregate, possibly {@code null}
       * @return the aggregate with the later sequence; {@code aggOne} on a tie
       */
      @Override
      public ValueWithSequence<T> merge(final ValueWithSequence<T> aggOne,
          final ValueWithSequence<T> aggTwo) {
        if (aggOne == null) {
          return aggTwo;
        }
        if (aggTwo == null) {
          return aggOne;
        }
        // When merging we need some way of evaluating the "latest" one.
        // We do this by keeping track of the sequence when it was originally processed.
        return aggOne.compareTo(aggTwo) >= 0 ? aggOne : aggTwo;
      }

      /**
       * Extracts the final result from the aggregate.
       *
       * @param agg the aggregate, possibly {@code null}
       * @return the wrapped value, or {@code null} when no non-null value was ever aggregated
       */
      @Override
      public T map(final ValueWithSequence<T> agg) {
        return agg == null ? null : agg.value;
      }
    };
  }

  /**
   * A value tagged with the sequence number it was given when this wrapper was constructed.
   *
   * <p>Ordering is by sequence only; the wrapped value takes no part in the comparison.
   *
   * @param <T> the type of the wrapped value
   */
  static final class ValueWithSequence<T> implements Comparable<ValueWithSequence<T>> {

    // Shared by every instance, whatever its type parameter. Package-visible so that tests can
    // position the counter (for example just before overflow).
    static final AtomicLong seq = new AtomicLong();

    final long sequence;
    final T value;

    /**
     * Wraps {@code value} and stamps it with the next number from the shared counter.
     *
     * @param value the value to wrap
     */
    ValueWithSequence(final T value) {
      this.sequence = seq.getAndIncrement();
      this.value = value;
    }

    /**
     * Orders by sequence, treating a negative sequence as later than any non-negative one.
     *
     * @param other the instance to compare against; must not be {@code null}
     * @return a positive number if this instance is later than {@code other}, a negative number
     *     if it is earlier, and zero if both carry the same sequence
     */
    @Override
    public int compareTo(@NotNull final ValueWithSequence<T> other) {
      // Deal with overflow: the counter wraps from Long.MAX_VALUE to Long.MIN_VALUE, so if one
      // sequence is negative and the other is not, we assume the counter has overflowed and the
      // negative one was issued after the wrap, i.e. it is the later of the two.
      if (sequence < 0 && other.sequence >= 0) {
        return 1;
      } else if (other.sequence < 0 && sequence >= 0) {
        return -1;
      } else {
        return Long.compare(sequence, other.sequence);
      }
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udaf.latest;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.latest.LatestOffset.ValueWithSequence;
import org.junit.Test;

/**
 * Unit tests for the {@code LATEST_BY_OFFSET} aggregate created by {@link LatestOffset#latest()}.
 */
public class LatestOffsetUdafTest {

  @Test
  public void shouldInitializeToNull() {
    // Given:
    final Udaf<Integer, ValueWithSequence<Integer>, Integer> udaf = LatestOffset.latest();

    // When:
    final ValueWithSequence<Integer> init = udaf.initialize();

    // Then:
    assertThat(init, is(nullValue()));
  }

  @Test
  public void shouldComputeLatest() {
    // Given:
    final Udaf<Integer, ValueWithSequence<Integer>, Integer> udaf = LatestOffset.latest();

    // When:
    final ValueWithSequence<Integer> res = udaf.aggregate(123, new ValueWithSequence<>(321));

    // Then:
    assertThat(res.value, is(123));
  }

  @Test
  public void shouldKeepAggregateWhenCurrentIsNull() {
    // Given:
    final Udaf<Integer, ValueWithSequence<Integer>, Integer> udaf = LatestOffset.latest();
    final ValueWithSequence<Integer> existing = new ValueWithSequence<>(321);

    // When:
    final ValueWithSequence<Integer> res = udaf.aggregate(null, existing);

    // Then:
    assertThat(res, is(existing));
  }

  @Test
  public void shouldMapToValue() {
    // Given:
    final Udaf<Integer, ValueWithSequence<Integer>, Integer> udaf = LatestOffset.latest();

    // When:
    final Integer mapped = udaf.map(new ValueWithSequence<>(123));

    // Then:
    assertThat(mapped, is(123));
  }

  @Test
  public void shouldMerge() {
    // Given:
    final Udaf<Integer, ValueWithSequence<Integer>, Integer> udaf = LatestOffset.latest();

    // agg2 is created second, so it carries the later sequence.
    final ValueWithSequence<Integer> agg1 = new ValueWithSequence<>(123);
    final ValueWithSequence<Integer> agg2 = new ValueWithSequence<>(321);

    // When:
    final ValueWithSequence<Integer> merged1 = udaf.merge(agg1, agg2);
    final ValueWithSequence<Integer> merged2 = udaf.merge(agg2, agg1);

    // Then: the result must not depend on argument order.
    assertThat(merged1, is(agg2));
    assertThat(merged2, is(agg2));
  }

  @Test
  public void shouldMergeWithOverflow() {
    // Given:
    final Udaf<Integer, ValueWithSequence<Integer>, Integer> udaf = LatestOffset.latest();

    // The counter is static, so remember where it was and put it back afterwards; otherwise
    // this test leaks its state into whichever tests happen to run after it.
    final long savedSeq = ValueWithSequence.seq.get();
    ValueWithSequence.seq.set(Long.MAX_VALUE);
    try {
      final ValueWithSequence<Integer> agg1 = new ValueWithSequence<>(123);
      final ValueWithSequence<Integer> agg2 = new ValueWithSequence<>(321);

      // When:
      final ValueWithSequence<Integer> merged1 = udaf.merge(agg1, agg2);
      final ValueWithSequence<Integer> merged2 = udaf.merge(agg2, agg1);

      // Then: agg2 wrapped around to Long.MIN_VALUE but must still be treated as the latest.
      assertThat(agg1.sequence, is(Long.MAX_VALUE));
      assertThat(agg2.sequence, is(Long.MIN_VALUE));
      assertThat(merged1, is(agg2));
      assertThat(merged2, is(agg2));
    } finally {
      ValueWithSequence.seq.set(savedSeq);
    }
  }

}

0 comments on commit c287561

Please sign in to comment.