Skip to content

Commit

Permalink
feat: add LEAST and GREATEST UDFs (#7683)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sullivan-Patrick authored Jul 1, 2021
1 parent 3c68710 commit 0d84733
Show file tree
Hide file tree
Showing 49 changed files with 4,528 additions and 0 deletions.
24 changes: 24 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,30 @@ The great-circle distance between two lat-long points, both specified
in decimal degrees. An optional final parameter specifies `KM`
(the default) or `miles`.

### `GREATEST`

Since: 0.20.0

```sql
GREATEST(col1, col2...)
```

The highest non-null value among a variable number of comparable columns.
If comparing columns of different numerical types, use [CAST](#cast) to first
cast them to be of the same type.

### `LEAST`

Since: 0.20.0

```sql
LEAST(col1, col2...)
```

The lowest non-null value among a variable number of comparable columns.
If comparing columns of different numerical types, use [CAST](#cast) to first
cast them to be of the same type.

### `LN`

Since: 0.6.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import com.google.common.collect.Streams;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlArgument;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

/**
 * Scalar UDF registered under the name {@code greatest}.
 *
 * <p>Each overload returns the largest non-null argument according to the natural ordering of
 * its type, or {@code null} when every argument is null. All overloads delegate to one shared
 * helper so that null handling is identical across types.
 */
@UdfDescription(
    name = "greatest",
    category = FunctionCategory.MATHEMATICAL,
    description = "Returns the highest non-null value among a"
        + " variable number of comparable columns.",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class Greatest {

  /**
   * Returns the largest non-null {@code Integer} among the arguments.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the largest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public Integer greatest(@UdfParameter final Integer val, @UdfParameter final Integer... vals) {
    return highest(val, vals);
  }

  /**
   * Returns the largest non-null {@code Long} among the arguments.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the largest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public Long greatest(@UdfParameter final Long val, @UdfParameter final Long... vals) {
    return highest(val, vals);
  }

  /**
   * Returns the largest non-null {@code Double} among the arguments, ordered by
   * {@link Double#compareTo(Double)}.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the largest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public Double greatest(@UdfParameter final Double val, @UdfParameter final Double... vals) {
    return highest(val, vals);
  }

  /**
   * Returns the largest non-null {@code String} among the arguments, ordered by
   * {@link String#compareTo(String)}.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the largest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public String greatest(@UdfParameter final String val, @UdfParameter final String... vals) {
    return highest(val, vals);
  }

  /**
   * Returns the largest non-null {@code BigDecimal} among the arguments. The SQL return type is
   * supplied by {@link #greatestDecimalProvider(List)}.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the largest non-null argument, or {@code null} if all arguments are null
   */
  @Udf(schemaProvider = "greatestDecimalProvider")
  public BigDecimal greatest(@UdfParameter final BigDecimal val,
      @UdfParameter final BigDecimal... vals) {
    return highest(val, vals);
  }

  /**
   * Derives the SQL return type for the decimal overload by folding
   * {@code DecimalUtil::widen} over the types of the arguments that carry one.
   *
   * @param params the arguments handed to the schema provider
   * @return the combined type of all typed arguments, or {@code null} when no argument has a
   *     type present
   */
  @UdfSchemaProvider
  public SqlType greatestDecimalProvider(final List<SqlArgument> params) {
    return params.stream()
        // Arguments without a SQL type are skipped rather than failing the lookup.
        .filter(s -> s.getSqlType().isPresent())
        .map(SqlArgument::getSqlTypeOrThrow)
        .reduce(DecimalUtil::widen)
        .orElse(null);
  }

  /**
   * Shared implementation: picks the maximum non-null element by natural ordering.
   *
   * <p>A null {@code vals} array is treated as "no further values" so that it cannot trigger a
   * {@link NullPointerException} in {@link Arrays#stream(Object[])}.
   */
  private static <T extends Comparable<? super T>> T highest(final T val, final T[] vals) {
    final Stream<T> rest = vals == null ? Stream.empty() : Arrays.stream(vals);
    return Streams.concat(Stream.of(val), rest)
        .filter(Objects::nonNull)
        .max(Comparator.naturalOrder())
        .orElse(null);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import com.google.common.collect.Streams;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlArgument;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

/**
 * Scalar UDF registered under the name {@code least}.
 *
 * <p>Each overload returns the smallest non-null argument according to the natural ordering of
 * its type, or {@code null} when every argument is null. All overloads delegate to one shared
 * helper so that null handling is identical across types.
 */
@UdfDescription(
    name = "least",
    category = FunctionCategory.MATHEMATICAL,
    description = "Returns the lowest non-null value among a variable number of comparable columns",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class Least {

  /**
   * Returns the smallest non-null {@code Integer} among the arguments.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the smallest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public Integer least(@UdfParameter final Integer val, @UdfParameter final Integer... vals) {
    return lowest(val, vals);
  }

  /**
   * Returns the smallest non-null {@code Long} among the arguments.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the smallest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public Long least(@UdfParameter final Long val, @UdfParameter final Long... vals) {
    return lowest(val, vals);
  }

  /**
   * Returns the smallest non-null {@code Double} among the arguments, ordered by
   * {@link Double#compareTo(Double)}.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the smallest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public Double least(@UdfParameter final Double val, @UdfParameter final Double... vals) {
    return lowest(val, vals);
  }

  /**
   * Returns the smallest non-null {@code String} among the arguments, ordered by
   * {@link String#compareTo(String)}.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the smallest non-null argument, or {@code null} if all arguments are null
   */
  @Udf
  public String least(@UdfParameter final String val, @UdfParameter final String... vals) {
    return lowest(val, vals);
  }

  /**
   * Returns the smallest non-null {@code BigDecimal} among the arguments. The SQL return type is
   * supplied by {@link #leastDecimalProvider(List)}.
   *
   * @param val the first value; may be null
   * @param vals any further values; elements may be null
   * @return the smallest non-null argument, or {@code null} if all arguments are null
   */
  @Udf(schemaProvider = "leastDecimalProvider")
  public BigDecimal least(@UdfParameter final BigDecimal val,
      @UdfParameter final BigDecimal... vals) {
    return lowest(val, vals);
  }

  /**
   * Derives the SQL return type for the decimal overload by folding
   * {@code DecimalUtil::widen} over the types of the arguments that carry one.
   *
   * @param params the arguments handed to the schema provider
   * @return the combined type of all typed arguments, or {@code null} when no argument has a
   *     type present
   */
  @UdfSchemaProvider
  public SqlType leastDecimalProvider(final List<SqlArgument> params) {
    return params.stream()
        // Arguments without a SQL type are skipped rather than failing the lookup.
        .filter(s -> s.getSqlType().isPresent())
        .map(SqlArgument::getSqlTypeOrThrow)
        .reduce(DecimalUtil::widen)
        .orElse(null);
  }

  /**
   * Shared implementation: picks the minimum non-null element by natural ordering.
   *
   * <p>A null {@code vals} array is treated as "no further values" so that it cannot trigger a
   * {@link NullPointerException} in {@link Arrays#stream(Object[])}.
   */
  private static <T extends Comparable<? super T>> T lowest(final T val, final T[] vals) {
    final Stream<T> rest = vals == null ? Stream.empty() : Arrays.stream(vals);
    return Streams.concat(Stream.of(val), rest)
        .filter(Objects::nonNull)
        .min(Comparator.naturalOrder())
        .orElse(null);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.confluent.ksql.function.udf.math;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import java.math.BigDecimal;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests that call each typed overload of {@link Greatest} directly.
 */
public class GreatestTest {

  private Greatest udf;

  @Before
  public void setUp() {
    udf = new Greatest();
  }

  @Test
  public void shouldWorkWithoutImplicitCasting() {
    // Within each call all arguments already share one type.
    final Integer maxInt = udf.greatest(0, 1, -1, 2, -2);
    assertThat(maxInt, is(2));

    final Long maxLong = udf.greatest(0L, 1L, -1L, 2L, -2L);
    assertThat(maxLong, is(2L));

    final Double maxDouble = udf.greatest(0D, .1, -.1, .2, -.2);
    assertThat(maxDouble, is(.2));

    final BigDecimal maxDecimal =
        udf.greatest(dec("0"), dec(".1"), dec("-.1"), dec(".2"), dec("-.2"));
    assertThat(maxDecimal, is(dec(".2")));

    final String maxString = udf.greatest("apple", "banana", "bzzz");
    assertThat(maxString, is("bzzz"));
  }

  @Test
  public void shouldHandleAllNullColumns() {
    // The cast on the first argument selects the overload under test.
    final Integer fromInts = udf.greatest((Integer) null, null, null);
    assertThat(fromInts, is(nullValue()));

    final Double fromDoubles = udf.greatest((Double) null, null, null);
    assertThat(fromDoubles, is(nullValue()));

    final Long fromLongs = udf.greatest((Long) null, null, null);
    assertThat(fromLongs, is(nullValue()));

    final BigDecimal fromDecimals = udf.greatest((BigDecimal) null, null, null);
    assertThat(fromDecimals, is(nullValue()));

    final String fromStrings = udf.greatest((String) null, null, null);
    assertThat(fromStrings, is(nullValue()));
  }

  @Test
  public void shouldHandleSomeNullColumns() {
    // Nulls are scattered among real values; the non-null arguments pick the overload.
    final Integer maxInt = udf.greatest(null, 27, null, 39, -49, -11, 68, 32, null, 101);
    assertThat(maxInt, is(101));

    final Double maxDouble =
        udf.greatest(null, null, 39D, -49.01, -11.98, 68.1, .32, null, 101D);
    assertThat(maxDouble, is(101D));

    final Long maxLong = udf.greatest(
        null, 272038202439L, null, 39L, -4923740932490L, -11L, 68L, 32L, null, 101L);
    assertThat(maxLong, is(272038202439L));

    final BigDecimal maxDecimal = udf.greatest(null, dec("27"), null, dec("-49"));
    assertThat(maxDecimal, is(dec("27")));

    final String maxString =
        udf.greatest(null, "apple", null, "banana", "kumquat", "aardvark", null);
    assertThat(maxString, is("kumquat"));
  }

  /** Shorthand for building a {@link BigDecimal} from its string form. */
  private static BigDecimal dec(final String text) {
    return new BigDecimal(text);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.confluent.ksql.function.udf.math;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import java.math.BigDecimal;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests that call each typed overload of {@link Least} directly.
 */
public class LeastTest {

  private Least udf;

  @Before
  public void setUp() {
    udf = new Least();
  }

  @Test
  public void shouldWorkWithoutImplicitCasting() {
    // Within each call all arguments already share one type.
    final Integer minInt = udf.least(0, 1, -1, 2, -2);
    assertThat(minInt, is(-2));

    final Long minLong = udf.least(0L, 1L, -1L, 2L, -2L);
    assertThat(minLong, is(-2L));

    final Double minDouble = udf.least(0D, .1, -.1, .2, -.2);
    assertThat(minDouble, is(-.2));

    final BigDecimal minDecimal =
        udf.least(dec("0"), dec(".1"), dec("-.1"), dec(".2"), dec("-.2"));
    assertThat(minDecimal, is(dec("-.2")));

    final String minString = udf.least("apple", "banana", "aardvark");
    assertThat(minString, is("aardvark"));
  }

  @Test
  public void shouldHandleAllNullColumns() {
    // The cast on the first argument selects the overload under test.
    final Integer fromInts = udf.least((Integer) null, null, null);
    assertThat(fromInts, is(nullValue()));

    final Double fromDoubles = udf.least((Double) null, null, null);
    assertThat(fromDoubles, is(nullValue()));

    final Long fromLongs = udf.least((Long) null, null, null);
    assertThat(fromLongs, is(nullValue()));

    final BigDecimal fromDecimals = udf.least((BigDecimal) null, null, null);
    assertThat(fromDecimals, is(nullValue()));

    final String fromStrings = udf.least((String) null, null, null);
    assertThat(fromStrings, is(nullValue()));
  }

  @Test
  public void shouldHandleSomeNullColumns() {
    // Nulls are scattered among real values; the non-null arguments pick the overload.
    final Integer minInt = udf.least(null, 27, null, 39, -49, -11, 68, 32, null, 101);
    assertThat(minInt, is(-49));

    final Double minDouble =
        udf.least(null, null, 39d, -49.01, -11.98, 68.1, .32, null, 101d);
    assertThat(minDouble, is(-49.01));

    final Long minLong = udf.least(
        null, 272038202439L, null, 39L, -4923740932490L, -11L, 68L, 32L, null, 101L);
    assertThat(minLong, is(-4923740932490L));

    final BigDecimal minDecimal = udf.least(null, dec("27"), null, dec("-49"));
    assertThat(minDecimal, is(dec("-49")));

    final String minString =
        udf.least(null, "apple", null, "banana", "kumquat", "aardvark", null);
    assertThat(minString, is("aardvark"));
  }

  /** Shorthand for building a {@link BigDecimal} from its string form. */
  private static BigDecimal dec(final String text) {
    return new BigDecimal(text);
  }

}
Loading

0 comments on commit 0d84733

Please sign in to comment.