Skip to content

Commit

Permalink
fix: update Concat UDF to new framework and make variadic (#5513)
Browse files Browse the repository at this point in the history
* update Concat UDF to new framework and make variadic

* missed file

* Almog's requested change

* fix typo in test case

* historical plan

* trivial doc change to trigger build
  • Loading branch information
blueedgenick authored Jun 2, 2020
1 parent 30309bf commit cab6a86
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 45 deletions.
4 changes: 2 additions & 2 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ include both endpoints.
### `CONCAT`

```sql
CONCAT(col1, '_hello')
CONCAT(col1, col2, 'hello', ..., col-n)
```

Concatenate two or more strings.
Concatenate two or more string expressions. Any input strings that evaluate to NULL are replaced with an empty string in the output.

### `EXTRACTJSONFIELD`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
import io.confluent.ksql.function.udf.math.RandomKudf;
import io.confluent.ksql.function.udf.string.ConcatKudf;
import io.confluent.ksql.function.udf.string.LCaseKudf;
import io.confluent.ksql.function.udf.string.LenKudf;
import io.confluent.ksql.function.udf.string.TrimKudf;
Expand Down Expand Up @@ -274,12 +273,6 @@ private void addStringFunctions() {
FunctionName.of("UCASE"), UCaseKudf.class
));

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.STRING,
ImmutableList.of(ParamTypes.STRING, ParamTypes.STRING),
FunctionName.of(ConcatKudf.NAME), ConcatKudf.class
));

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.STRING,
Collections.singletonList(ParamTypes.STRING),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,28 @@

package io.confluent.ksql.function.udf.string;

import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

public class ConcatKudf implements Kudf {
public static final String NAME = "CONCAT";
@UdfDescription(
name = "concat",
description = "Concatenate an arbitrary number of string fields together")
public class Concat {

@Override
public String evaluate(final Object... args) {
if (args.length < 2) {
throw new KsqlFunctionException(NAME + " should have at least two input argument.");
@Udf
public String concat(@UdfParameter(
description = "The varchar fields to concatenate") final String... inputs) {
if (inputs == null) {
return null;
}

return Arrays.stream(args)
return Arrays.stream(inputs)
.filter(Objects::nonNull)
.map(Object::toString)
.collect(Collectors.joining());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ public void shouldHaveBuiltInUDFRegistered() {
// Verify that all built-in UDF are correctly registered in the InternalFunctionRegistry
final List<String> buildtInUDF = Arrays.asList(
// String UDF
"LCASE", "UCASE", "CONCAT", "TRIM", "LEN",
"LCASE", "UCASE", "TRIM",
"LEN",
// Math UDF
"RANDOM",
// JSON UDF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,42 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.ksql.function.udf.KudfTester;
import org.junit.Before;
import org.junit.Test;

public class ConcatKudfTest {
public class ConcatTest {

private ConcatKudf udf;
private Concat udf;

@Before
public void setUp() {
udf = new ConcatKudf();
udf = new Concat();
}

@Test
public void shouldBeWellBehavedUdf() {
new KudfTester(ConcatKudf::new)
.withArgumentTypes(Object.class, Object.class)
.withUnboundedMaxArgCount()
.test();
public void shouldConcatStrings() {
assertThat(udf.concat("The", "Quick", "Brown", "Fox"), is("TheQuickBrownFox"));
}

@Test
public void shouldConcatStrings() {
assertThat(udf.evaluate("Hello", " Mum"), is("Hello Mum"));
public void shouldIgnoreNullInputs() {
assertThat(udf.concat(null, "this ", null, "should ", null, "work!", null),
is("this should work!"));
}

@Test
public void shouldConcatNonStrings() {
assertThat(udf.evaluate(1.345, 34), is("1.34534"));
public void shouldReturnEmptyStringIfAllInputsNull() {
assertThat(udf.concat(null, null), is(""));
}

@Test
public void shouldConcatIgnoringNulls() {
assertThat(
udf.evaluate(null, "this ", null, "should ", null, "work!", null),
is("this should work!"));
public void shouldReturnSingleInput() {
assertThat(udf.concat("singular"), is("singular"));
}

@Test
public void shouldReturnEmptyStringIfAllArgsNull() {
assertThat(udf.evaluate(null, null), is(""));
public void shouldReturnEmptyStringForSingleNullInput() {
assertThat(udf.concat((String) null), is(""));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `SOURCE` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n CONCAT('prefix-', TEST.SOURCE, '-postfix') THING\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `THING` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` STRING KEY, `SOURCE` STRING"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "CONCAT('prefix-', SOURCE, '-postfix') AS THING" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"version" : "6.0.0",
"timestamp" : 1591052563853,
"path" : "query-validation-tests\\concat.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<SOURCE VARCHAR> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<THING VARCHAR> NOT NULL"
},
"testCase" : {
"name" : "concat fields using CONCAT",
"inputs" : [ {
"topic" : "test_topic",
"key" : "",
"value" : {
"source" : "s1"
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : {
"source" : "s2"
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"THING" : "prefix-s1-postfix"
}
}, {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"THING" : "prefix-s2-postfix"
}
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (ID STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, CONCAT('prefix-', source, '-postfix') AS THING FROM TEST;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"name": "concat fields using CONCAT",
"statements": [
"CREATE STREAM TEST (ID STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ID, CONCAT('prefix-', CONCAT(source, '-postfix')) AS THING FROM TEST;"
"CREATE STREAM OUTPUT AS SELECT ID, CONCAT('prefix-', source, '-postfix') AS THING FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"source": "s1"}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public void shouldDescribeUDF() {
// When:
final FunctionDescriptionList functionList = (FunctionDescriptionList)
CustomExecutors.DESCRIBE_FUNCTION.execute(
engine.configure("DESCRIBE FUNCTION CONCAT;"),
engine.configure(
"DESCRIBE FUNCTION TRIM;"),
mock(SessionProperties.class),
engine.getEngine(),
engine.getServiceContext()
Expand All @@ -49,7 +50,7 @@ public void shouldDescribeUDF() {
assertThat(functionList, new TypeSafeMatcher<FunctionDescriptionList>() {
@Override
protected boolean matchesSafely(final FunctionDescriptionList item) {
return functionList.getName().equals("CONCAT")
return functionList.getName().equals("TRIM")
&& functionList.getType().equals(FunctionType.SCALAR);
}

Expand Down
Loading

0 comments on commit cab6a86

Please sign in to comment.