Skip to content

Commit

Permalink
feat: introduce transform and reduce invocation function for lambdas (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Feb 24, 2021
1 parent 96bbd70 commit 563ff9b
Show file tree
Hide file tree
Showing 52 changed files with 4,531 additions and 375 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.lambda;

import io.confluent.ksql.execution.codegen.helpers.TriFunction;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiFunction;

/**
 * Lambda UDF that folds a collection into a single value.
 *
 * <p>Starting from a caller-supplied initial state, the supplied lambda is invoked once per
 * element (array) or once per entry (map); each invocation's result becomes the state handed
 * to the next invocation.
 */
@UdfDescription(
    name = "reduce",
    category = FunctionCategory.LAMBDA,
    description = "Reduce the input collection down to a single value "
        + "using an initial state and a function. "
        + "The initial state (s) is passed into the scope of the function. "
        + "Each invocation returns a new value for s, "
        + "which the next invocation will receive. "
        + "The final value for s is returned.",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class Reduce {

  /**
   * Folds an array into a single value.
   *
   * @param initialState the starting state; when {@code null} the result is {@code null}
   * @param list the array to fold; when {@code null} the initial state is returned unchanged
   * @param biFunction the lambda, called as {@code (state, item)} for every array item in
   *     iteration order
   * @param <T> the array item type
   * @param <S> the state type
   * @return the state produced by the last lambda invocation, or {@code initialState} if the
   *     lambda was never invoked
   */
  @Udf(description = "When reducing an array, "
      + "the reduce function must have two arguments. "
      + "The two arguments for the reduce function are in order: "
      + "the state and the array item. "
      + "The final state is returned."
  )
  public <T,S> S reduceArray(
      @UdfParameter(description = "The initial state.") final S initialState,
      @UdfParameter(description = "The array.") final List<T> list,
      @UdfParameter(description = "The reduce function.") final BiFunction<S, T, S> biFunction
  ) {
    // A null state yields null and a null array yields the untouched state; in both
    // cases the value to hand back is exactly initialState.
    if (initialState == null || list == null) {
      return initialState;
    }

    S accumulated = initialState;
    for (final T element : list) {
      accumulated = biFunction.apply(accumulated, element);
    }
    return accumulated;
  }

  /**
   * Folds a map into a single value.
   *
   * @param initialState the starting state; when {@code null} the result is {@code null}
   * @param map the map to fold; when {@code null} the initial state is returned unchanged
   * @param triFunction the lambda, called as {@code (state, key, value)} for every entry in
   *     the order given by {@code map.entrySet()}
   * @param <K> the map key type
   * @param <V> the map value type
   * @param <S> the state type
   * @return the state produced by the last lambda invocation, or {@code initialState} if the
   *     lambda was never invoked
   */
  @Udf(description = "When reducing a map, "
      + "the reduce function must have three arguments. "
      + "The three arguments for the reduce function are in order: "
      + "the state, the key, and the value. "
      + "The final state is returned."
  )
  public <K,V,S> S reduceMap(
      @UdfParameter(description = "The initial state.") final S initialState,
      @UdfParameter(description = "The map.") final Map<K, V> map,
      @UdfParameter(description = "The reduce function.") final TriFunction<S, K, V, S> triFunction
  ) {
    // Same short-circuit as the array variant: either null input resolves to initialState.
    if (initialState == null || map == null) {
      return initialState;
    }

    S accumulated = initialState;
    for (final Entry<K, V> pair : map.entrySet()) {
      final K key = pair.getKey();
      final V value = pair.getValue();
      accumulated = triFunction.apply(accumulated, key, value);
    }
    return accumulated;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,56 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.map;
package io.confluent.ksql.function.udf.lambda;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Transform a map's key and values using two lambda functions
* Transform a collection with a function
*/
@UdfDescription(
name = "map_transform",
category = FunctionCategory.MAP,
description = "Apply one function to each key and "
+ "one function to each value of a map. "
+ "The two arguments for each function are in order: key, value. "
+ "The first function provided will be applied to each key and the "
+ "second one applied to each value. "
+ "The transformed map is returned.",
name = "transform",
category = FunctionCategory.LAMBDA,
description = "Apply a function to each element in a collection. "
+ "The transformed collection is returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class MapTransform {
public class Transform {

@Udf
public <K,V,R,T> Map<R,T> mapTransform(
/**
 * Applies a lambda to every element of an array and collects the results.
 *
 * @param array the array to transform; when {@code null} the result is {@code null}
 * @param function the lambda, called with a single argument (the array element) for each
 *     element in encounter order
 * @param <T> the input element type
 * @param <R> the output element type
 * @return a new list holding the lambda's result for each input element, in the same order
 */
@Udf(description = "When transforming an array, "
    + "the function provided must have one argument: "
    + "the array element. "
    + "The transformed array is returned."
)
public <T, R> List<R> transformArray(
    @UdfParameter(description = "The array") final List<T> array,
    @UdfParameter(description = "The lambda function") final Function<T, R> function
) {
  if (array == null) {
    return null;
  }
  // The lambda is already a java.util.function.Function, so it can be handed to map() as-is.
  return array.stream().map(function).collect(Collectors.toList());
}

@Udf(description = "When transforming a map, "
+ "two functions must be provided. "
+ "For each map entry, the first function provided will "
+ "be applied to the key and the second one applied to the value. "
+ "Each function must have two arguments. "
+ "The two arguments for each function are in order: the key and then the value. "
+ "The transformed map is returned."
)
public <K,V,R,T> Map<R,T> transformMap(
@UdfParameter(description = "The map") final Map<K, V> map,
@UdfParameter(description = "The key lambda function") final BiFunction<K, V, R> biFunction1,
@UdfParameter(description = "The value lambda function") final BiFunction<K, V, T> biFunction2
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ public void shouldLoadLambdaReduceUdfs() {
SqlTypes.INTEGER);

// When:
final KsqlScalarFunction fun = FUNC_REG.getUdfFactory(FunctionName.of("reduce_map"))
final KsqlScalarFunction fun = FUNC_REG.getUdfFactory(FunctionName.of("reduce"))
.getFunction(
ImmutableList.of(
SqlArgument.of(SqlMap.of(SqlTypes.INTEGER, SqlTypes.INTEGER)),
SqlArgument.of(SqlTypes.INTEGER),
SqlArgument.of(SqlMap.of(SqlTypes.INTEGER, SqlTypes.INTEGER)),
SqlArgument.of(lambda)));

// Then:
assertThat(fun.name().text(), equalToIgnoringCase("reduce_map"));
assertThat(fun.name().text(), equalToIgnoringCase("reduce"));
}

@Test
Expand All @@ -223,14 +223,14 @@ public void shouldLoadLambdaTransformUdfs() {
SqlTypes.INTEGER);

// When:
final KsqlScalarFunction fun = FUNC_REG.getUdfFactory(FunctionName.of("array_transform"))
final KsqlScalarFunction fun = FUNC_REG.getUdfFactory(FunctionName.of("transform"))
.getFunction(
ImmutableList.of(
SqlArgument.of(SqlArray.of(SqlTypes.INTEGER)),
SqlArgument.of(lambda)));

// Then:
assertThat(fun.name().text(), equalToIgnoringCase("array_transform"));
assertThat(fun.name().text(), equalToIgnoringCase("transform"));
}

@Test
Expand Down
Loading

0 comments on commit 563ff9b

Please sign in to comment.