diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java index 03f05dc1978a..374fadb52ed6 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java @@ -44,7 +44,7 @@ public AggregateFunctionFactory(final String functionName) { "", KsqlConstants.CONFLUENT_AUTHOR, "", - KsqlFunction.INTERNAL_PATH, + KsqlScalarFunction.INTERNAL_PATH, false )); } diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java index 42a4ec21b632..32279da251d3 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunction.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2019 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -15,105 +15,149 @@ package io.confluent.ksql.function; -import io.confluent.ksql.function.udf.Kudf; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import io.confluent.ksql.name.FunctionName; -import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.SchemaUtil; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import javax.annotation.concurrent.Immutable; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; @Immutable -public final class KsqlFunction extends KsqlFunctionBase { +public class KsqlFunction implements FunctionSignature { - static final String INTERNAL_PATH = "internal"; + private final Function, Schema> returnSchemaProvider; + private final Schema javaReturnType; + private final List parameters; + private final FunctionName functionName; + private final String description; + private final String pathLoadedFrom; + private final boolean isVariadic; - private final Class kudfClass; - private final Function udfFactory; - - private KsqlFunction( - final Function,Schema> returnSchemaProvider, + KsqlFunction( + final Function, Schema> returnSchemaProvider, final Schema javaReturnType, final List arguments, final FunctionName functionName, - final Class kudfClass, - final Function udfFactory, final String description, final String pathLoadedFrom, - final boolean isVariadic) { - super(returnSchemaProvider, javaReturnType, arguments, functionName, description, - pathLoadedFrom, isVariadic - ); - this.kudfClass = Objects.requireNonNull(kudfClass, "kudfClass"); - this.udfFactory = Objects.requireNonNull(udfFactory, "udfFactory"); - } - - /** - * Create built in / legacy function. - */ - @SuppressWarnings("deprecation") // Solution only available in Java9. - public static KsqlFunction createLegacyBuiltIn( - final Schema returnType, - final List arguments, - final FunctionName functionName, - final Class kudfClass + final boolean isVariadic ) { - final Function udfFactory = ksqlConfig -> { - try { - return kudfClass.newInstance(); - } catch (final Exception e) { - throw new KsqlException("Failed to create instance of kudfClass " - + kudfClass + " for function " + functionName, e); + + this.returnSchemaProvider = Objects.requireNonNull(returnSchemaProvider, "schemaProvider"); + this.javaReturnType = Objects.requireNonNull(javaReturnType, "javaReturnType"); + this.parameters = ImmutableList.copyOf(Objects.requireNonNull(arguments, "arguments")); + this.functionName = Objects.requireNonNull(functionName, "functionName"); + this.description = Objects.requireNonNull(description, "description"); + this.pathLoadedFrom = Objects.requireNonNull(pathLoadedFrom, "pathLoadedFrom"); + this.isVariadic = isVariadic; + + if (arguments.stream().anyMatch(Objects::isNull)) { + throw new IllegalArgumentException("KSQL Function can't have null argument types"); + } + if (isVariadic) { + if (arguments.isEmpty()) { + throw new IllegalArgumentException( + "KSQL variadic functions must have at least one parameter"); + } + if (!Iterables.getLast(arguments).type().equals(Type.ARRAY)) { + throw new IllegalArgumentException( + "KSQL variadic functions must have ARRAY type as their last parameter"); } - }; + } + } + + public Schema getReturnType(final List arguments) { + + final Schema returnType = returnSchemaProvider.apply(arguments); + + if (returnType == null) { + throw new KsqlException(String.format("Return type of UDF %s cannot be null.", functionName)); + } + + if (!returnType.isOptional()) { + throw new IllegalArgumentException("KSQL only supports optional field types"); + } + + if (!GenericsUtil.hasGenerics(returnType)) { + checkMatchingReturnTypes(returnType, javaReturnType); + return returnType; + } + + final Map genericMapping = new HashMap<>(); + for (int i = 0; i < Math.min(parameters.size(), arguments.size()); i++) { + final Schema schema = parameters.get(i); + + // we resolve any variadic as if it were an array so that the type + // structure matches the input type + final Schema instance = isVariadic && i == parameters.size() - 1 + ? SchemaBuilder.array(arguments.get(i)).build() + : arguments.get(i); + + genericMapping.putAll(GenericsUtil.resolveGenerics(schema, instance)); + } - return create( - ignored -> returnType, returnType, arguments, functionName, kudfClass, udfFactory, "", - INTERNAL_PATH, false); + final Schema genericSchema = GenericsUtil.applyResolved(returnType, genericMapping); + final Schema genericJavaSchema = GenericsUtil.applyResolved(javaReturnType, genericMapping); + checkMatchingReturnTypes(genericSchema, genericJavaSchema); + + return genericSchema; } - public Class getKudfClass() { - return kudfClass; + private void checkMatchingReturnTypes(final Schema s1, final Schema s2) { + if (!SchemaUtil.areCompatible(s1, s2)) { + throw new KsqlException(String.format( + "Return type %s of UDF %s does not match the declared " + + "return type %s.", + SchemaConverters.connectToSqlConverter().toSqlType( + s1).toString(), + functionName.toString(FormatOptions.noEscape()), + SchemaConverters.connectToSqlConverter().toSqlType( + s2).toString() + )); + } } - /** - * Create udf. - * - *

Can be either built-in UDF or true user-supplied. - */ - static KsqlFunction create( - final Function,Schema> schemaProvider, - final Schema javaReturnType, - final List arguments, - final FunctionName functionName, - final Class kudfClass, - final Function udfFactory, - final String description, - final String pathLoadedFrom, - final boolean isVariadic - ) { - return new KsqlFunction( - schemaProvider, - javaReturnType, - arguments, - functionName, - kudfClass, - udfFactory, - description, - pathLoadedFrom, - isVariadic); + public List getArguments() { + return parameters; + } + + public FunctionName getFunctionName() { + return functionName; + } + + public String getDescription() { + return description; + } + + public String getPathLoadedFrom() { + return pathLoadedFrom; + } + + public boolean isVariadic() { + return isVariadic; } @Override public String toString() { return "KsqlFunction{" - + ", kudfClass=" + kudfClass + + "returnType=" + javaReturnType + + ", arguments=" + parameters.stream().map(Schema::type).collect(Collectors.toList()) + + ", functionName='" + functionName + '\'' + + ", description='" + description + "'" + + ", pathLoadedFrom='" + pathLoadedFrom + "'" + + ", isVariadic=" + isVariadic + '}'; } - public Kudf newInstance(final KsqlConfig ksqlConfig) { - return udfFactory.apply(ksqlConfig); - } } \ No newline at end of file diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunctionBase.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunctionBase.java deleted file mode 100644 index 842f29d60c9b..000000000000 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlFunctionBase.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.function; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import io.confluent.ksql.name.FunctionName; -import io.confluent.ksql.schema.ksql.FormatOptions; -import io.confluent.ksql.schema.ksql.SchemaConverters; -import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.SchemaUtil; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; -import javax.annotation.concurrent.Immutable; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Schema.Type; -import org.apache.kafka.connect.data.SchemaBuilder; - -@Immutable -public class KsqlFunctionBase implements FunctionSignature { - - private final Function, Schema> returnSchemaProvider; - private final Schema javaReturnType; - private final List parameters; - private final FunctionName functionName; - private final String description; - private final String pathLoadedFrom; - private final boolean isVariadic; - - KsqlFunctionBase( - final Function, Schema> returnSchemaProvider, - final Schema javaReturnType, - final List arguments, - final FunctionName functionName, - final String description, - final String pathLoadedFrom, - final boolean isVariadic - ) { - - this.returnSchemaProvider = Objects.requireNonNull(returnSchemaProvider, "schemaProvider"); - this.javaReturnType = Objects.requireNonNull(javaReturnType, "javaReturnType"); - this.parameters = ImmutableList.copyOf(Objects.requireNonNull(arguments, "arguments")); - this.functionName = Objects.requireNonNull(functionName, "functionName"); - this.description = Objects.requireNonNull(description, "description"); - this.pathLoadedFrom = Objects.requireNonNull(pathLoadedFrom, "pathLoadedFrom"); - this.isVariadic = isVariadic; - - if (arguments.stream().anyMatch(Objects::isNull)) { - throw new IllegalArgumentException("KSQL Function can't have null argument types"); - } - if (isVariadic) { - if (arguments.isEmpty()) { - throw new IllegalArgumentException( - "KSQL variadic functions must have at least one parameter"); - } - if (!Iterables.getLast(arguments).type().equals(Type.ARRAY)) { - throw new IllegalArgumentException( - "KSQL variadic functions must have ARRAY type as their last parameter"); - } - } - } - - public Schema getReturnType(final List arguments) { - - final Schema returnType = returnSchemaProvider.apply(arguments); - - if (returnType == null) { - throw new KsqlException(String.format("Return type of UDF %s cannot be null.", functionName)); - } - - if (!returnType.isOptional()) { - throw new IllegalArgumentException("KSQL only supports optional field types"); - } - - if (!GenericsUtil.hasGenerics(returnType)) { - checkMatchingReturnTypes(returnType, javaReturnType); - return returnType; - } - - final Map genericMapping = new HashMap<>(); - for (int i = 0; i < Math.min(parameters.size(), arguments.size()); i++) { - final Schema schema = parameters.get(i); - - // we resolve any variadic as if it were an array so that the type - // structure matches the input type - final Schema instance = isVariadic && i == parameters.size() - 1 - ? SchemaBuilder.array(arguments.get(i)).build() - : arguments.get(i); - - genericMapping.putAll(GenericsUtil.resolveGenerics(schema, instance)); - } - - final Schema genericSchema = GenericsUtil.applyResolved(returnType, genericMapping); - final Schema genericJavaSchema = GenericsUtil.applyResolved(javaReturnType, genericMapping); - checkMatchingReturnTypes(genericSchema, genericJavaSchema); - - return genericSchema; - } - - private void checkMatchingReturnTypes(final Schema s1, final Schema s2) { - if (!SchemaUtil.areCompatible(s1, s2)) { - throw new KsqlException(String.format( - "Return type %s of UDF %s does not match the declared " - + "return type %s.", - SchemaConverters.connectToSqlConverter().toSqlType( - s1).toString(), - functionName.toString(FormatOptions.noEscape()), - SchemaConverters.connectToSqlConverter().toSqlType( - s2).toString() - )); - } - } - - public List getArguments() { - return parameters; - } - - public FunctionName getFunctionName() { - return functionName; - } - - public String getDescription() { - return description; - } - - public String getPathLoadedFrom() { - return pathLoadedFrom; - } - - public boolean isVariadic() { - return isVariadic; - } - - @Override - public String toString() { - return "KsqlFunction{" - + "returnType=" + javaReturnType - + ", arguments=" + parameters.stream().map(Schema::type).collect(Collectors.toList()) - + ", functionName='" + functionName + '\'' - + ", description='" + description + "'" - + ", pathLoadedFrom='" + pathLoadedFrom + "'" - + ", isVariadic=" + isVariadic - + '}'; - } - -} \ No newline at end of file diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlScalarFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlScalarFunction.java new file mode 100644 index 000000000000..712eaf76b6f5 --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlScalarFunction.java @@ -0,0 +1,122 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function; + +import io.confluent.ksql.function.udf.Kudf; +import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import javax.annotation.concurrent.Immutable; +import org.apache.kafka.connect.data.Schema; + +@Immutable +public final class KsqlScalarFunction extends KsqlFunction { + + static final String INTERNAL_PATH = "internal"; + + private final Class kudfClass; + private final Function udfFactory; + + private KsqlScalarFunction( + final Function, Schema> returnSchemaProvider, + final Schema javaReturnType, + final List arguments, + final FunctionName functionName, + final Class kudfClass, + final Function udfFactory, + final String description, + final String pathLoadedFrom, + final boolean isVariadic + ) { + super(returnSchemaProvider, javaReturnType, arguments, functionName, description, + pathLoadedFrom, isVariadic + ); + this.kudfClass = Objects.requireNonNull(kudfClass, "kudfClass"); + this.udfFactory = Objects.requireNonNull(udfFactory, "udfFactory"); + } + + /** + * Create built in / legacy function. + */ + @SuppressWarnings("deprecation") // Solution only available in Java9. + public static KsqlScalarFunction createLegacyBuiltIn( + final Schema returnType, + final List arguments, + final FunctionName functionName, + final Class kudfClass + ) { + final Function udfFactory = ksqlConfig -> { + try { + return kudfClass.newInstance(); + } catch (final Exception e) { + throw new KsqlException("Failed to create instance of kudfClass " + + kudfClass + " for function " + functionName, e); + } + }; + + return create( + ignored -> returnType, returnType, arguments, functionName, kudfClass, udfFactory, "", + INTERNAL_PATH, false + ); + } + + public Class getKudfClass() { + return kudfClass; + } + + /** + * Create udf. + * + *

Can be either built-in UDF or true user-supplied. + */ + static KsqlScalarFunction create( + final Function, Schema> schemaProvider, + final Schema javaReturnType, + final List arguments, + final FunctionName functionName, + final Class kudfClass, + final Function udfFactory, + final String description, + final String pathLoadedFrom, + final boolean isVariadic + ) { + return new KsqlScalarFunction( + schemaProvider, + javaReturnType, + arguments, + functionName, + kudfClass, + udfFactory, + description, + pathLoadedFrom, + isVariadic + ); + } + + @Override + public String toString() { + return "KsqlFunction{" + + ", kudfClass=" + kudfClass + + '}'; + } + + public Kudf newInstance(final KsqlConfig ksqlConfig) { + return udfFactory.apply(ksqlConfig); + } +} \ No newline at end of file diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java index 143c7dae4494..e3fa76584388 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java @@ -16,6 +16,7 @@ package io.confluent.ksql.function; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.function.udf.Kudf; import io.confluent.ksql.name.FunctionName; import java.util.List; import java.util.Objects; @@ -27,10 +28,9 @@ * description, and allows the function to be invoked. */ @Immutable -public class KsqlTableFunction extends KsqlFunctionBase { +public class KsqlTableFunction extends KsqlFunction { - private final FunctionInvoker invoker; - private final Object udtf; + private final Kudf udtf; public KsqlTableFunction( final Function, Schema> returnSchemaProvider, @@ -38,17 +38,15 @@ public KsqlTableFunction( final Schema outputType, final List arguments, final String description, - final FunctionInvoker functionInvoker, - final Object udtf + final Kudf udtf ) { super(returnSchemaProvider, outputType, arguments, functionName, description, "", false ); - this.invoker = Objects.requireNonNull(functionInvoker, "functionInvoker"); this.udtf = Objects.requireNonNull(udtf, "udtf"); } public List apply(final Object... args) { - return (List) invoker.eval(udtf, args); + return (List) udtf.evaluate(args); } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java b/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java index 440b208f6d55..4c4b795a7dbd 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java @@ -42,7 +42,7 @@ public interface MutableFunctionRegistry extends FunctionRegistry { * @param ksqlFunction the function to register. * @throws KsqlException if a function, (of any type), with the same name exists. */ - void addFunction(KsqlFunction ksqlFunction); + void addFunction(KsqlScalarFunction ksqlFunction); /** * Register an aggregate function factory. diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java index 76fd3c408edb..f212840c893d 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/UdfFactory.java @@ -27,7 +27,7 @@ public class UdfFactory { private final UdfMetadata metadata; private final Class udfClass; - private final UdfIndex udfIndex; + private final UdfIndex udfIndex; UdfFactory(final Class udfClass, final UdfMetadata metadata) { @@ -36,12 +36,12 @@ public class UdfFactory { this.udfIndex = new UdfIndex<>(metadata.getName()); } - synchronized void addFunction(final KsqlFunction ksqlFunction) { + synchronized void addFunction(final KsqlScalarFunction ksqlFunction) { checkCompatible(ksqlFunction); udfIndex.addFunction(ksqlFunction); } - private void checkCompatible(final KsqlFunction ksqlFunction) { + private void checkCompatible(final KsqlScalarFunction ksqlFunction) { if (udfClass != ksqlFunction.getKudfClass()) { throw new KsqlException("Can't add function " + ksqlFunction + " as a function with the same name exists in a different " + udfClass); @@ -61,7 +61,7 @@ public String getName() { return metadata.getName(); } - public synchronized void eachFunction(final Consumer consumer) { + public synchronized void eachFunction(final Consumer consumer) { udfIndex.values().forEach(consumer); } @@ -79,7 +79,7 @@ public String toString() { + '}'; } - public synchronized KsqlFunction getFunction(final List paramTypes) { + public synchronized KsqlScalarFunction getFunction(final List paramTypes) { return udfIndex.getFunction(paramTypes); } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/function/KsqlFunctionTest.java b/ksql-common/src/test/java/io/confluent/ksql/function/KsqlFunctionTest.java index 13ad2632054c..5909bd54b83f 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/function/KsqlFunctionTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/function/KsqlFunctionTest.java @@ -46,7 +46,7 @@ public class KsqlFunctionTest { @Test public void shouldResolveGenericReturnType() { // Given: - final KsqlFunction function = createFunction( + final KsqlScalarFunction function = createFunction( GenericsUtil.generic("T").build(), ImmutableList.of(GenericsUtil.generic("T").build()) ); @@ -61,7 +61,7 @@ public void shouldResolveGenericReturnType() { @Test public void shouldResolveGenericReturnTypeFromArray() { // Given: - final KsqlFunction function = createFunction( + final KsqlScalarFunction function = createFunction( GenericsUtil.generic("T").build(), ImmutableList.of(GenericsUtil.array("T").build()) ); @@ -77,7 +77,7 @@ public void shouldResolveGenericReturnTypeFromArray() { @Test public void shouldResolveGenericReturnTypeFromSecondArgument() { // Given: - final KsqlFunction function = createFunction( + final KsqlScalarFunction function = createFunction( GenericsUtil.generic("T").build(), ImmutableList.of( GenericsUtil.generic("S").build(), @@ -98,7 +98,7 @@ public void shouldResolveGenericReturnTypeFromSecondArgument() { @Test public void shouldResolveGenericArrayReturnType() { // Given: - final KsqlFunction function = createFunction( + final KsqlScalarFunction function = createFunction( GenericsUtil.array("T").build(), ImmutableList.of(GenericsUtil.generic("T").build()) ); @@ -113,7 +113,7 @@ public void shouldResolveGenericArrayReturnType() { @Test public void shouldResolveGenericFromVariadicArgument() { // Given: - final KsqlFunction function = createFunction( + final KsqlScalarFunction function = createFunction( GenericsUtil.generic("T").build(), ImmutableList.of(GenericsUtil.array("T").build()), true @@ -133,7 +133,7 @@ public void shouldThrowOnNonOptionalReturnType() { expectedException.expectMessage("KSQL only supports optional field types"); // When: - final KsqlFunction function = createFunction(Schema.INT32_SCHEMA, ImmutableList.of()); + final KsqlScalarFunction function = createFunction(Schema.INT32_SCHEMA, ImmutableList.of()); function.getReturnType(ImmutableList.of()); } @@ -146,7 +146,7 @@ public void shouldResolveSchemaProvider() { return decimalSchema; }; - final KsqlFunction udf = KsqlFunction.create( + final KsqlScalarFunction udf = KsqlScalarFunction.create( schemaProviderFunction, decimalSchema, ImmutableList.of(Schema.INT32_SCHEMA), @@ -164,16 +164,16 @@ public void shouldResolveSchemaProvider() { assertThat(returnType, is(decimalSchema)); } - private KsqlFunction createFunction(final Schema returnSchema, final List args) { + private KsqlScalarFunction createFunction(final Schema returnSchema, final List args) { return createFunction(returnSchema, args, false); } - private KsqlFunction createFunction( + private KsqlScalarFunction createFunction( final Schema returnSchema, final List args, final boolean isVariadic ) { - return KsqlFunction.create( + return KsqlScalarFunction.create( ignored -> returnSchema, returnSchema, args, diff --git a/ksql-common/src/test/java/io/confluent/ksql/function/UdfFactoryTest.java b/ksql-common/src/test/java/io/confluent/ksql/function/UdfFactoryTest.java index 5e0f66a8ed90..0a22703f77bb 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/function/UdfFactoryTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/function/UdfFactoryTest.java @@ -47,7 +47,7 @@ public void shouldThrowIfNoVariantFoundThatAcceptsSuppliedParamTypes() { public void shouldThrowExceptionIfAddingFunctionWithDifferentPath() { expectedException.expect(KafkaException.class); expectedException.expectMessage("as a function with the same name has been loaded from a different jar"); - factory.addFunction(KsqlFunction.create( + factory.addFunction(KsqlScalarFunction.create( ignored -> Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Collections.emptyList(), diff --git a/ksql-common/src/test/java/io/confluent/ksql/function/UdfIndexTest.java b/ksql-common/src/test/java/io/confluent/ksql/function/UdfIndexTest.java index ce12afb3282c..34d475e2503d 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/function/UdfIndexTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/function/UdfIndexTest.java @@ -1,6 +1,6 @@ package io.confluent.ksql.function; -import static io.confluent.ksql.function.KsqlFunction.INTERNAL_PATH; +import static io.confluent.ksql.function.KsqlScalarFunction.INTERNAL_PATH; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -42,11 +42,11 @@ public class UdfIndexTest { private static final FunctionName EXPECTED = FunctionName.of("expected"); private static final FunctionName OTHER = FunctionName.of("other"); - private UdfIndex udfIndex; + private UdfIndex udfIndex; @Before public void setUp() { - udfIndex = new UdfIndex("name"); + udfIndex = new UdfIndex("name"); } @Rule @@ -55,11 +55,11 @@ public void setUp() { @Test public void shouldFindNoArgs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(EXPECTED, false)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of()); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of()); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -68,11 +68,12 @@ public void shouldFindNoArgs() { @Test public void shouldFindOneArg() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -81,11 +82,12 @@ public void shouldFindOneArg() { @Test public void shouldFindTwoDifferentArgs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, INT)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, INT)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, INT)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, INT)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -94,11 +96,12 @@ public void shouldFindTwoDifferentArgs() { @Test public void shouldFindTwoSameArgs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -107,12 +110,13 @@ public void shouldFindTwoSameArgs() { @Test public void shouldFindOneArgConflict() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING), function(OTHER, false, INT)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -121,12 +125,13 @@ public void shouldFindOneArgConflict() { @Test public void shouldFindTwoArgSameFirstConflict() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, STRING), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, STRING), function(OTHER, false, STRING, INT)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -135,12 +140,12 @@ public void shouldFindTwoArgSameFirstConflict() { @Test public void shouldChooseCorrectStruct() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRUCT2), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(OTHER, false, STRUCT2), function(EXPECTED, false, STRUCT1)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRUCT1)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRUCT1)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -149,12 +154,12 @@ public void shouldChooseCorrectStruct() { @Test public void shouldChooseCorrectMap() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, MAP2), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(OTHER, false, MAP2), function(EXPECTED, false, MAP1)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(MAP1)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(MAP1)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -163,11 +168,12 @@ public void shouldChooseCorrectMap() { @Test public void shouldChooseCorrectDecimal() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, DECIMAL1)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, DECIMAL1)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(DECIMAL1)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(DECIMAL1)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -176,11 +182,12 @@ public void shouldChooseCorrectDecimal() { @Test public void shouldAllowAnyDecimal() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, DECIMAL1)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, DECIMAL1)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(DECIMAL2)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(DECIMAL2)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -189,13 +196,13 @@ public void shouldAllowAnyDecimal() { @Test public void shouldChooseCorrectPermutedStruct() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(OTHER, false, STRUCT3_PERMUTE), function(EXPECTED, false, STRUCT3)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRUCT3)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRUCT3)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -204,11 +211,12 @@ public void shouldChooseCorrectPermutedStruct() { @Test public void shouldFindVarargsEmpty() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, true, STRING_VARARGS)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of()); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of()); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -217,11 +225,12 @@ public void shouldFindVarargsEmpty() { @Test public void shouldFindVarargsOne() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, true, STRING_VARARGS)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -230,11 +239,12 @@ public void shouldFindVarargsOne() { @Test public void shouldFindVarargsTwo() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, true, STRING_VARARGS)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -243,12 +253,12 @@ public void shouldFindVarargsTwo() { @Test public void shouldFindVarargWithStruct() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, true, SchemaBuilder.array(STRUCT1).build())}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRUCT1, STRUCT1)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRUCT1, STRUCT1)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -257,11 +267,12 @@ public void shouldFindVarargWithStruct() { @Test public void shouldFindVarargWithList() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, true, STRING_VARARGS)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING_VARARGS)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING_VARARGS)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -270,13 +281,13 @@ public void shouldFindVarargWithList() { @Test public void shouldChooseSpecificOverVarArgs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, STRING), function(OTHER, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -285,14 +296,14 @@ public void shouldChooseSpecificOverVarArgs() { @Test public void shouldChooseSpecificOverMultipleVarArgs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, STRING), function(OTHER, true, STRING_VARARGS), function("two", true, STRING, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -301,12 +312,12 @@ public void shouldChooseSpecificOverMultipleVarArgs() { @Test public void shouldChooseVarArgsIfSpecificDoesntMatch() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRING), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(OTHER, false, STRING), function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(STRING, STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -315,11 +326,12 @@ public void shouldChooseVarArgsIfSpecificDoesntMatch() { @Test public void shouldFindNonVarargWithNullValues() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Collections.singletonList(null)); + final KsqlScalarFunction fun = udfIndex.getFunction(Collections.singletonList(null)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -328,11 +340,12 @@ public void shouldFindNonVarargWithNullValues() { @Test public void shouldFindNonVarargWithPartialNullValues() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(null, STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(null, STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -341,12 +354,13 @@ public void shouldFindNonVarargWithPartialNullValues() { @Test public void shouldChooseFirstAddedWithNullValues() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING), function(OTHER, false, INT)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Collections.singletonList(null)); + final KsqlScalarFunction fun = udfIndex.getFunction(Collections.singletonList(null)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -355,11 +369,12 @@ public void shouldChooseFirstAddedWithNullValues() { @Test public void shouldFindVarargWithNullValues() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, true, STRING_VARARGS)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(new Schema[]{null})); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(new Schema[]{null})); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -368,11 +383,12 @@ public void shouldFindVarargWithNullValues() { @Test public void shouldFindVarargWithSomeNullValues() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, true, STRING_VARARGS)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(null, STRING, null)); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(null, STRING, null)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -381,12 +397,13 @@ public void shouldFindVarargWithSomeNullValues() { @Test public void shouldChooseNonVarargWithNullValues() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING), function(OTHER, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Collections.singletonList(null)); + final KsqlScalarFunction fun = udfIndex.getFunction(Collections.singletonList(null)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -395,12 +412,13 @@ public void shouldChooseNonVarargWithNullValues() { @Test public void shouldChooseNonVarargWithNullValuesOfDifferingSchemas() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, INT), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, INT), function(OTHER, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(new Schema[]{null, null})); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(new Schema[]{null, null})); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -409,12 +427,13 @@ public void shouldChooseNonVarargWithNullValuesOfDifferingSchemas() { @Test public void shouldChooseNonVarargWithNullValuesOfSameSchemas() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, STRING), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, STRING), function(OTHER, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(new Schema[]{null, null})); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(new Schema[]{null, null})); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -423,12 +442,13 @@ public void shouldChooseNonVarargWithNullValuesOfSameSchemas() { @Test public void shouldChooseNonVarargWithNullValuesOfPartialNulls() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, STRING, INT), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, STRING, INT), function(OTHER, true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(STRING, null)); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(STRING, null)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -437,7 +457,7 @@ public void shouldChooseNonVarargWithNullValuesOfPartialNulls() { @Test public void shouldChooseCorrectlyInComplicatedTopology() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, STRING, INT, STRING, INT), function(OTHER, true, STRING_VARARGS), function("two", true, STRING, STRING_VARARGS), function("three", true, STRING, INT, STRING_VARARGS), @@ -446,7 +466,7 @@ public void shouldChooseCorrectlyInComplicatedTopology() { Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Arrays.asList(STRING, INT, null, INT)); + final KsqlScalarFunction fun = udfIndex.getFunction(Arrays.asList(STRING, INT, null, INT)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -455,13 +475,13 @@ public void shouldChooseCorrectlyInComplicatedTopology() { @Test public void shouldFindGenericMethodWithIntParam() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, GENERIC_LIST) }; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Collections.singletonList(INT_LIST)); + final KsqlScalarFunction fun = udfIndex.getFunction(Collections.singletonList(INT_LIST)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -470,13 +490,13 @@ public void shouldFindGenericMethodWithIntParam() { @Test public void shouldFindGenericMethodWithStringParam() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, GENERIC_LIST) }; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(Collections.singletonList(STRING_LIST)); + final KsqlScalarFunction fun = udfIndex.getFunction(Collections.singletonList(STRING_LIST)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -486,13 +506,13 @@ public void shouldFindGenericMethodWithStringParam() { public void shouldMatchGenericMethodWithMultipleIdenticalGenerics() { // Given: final Schema generic = GenericsUtil.generic("A").build(); - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, generic, generic) }; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(INT, INT)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(INT, INT)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -503,13 +523,13 @@ public void shouldMatchGenericMethodWithMultipleGenerics() { // Given: final Schema genericA = GenericsUtil.generic("A").build(); final Schema genericB = GenericsUtil.generic("B").build(); - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, genericA, genericB) }; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(INT, STRING)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(INT, STRING)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -519,13 +539,13 @@ public void shouldMatchGenericMethodWithMultipleGenerics() { public void shouldMatchNestedGenericMethodWithMultipleGenerics() { // Given: final Schema generic = GenericsUtil.array("A").build(); - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, generic, generic) }; Arrays.stream(functions).forEach(udfIndex::addFunction); // When: - final KsqlFunction fun = udfIndex.getFunction(ImmutableList.of(INT_LIST, INT_LIST)); + final KsqlScalarFunction fun = udfIndex.getFunction(ImmutableList.of(INT_LIST, INT_LIST)); // Then: assertThat(fun.getFunctionName(), equalTo(EXPECTED)); @@ -534,7 +554,7 @@ public void shouldMatchNestedGenericMethodWithMultipleGenerics() { @Test public void shouldNotMatchIfParamLengthDiffers() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(OTHER, false, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // Expect: @@ -549,7 +569,7 @@ public void shouldNotMatchIfParamLengthDiffers() { @Test public void shouldNotMatchIfNoneFound() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(OTHER, false, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // Expect: @@ -565,7 +585,7 @@ public void shouldNotMatchIfNoneFound() { @Test public void shouldNotMatchIfNullAndPrimitive() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(OTHER, false, Schema.INT32_SCHEMA)}; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -582,7 +602,7 @@ public void shouldNotMatchIfNullAndPrimitive() { @Test public void shouldNotMatchIfNullAndPrimitiveVararg() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(OTHER, true, SchemaBuilder.array(Schema.INT32_SCHEMA))}; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -599,7 +619,8 @@ public void shouldNotMatchIfNullAndPrimitiveVararg() { @Test public void shouldNotMatchIfNoneFoundWithNull() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRING, INT)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(OTHER, false, STRING, INT)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // Expect: @@ -615,7 +636,8 @@ public void shouldNotMatchIfNoneFoundWithNull() { @Test public void shouldNotChooseSpecificWhenTrickyVarArgLoop() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRING, INT), + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(OTHER, false, STRING, INT), function("two", true, STRING_VARARGS)}; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -632,7 +654,7 @@ public void shouldNotChooseSpecificWhenTrickyVarArgLoop() { @Test public void shouldNotMatchWhenNullTypeInArgsIfParamLengthDiffers() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(OTHER, false, STRING)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{function(OTHER, false, STRING)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // Expect: @@ -648,7 +670,7 @@ public void shouldNotMatchWhenNullTypeInArgsIfParamLengthDiffers() { @Test public void shouldNotMatchVarargDifferentStructs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(OTHER, true, SchemaBuilder.array(STRUCT1).build())}; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -664,7 +686,7 @@ public void shouldNotMatchVarargDifferentStructs() { @Test public void shouldNotMatchPermutedStructs() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(OTHER, false, STRUCT3)}; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -681,7 +703,7 @@ public void shouldNotMatchPermutedStructs() { public void shouldNotMatchGenericMethodWithAlreadyReservedTypes() { // Given: final Schema generic = GenericsUtil.generic("A").build(); - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, generic, generic) }; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -699,7 +721,7 @@ public void shouldNotMatchGenericMethodWithAlreadyReservedTypes() { public void shouldNotMatchNestedGenericMethodWithAlreadyReservedTypes() { // Given: final Schema generic = GenericsUtil.array("A").build(); - final KsqlFunction[] functions = new KsqlFunction[]{ + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ function(EXPECTED, false, generic, generic) }; Arrays.stream(functions).forEach(udfIndex::addFunction); @@ -716,7 +738,8 @@ public void shouldNotMatchNestedGenericMethodWithAlreadyReservedTypes() { @Test public void shouldNotFindArbitraryBytesTypes() { // Given: - final KsqlFunction[] functions = new KsqlFunction[]{function(EXPECTED, false, DECIMAL1)}; + final KsqlScalarFunction[] functions = new KsqlScalarFunction[]{ + function(EXPECTED, false, DECIMAL1)}; Arrays.stream(functions).forEach(udfIndex::addFunction); // Expect: @@ -728,7 +751,7 @@ public void shouldNotFindArbitraryBytesTypes() { udfIndex.getFunction(ImmutableList.of(SchemaBuilder.bytes().build())); } - private static KsqlFunction function( + private static KsqlScalarFunction function( final String name, final boolean isVarArgs, final Schema... args @@ -736,7 +759,7 @@ private static KsqlFunction function( return function(FunctionName.of(name), isVarArgs, args); } - private static KsqlFunction function( + private static KsqlScalarFunction function( final FunctionName name, final boolean isVarArgs, final Schema... args @@ -750,7 +773,7 @@ private static KsqlFunction function( } }; - return KsqlFunction.create( + return KsqlScalarFunction.create( ignored -> Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Arrays.asList(args), diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java index ebf97d9a6be1..95fb12c237f3 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java @@ -68,7 +68,7 @@ public synchronized UdfFactory getUdfFactory(final String functionName) { } @Override - public synchronized void addFunction(final KsqlFunction ksqlFunction) { + public synchronized void addFunction(final KsqlScalarFunction ksqlFunction) { final UdfFactory udfFactory = udfs.get(ksqlFunction.getFunctionName().name().toUpperCase()); if (udfFactory == null) { throw new KsqlException("Unknown function factory: " + ksqlFunction.getFunctionName()); @@ -238,7 +238,7 @@ private BuiltInInitializer( } private static UdfFactory builtInUdfFactory( - final KsqlFunction ksqlFunction, + final KsqlScalarFunction ksqlFunction, final boolean internal ) { final UdfMetadata metadata = new UdfMetadata( @@ -246,7 +246,7 @@ private static UdfFactory builtInUdfFactory( ksqlFunction.getDescription(), KsqlConstants.CONFLUENT_AUTHOR, "", - KsqlFunction.INTERNAL_PATH, + KsqlScalarFunction.INTERNAL_PATH, internal ); @@ -263,31 +263,31 @@ private void init() { private void addStringFunctions() { - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("LCASE"), LCaseKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("UCASE"), UCaseKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of(ConcatKudf.NAME), ConcatKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("TRIM"), TrimKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, ImmutableList.of( Schema.OPTIONAL_STRING_SCHEMA, @@ -296,7 +296,7 @@ private void addStringFunctions() { FunctionName.of("IFNULL"), IfNullKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_INT32_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("LEN"), @@ -306,14 +306,14 @@ private void addStringFunctions() { private void addMathFunctions() { - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_FLOAT64_SCHEMA, Collections.singletonList(Schema.OPTIONAL_FLOAT64_SCHEMA), FunctionName.of("CEIL"), CeilKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_FLOAT64_SCHEMA, Collections.emptyList(), FunctionName.of("RANDOM"), @@ -323,21 +323,21 @@ private void addMathFunctions() { private void addJsonFunctions() { - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA), JsonExtractStringKudf.FUNCTION_NAME, JsonExtractStringKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("ARRAYCONTAINS"), ArrayContainsKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), @@ -347,7 +347,7 @@ private void addJsonFunctions() { ArrayContainsKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(), @@ -357,7 +357,7 @@ private void addJsonFunctions() { ArrayContainsKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(), @@ -367,7 +367,7 @@ private void addJsonFunctions() { ArrayContainsKudf.class )); - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( + addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build(), @@ -381,7 +381,7 @@ private void addJsonFunctions() { private void addStructFieldFetcher() { addBuiltInFunction( - KsqlFunction.createLegacyBuiltIn( + KsqlScalarFunction.createLegacyBuiltIn( SchemaBuilder.struct().optional().build(), ImmutableList.of( SchemaBuilder.struct().optional().build(), @@ -406,11 +406,11 @@ private void addUdafFunctions() { functionRegistry.addAggregateFunctionFactory(new TopkDistinctAggFunctionFactory()); } - private void addBuiltInFunction(final KsqlFunction ksqlFunction) { + private void addBuiltInFunction(final KsqlScalarFunction ksqlFunction) { addBuiltInFunction(ksqlFunction, false); } - private void addBuiltInFunction(final KsqlFunction ksqlFunction, final boolean internal) { + private void addBuiltInFunction(final KsqlScalarFunction ksqlFunction, final boolean internal) { functionRegistry .ensureFunctionFactory(builtInUdfFactory(ksqlFunction, internal)) .addFunction(ksqlFunction); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index 362ff9d4b4c7..fda2124698a4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -65,7 +65,7 @@ public class UdfLoader { void loadUdfFromClass(final Class... udfClasses) { for (final Class theClass : udfClasses) { loadUdfFromClass( - theClass, KsqlFunction.INTERNAL_PATH); + theClass, KsqlScalarFunction.INTERNAL_PATH); } } @@ -102,7 +102,7 @@ void loadUdfFromClass( for (Method method : theClass.getMethods()) { final Udf udfAnnotation = method.getAnnotation(Udf.class); if (udfAnnotation != null) { - final KsqlFunction function; + final KsqlScalarFunction function; try { function = createFunction(theClass, udfDescriptionAnnotation, udfAnnotation, method, path, sensorName, udfClass @@ -125,7 +125,7 @@ void loadUdfFromClass( } } - private KsqlFunction createFunction( + private KsqlScalarFunction createFunction( final Class theClass, final UdfDescription udfDescriptionAnnotation, final Udf udfAnnotation, @@ -156,7 +156,7 @@ private KsqlFunction createFunction( udfDescriptionAnnotation.name() ); - return KsqlFunction.create( + return KsqlScalarFunction.create( schemaProviderFunction, javaReturnSchema, parameters, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoaderUtil.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoaderUtil.java index 35f5fbc5fca8..9b80ff960f11 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoaderUtil.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoaderUtil.java @@ -34,13 +34,13 @@ public static FunctionRegistry load(final MutableFunctionRegistry functionRegist return functionRegistry; } - public static UdfFactory createTestUdfFactory(final KsqlFunction udf) { + public static UdfFactory createTestUdfFactory(final KsqlScalarFunction udf) { final UdfMetadata metadata = new UdfMetadata( udf.getFunctionName().name(), udf.getDescription(), "Test Author", "", - KsqlFunction.INTERNAL_PATH, + KsqlScalarFunction.INTERNAL_PATH, false); return new UdfFactory(udf.getKudfClass(), metadata); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java index fe3f399fd566..919f01f6dffb 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java @@ -15,6 +15,7 @@ package io.confluent.ksql.function; +import io.confluent.ksql.function.udf.PluggableUdf; import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.function.udtf.Udtf; import io.confluent.ksql.function.udtf.UdtfDescription; @@ -148,7 +149,8 @@ private KsqlTableFunction createTableFunction( ); return new KsqlTableFunction( schemaProviderFunction, - functionName, outputType, arguments, description, invoker, instance + functionName, outputType, arguments, description, + new PluggableUdf(invoker, instance) ); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java index 735b0b447fc2..5a7f3c17da87 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java @@ -104,7 +104,7 @@ public void load() { @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private void loadFunctions(final ClassLoader loader, final Optional path) { final String pathLoadedFrom - = path.map(Path::toString).orElse(KsqlFunction.INTERNAL_PATH); + = path.map(Path::toString).orElse(KsqlScalarFunction.INTERNAL_PATH); final FastClasspathScanner fastClasspathScanner = new FastClasspathScanner(); if (loader != parentClassLoader) { fastClasspathScanner.overrideClassLoaders(loader); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/PluggableUdf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/PluggableUdf.java index 7596348e8014..b3ab36ac140f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/PluggableUdf.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/PluggableUdf.java @@ -48,5 +48,4 @@ public Object evaluate(final Object... args) { } } - } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java index ec0df9992bae..933899b2cccf 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java @@ -38,7 +38,7 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.function.InternalFunctionRegistry; -import io.confluent.ksql.function.KsqlFunction; +import io.confluent.ksql.function.KsqlScalarFunction; import io.confluent.ksql.function.MutableFunctionRegistry; import io.confluent.ksql.function.UdfLoaderUtil; import io.confluent.ksql.function.udf.Kudf; @@ -149,13 +149,13 @@ public class CodeGenRunnerTest { @Before public void init() { - final KsqlFunction whenCondition = KsqlFunction.createLegacyBuiltIn( + final KsqlScalarFunction whenCondition = KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of(Schema.OPTIONAL_BOOLEAN_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA), FunctionName.of("WHENCONDITION"), WhenCondition.class ); - final KsqlFunction whenResult = KsqlFunction.createLegacyBuiltIn( + final KsqlScalarFunction whenResult = KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_INT32_SCHEMA, ImmutableList.of(Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA), FunctionName.of("WHENRESULT"), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java index 196a6a6e0e67..fbcf8d555bfc 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java @@ -73,7 +73,8 @@ public Object evaluate(final Object... args) { } private final InternalFunctionRegistry functionRegistry = new InternalFunctionRegistry(); - private final KsqlFunction func = KsqlFunction.createLegacyBuiltIn(Schema.OPTIONAL_STRING_SCHEMA, + private final KsqlScalarFunction func = KsqlScalarFunction.createLegacyBuiltIn( + Schema.OPTIONAL_STRING_SCHEMA, Collections.emptyList(), FunctionName.of("func"), Func1.class); @@ -120,7 +121,8 @@ public void shouldNotAddFunctionWithSameNameAsExistingFunctionAndOnDifferentClas // Given: givenUdfFactoryRegistered(); - final KsqlFunction func2 = KsqlFunction.createLegacyBuiltIn(Schema.OPTIONAL_STRING_SCHEMA, + final KsqlScalarFunction func2 = KsqlScalarFunction.createLegacyBuiltIn( + Schema.OPTIONAL_STRING_SCHEMA, Collections.emptyList(), func.getFunctionName(), Func2.class); @@ -256,7 +258,8 @@ public void shouldAddFunctionWithSameNameButDifferentReturnTypes() { // Given: givenUdfFactoryRegistered(); functionRegistry.addFunction(func); - final KsqlFunction func2 = KsqlFunction.createLegacyBuiltIn(Schema.OPTIONAL_INT64_SCHEMA, + final KsqlScalarFunction func2 = KsqlScalarFunction.createLegacyBuiltIn( + Schema.OPTIONAL_INT64_SCHEMA, Collections.singletonList(Schema.OPTIONAL_INT64_SCHEMA), FunctionName.of("func"), Func1.class); // When: @@ -270,7 +273,7 @@ public void shouldAddFunctionWithSameNameClassButDifferentArguments() { // Given: givenUdfFactoryRegistered(); - final KsqlFunction func2 = KsqlFunction.createLegacyBuiltIn( + final KsqlScalarFunction func2 = KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_INT64_SCHEMA), func.getFunctionName(), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index 4c1890a38b26..35e5c05e16cf 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -164,7 +164,7 @@ public void shouldLoadDecimalUdfs() { final Schema schema = DecimalUtil.builder(2, 1).optional().build(); // When: - final KsqlFunction fun = FUNC_REG.getUdfFactory("floor") + final KsqlScalarFunction fun = FUNC_REG.getUdfFactory("floor") .getFunction(ImmutableList.of(schema)); // Then: @@ -186,7 +186,7 @@ public void shouldLoadFunctionWithListReturnType() { // When: final List args = Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA); - final KsqlFunction function + final KsqlScalarFunction function = toList.getFunction(args); assertThat(function.getReturnType(args), @@ -204,7 +204,7 @@ public void shouldLoadFunctionWithMapReturnType() { // When: final List args = Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA); - final KsqlFunction function + final KsqlScalarFunction function = toMap.getFunction(args); // Then: @@ -225,7 +225,7 @@ public void shouldLoadFunctionWithStructReturnType() { // When: final List args = Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA); - final KsqlFunction function + final KsqlScalarFunction function = toStruct.getFunction(args); // Then: @@ -244,7 +244,7 @@ public void shouldLoadFunctionWithSchemaProvider() { // When: final Schema decimal = DecimalUtil.builder(2, 1).build(); final List args = Collections.singletonList(decimal); - final KsqlFunction function = returnDecimal.getFunction(args); + final KsqlScalarFunction function = returnDecimal.getFunction(args); // Then: assertThat(function.getReturnType(args), equalTo(decimal)); @@ -256,7 +256,7 @@ public void shouldThrowOnReturnTypeMismatch() { final UdfFactory returnIncompatible = FUNC_REG.getUdfFactory("returnincompatible"); final Schema decimal = DecimalUtil.builder(2, 1).build(); final List args = Collections.singletonList(decimal); - final KsqlFunction function = returnIncompatible.getFunction(args); + final KsqlScalarFunction function = returnIncompatible.getFunction(args); // Expect: expectedException.expect(KsqlException.class); @@ -406,13 +406,13 @@ public void shouldCreateUdfFactoryWithJarPathWhenExternal() { @Test public void shouldCreateUdfFactoryWithInternalPathWhenInternal() { final UdfFactory substring = FUNC_REG.getUdfFactory("substring"); - assertThat(substring.getMetadata().getPath(), equalTo(KsqlFunction.INTERNAL_PATH)); + assertThat(substring.getMetadata().getPath(), equalTo(KsqlScalarFunction.INTERNAL_PATH)); } @Test public void shouldSupportUdfParameterAnnotation() { final UdfFactory substring = FUNC_REG.getUdfFactory("somefunction"); - final KsqlFunction function = substring.getFunction( + final KsqlScalarFunction function = substring.getFunction( ImmutableList.of( Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, @@ -497,7 +497,7 @@ public void shouldLoadSomeFunction() { // Then: assertThat(udfFactory, not(nullValue())); - final KsqlFunction function = udfFactory.getFunction(args); + final KsqlScalarFunction function = udfFactory.getFunction(args); assertThat(function.getFunctionName().name(), equalToIgnoringCase("somefunction")); } @@ -506,7 +506,7 @@ public void shouldLoadSomeFunction() { public void shouldCollectMetricsWhenMetricCollectionEnabled() { // Given: final UdfFactory substring = FUNC_REG_WITH_METRICS.getUdfFactory("substring"); - final KsqlFunction function = substring + final KsqlScalarFunction function = substring .getFunction(Arrays.asList(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)); // When: @@ -561,7 +561,7 @@ public void shouldConfigureConfigurableUdfsOnInstantiation() { KSQL_FUNCTIONS_PROPERTY_PREFIX + "_global_.expected-param", "expected-value" )); - final KsqlFunction udf = FUNC_REG.getUdfFactory("ConfigurableUdf") + final KsqlScalarFunction udf = FUNC_REG.getUdfFactory("ConfigurableUdf") .getFunction(ImmutableList.of(Schema.INT32_SCHEMA)); // When: @@ -590,7 +590,7 @@ public void shouldEnsureFunctionReturnTypeIsOptional() throws Exception { Schema.STRING_SCHEMA); // Then: - final KsqlFunction someFunction = FUNC_REG + final KsqlScalarFunction someFunction = FUNC_REG .getUdfFactory("SomeFunction") .getFunction(args); @@ -600,7 +600,7 @@ public void shouldEnsureFunctionReturnTypeIsOptional() throws Exception { @Test public void shouldEnsureFunctionReturnTypeIsDeepOptional() { final List args = Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA); - final KsqlFunction complexFunction = FUNC_REG + final KsqlScalarFunction complexFunction = FUNC_REG .getUdfFactory("ComplexFunction") .getFunction(args); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java index 3306558c39ec..86d655fe1aea 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java @@ -235,7 +235,7 @@ public void shouldNotLoadUdtfWithWrongReturnValue() { is("UDTF functions must return a List. Class io.confluent.ksql.function.UdtfLoaderTest$UdtfBadReturnValue Method badReturn")); // When: - udtfLoader.loadUdtfFromClass(UdtfBadReturnValue.class, KsqlFunction.INTERNAL_PATH); + udtfLoader.loadUdtfFromClass(UdtfBadReturnValue.class, KsqlScalarFunction.INTERNAL_PATH); } @Test @@ -254,7 +254,7 @@ public void shouldNotLoadUdtfWithRawListReturn() { is("UDTF functions must return a parameterized List. Class io.confluent.ksql.function.UdtfLoaderTest$RawListReturn Method badReturn")); // When: - udtfLoader.loadUdtfFromClass(RawListReturn.class, KsqlFunction.INTERNAL_PATH); + udtfLoader.loadUdtfFromClass(RawListReturn.class, KsqlScalarFunction.INTERNAL_PATH); } @Test @@ -273,7 +273,8 @@ public void shouldNotLoadUdtfWithBigDecimalReturnAndNoSchemaProvider() { is("Cannot load UDF bigDecimalNoSchemaProvider. BigDecimal return type is not supported without a schema provider method.")); // When: - udtfLoader.loadUdtfFromClass(BigDecimalNoSchemaProvider.class, KsqlFunction.INTERNAL_PATH); + udtfLoader + .loadUdtfFromClass(BigDecimalNoSchemaProvider.class, KsqlScalarFunction.INTERNAL_PATH); } @UdtfDescription(name = "badReturnUdtf", description = "whatever") diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java index 4f56cd4e2622..e73e1bb18543 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java @@ -27,7 +27,7 @@ import io.confluent.ksql.execution.util.ExpressionTypeManager; import io.confluent.ksql.execution.util.GenericRowValueTypeEnforcer; import io.confluent.ksql.function.FunctionRegistry; -import io.confluent.ksql.function.KsqlFunction; +import io.confluent.ksql.function.KsqlScalarFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.schema.ksql.Column; @@ -177,7 +177,7 @@ public Void visitFunctionCall(final FunctionCall node, final Void context) { } final UdfFactory holder = functionRegistry.getUdfFactory(functionName.name()); - final KsqlFunction function = holder.getFunction(argumentTypes); + final KsqlScalarFunction function = holder.getFunction(argumentTypes); spec.addFunction( function.getFunctionName(), function.newInstance(ksqlConfig) @@ -220,7 +220,7 @@ public Void visitDereferenceExpression(final DereferenceExpression node, final V Schema.OPTIONAL_STRING_SCHEMA ); - final KsqlFunction function = functionRegistry + final KsqlScalarFunction function = functionRegistry .getUdfFactory(FetchFieldFromStruct.FUNCTION_NAME.name()) .getFunction(argumentTypes); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/FunctionSpec.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/FunctionSpec.java deleted file mode 100644 index e4abb1616837..000000000000 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/FunctionSpec.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.execution.codegen; - -import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.function.KsqlFunction; -import io.confluent.ksql.function.udf.Kudf; -import io.confluent.ksql.name.FunctionName; -import io.confluent.ksql.util.KsqlConfig; -import java.util.Objects; - -@Immutable -public final class FunctionSpec { - private final Class type; - private final KsqlFunction function; - private final String codegenName; - private final KsqlConfig ksqlConfig; - - FunctionSpec( - final KsqlFunction function, - final String codegenName, - final KsqlConfig ksqlConfig - ) { - this.type = Objects.requireNonNull(function.getKudfClass(), "kudfClass"); - this.function = Objects.requireNonNull(function, "function"); - this.codegenName = Objects.requireNonNull(codegenName, "codegenName"); - this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); - } - - public Class type() { - return type; - } - - public String codeGenName() { - return codegenName; - } - - public FunctionName functionName() { - return function.getFunctionName(); - } - - public Kudf newInstance() { - return function.newInstance(ksqlConfig); - } -} diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java index 8ed04a6d0dbe..34d5f2ed620f 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java @@ -54,7 +54,7 @@ import io.confluent.ksql.execution.expression.tree.Type; import io.confluent.ksql.execution.expression.tree.WhenClause; import io.confluent.ksql.function.FunctionRegistry; -import io.confluent.ksql.function.KsqlFunction; +import io.confluent.ksql.function.KsqlScalarFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.name.ColumnName; @@ -187,9 +187,9 @@ public void shouldCreateCorrectCastJavaExpression() { public void shouldPostfixFunctionInstancesWithUniqueId() { // Given: final UdfFactory ssFactory = mock(UdfFactory.class); - final KsqlFunction ssFunction = mock(KsqlFunction.class); + final KsqlScalarFunction ssFunction = mock(KsqlScalarFunction.class); final UdfFactory catFactory = mock(UdfFactory.class); - final KsqlFunction catFunction = mock(KsqlFunction.class); + final KsqlScalarFunction catFunction = mock(KsqlScalarFunction.class); givenUdf("SUBSTRING", Schema.OPTIONAL_STRING_SCHEMA, ssFactory, ssFunction); givenUdf("CONCAT", Schema.OPTIONAL_STRING_SCHEMA, catFactory, catFunction); final FunctionName ssName = FunctionName.of("SUBSTRING"); @@ -770,7 +770,8 @@ private void givenUdf( final String name, final Schema returnType, final UdfFactory factory, - final KsqlFunction function) { + final KsqlScalarFunction function + ) { when(functionRegistry.isAggregate(name)).thenReturn(false); when(functionRegistry.getUdfFactory(name)).thenReturn(factory); when(factory.getFunction(anyList())).thenReturn(function); diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/sqlpredicate/SqlPredicateTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/sqlpredicate/SqlPredicateTest.java index ec3d24583a85..51c648850e7c 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/sqlpredicate/SqlPredicateTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/sqlpredicate/SqlPredicateTest.java @@ -36,7 +36,7 @@ import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.function.FunctionRegistry; -import io.confluent.ksql.function.KsqlFunction; +import io.confluent.ksql.function.KsqlScalarFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.function.udf.Kudf; import io.confluent.ksql.logging.processing.ProcessingLogConfig; @@ -80,7 +80,7 @@ public class SqlPredicateTest { private static final ColumnReferenceExp COL2 = new ColumnReferenceExp(ColumnRef.of(TEST1, ColumnName.of("COL2"))); - private static final KsqlFunction LEN_FUNCTION = KsqlFunction.createLegacyBuiltIn( + private static final KsqlScalarFunction LEN_FUNCTION = KsqlScalarFunction.createLegacyBuiltIn( Schema.OPTIONAL_INT32_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("LEN"), diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index a05490f96a33..f718a25b467d 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -56,7 +56,7 @@ import io.confluent.ksql.execution.function.udf.structfieldextractor.FetchFieldFromStruct; import io.confluent.ksql.execution.testutil.TestExpressions; import io.confluent.ksql.function.FunctionRegistry; -import io.confluent.ksql.function.KsqlFunction; +import io.confluent.ksql.function.KsqlScalarFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.name.ColumnName; @@ -90,7 +90,7 @@ public class ExpressionTypeManagerTest { @Mock private UdfFactory udfFactory; @Mock - private KsqlFunction function; + private KsqlScalarFunction function; private ExpressionTypeManager expressionTypeManager; @@ -268,7 +268,7 @@ public void shouldHandleNestedUdfs() { // Given: givenUdfWithNameAndReturnType("EXTRACTJSONFIELD", Schema.OPTIONAL_STRING_SCHEMA); final UdfFactory outerFactory = mock(UdfFactory.class); - final KsqlFunction function = mock(KsqlFunction.class); + final KsqlScalarFunction function = mock(KsqlScalarFunction.class); givenUdfWithNameAndReturnType("LCASE", Schema.OPTIONAL_STRING_SCHEMA, outerFactory, function); final Expression inner = new FunctionCall( FunctionName.of("EXTRACTJSONFIELD"), @@ -558,7 +558,7 @@ private void givenUdfWithNameAndReturnType( final String name, final Schema returnType, final UdfFactory factory, - final KsqlFunction function) { + final KsqlScalarFunction function) { when(functionRegistry.isAggregate(name)).thenReturn(false); when(functionRegistry.getUdfFactory(name)).thenReturn(factory); when(factory.getFunction(anyList())).thenReturn(function);