Skip to content

Commit

Permalink
fix: do not filter out rows where PARTITION BY resolves to null
Browse files Browse the repository at this point in the history
Fixes: confluentinc#4747

This commit removes the filter that was excluding any rows where the `PARTITION BY` clause resolved to a `null` value, i.e. either because the result was `null` or because an error occurred evaluating the expression.

This change will only affect new queries started. Pre-existing queries will continue to run as before.
  • Loading branch information
big-andy-coates committed Mar 19, 2020
1 parent ef63feb commit ad27020
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,14 @@
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": null, "F2": null}, "timestamp": 0},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000},
{"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": null, "F2": null}, "timestamp": 11000},
Expand Down Expand Up @@ -309,14 +309,14 @@
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-right-repartition", "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-Join-left-repartition", "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000019-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000018-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_K": "", "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 10000, "end": 21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_K": "", "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_K": "", "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_K": "", "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_K": "", "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000017-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_K": "", "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_K": "", "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000},
{"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000016-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_K": "", "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "", "F2": 0}, "timestamp": 0},
{"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000},
{"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@
]
}
},
{
"name": "nulls",
"statements": [
"CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar) with (kafka_topic='test_topic', value_format = 'delimited');",
"CREATE STREAM REPARTITIONED AS select id from TEST partition by name;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": null},
{"topic": "test_topic", "key": 2, "value": "4,"},
{"topic": "test_topic", "key": 3, "value": "5,zero"}
],
"outputs": [
{"topic": "REPARTITIONED", "key": null, "value": null},
{"topic": "REPARTITIONED", "key": null, "value": "4"},
{"topic": "REPARTITIONED", "key": "zero", "value": "5"}
]
},
{
"name": "partition by with projection select some",
"statements": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,51 @@
{"topic": "OUTPUT", "key": 0.11, "value": "3"},
{"topic": "OUTPUT", "key": 1.1, "value": "4"}
]
},
{
"name": "project nulls",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT name FROM INPUT;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "test_topic", "key": null, "value": {"NAME": "null key"}},
{"topic": "test_topic", "key": 2, "value": {}},
{"topic": "test_topic", "key": 3, "value": null},
{"topic": "test_topic", "key": 4, "value": {"NAME": "Fred"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "OUTPUT", "key": null, "value": {"NAME": "null key"}},
{"topic": "OUTPUT", "key": 2, "value": {"NAME": null}},
{"topic": "OUTPUT", "key": 3, "value": null},
{"topic": "OUTPUT", "key": 4, "value": {"NAME": "Fred"}}
]
},
{
"name": "filter nulls",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM INPUT WHERE ID IS NOT NULL AND NAME IS NOT NULL;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "test_topic", "key": null, "value": {"NAME": "null key"}},
{"topic": "test_topic", "key": 2, "value": {}},
{"topic": "test_topic", "key": 3, "value": null},
{"topic": "test_topic", "key": 4, "value": {"NAME": "Fred"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"NAME": "Nick"}},
{"topic": "OUTPUT", "key": 4, "value": {"NAME": "Fred"}}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,26 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;

public final class PartitionByParams {

private final LogicalSchema schema;
private BiPredicate<Struct, GenericRow> predicate;
private BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper;
private final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper;

public PartitionByParams(
final LogicalSchema schema,
final BiPredicate<Struct, GenericRow> predicate,
final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper
) {
this.schema = requireNonNull(schema, "schema");
this.predicate = requireNonNull(predicate, "predicate");
this.mapper = requireNonNull(mapper, "mapper");
}

public LogicalSchema getSchema() {
return schema;
}

public BiPredicate<Struct, GenericRow> getPredicate() {
return predicate;
}

public BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> getMapper() {
return mapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -65,7 +64,10 @@ public static PartitionByParams build(
final LogicalSchema resultSchema =
buildSchema(sourceSchema, partitionBy, functionRegistry, partitionByCol);

return buildMapper(resultSchema, partitionByCol, evaluator);
final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper =
buildMapper(resultSchema, partitionByCol, evaluator);

return new PartitionByParams(resultSchema, mapper);
}

public static LogicalSchema buildSchema(
Expand Down Expand Up @@ -123,7 +125,7 @@ private static Optional<Column> getPartitionByCol(
return Optional.of(column);
}

private static PartitionByParams buildMapper(
private static BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> buildMapper(
final LogicalSchema resultSchema,
final Optional<Column> partitionByCol,
final Function<GenericRow, Object> evaluator
Expand All @@ -135,15 +137,7 @@ private static PartitionByParams buildMapper(

final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(resultSchema);

final BiPredicate<Struct, GenericRow> predicate = (k, v) -> {
if (v == null) {
return false;
}

return evaluator.apply(v) != null;
};

final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper = (k, v) -> {
return (k, v) -> {
final Object newKey = evaluator.apply(v);
final Struct structKey = keyBuilder.build(newKey);

Expand All @@ -153,8 +147,6 @@ private static PartitionByParams buildMapper(

return new KeyValue<>(structKey, v);
};

return new PartitionByParams(resultSchema, predicate, mapper);
}

private static Function<GenericRow, Object> buildExpressionEvaluator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
Expand Down Expand Up @@ -54,15 +53,12 @@ public static KStreamHolder<Struct> build(
logger
);

final BiPredicate<Struct, GenericRow> predicate = params.getPredicate();
final BiFunction<Struct, GenericRow, KeyValue<Struct, GenericRow>> mapper = params.getMapper();

// This cast is safe because selectKey is not allowed on windowed streams:
final KStream<Struct, GenericRow> kStream = (KStream<Struct, GenericRow>) stream.getStream();

final KStream<Struct, GenericRow> reKeyed = kStream
.filter(predicate::test, Named
.as(queryContext.formatContext() + "-FilterNulls"))
.map(mapper::apply, Named.as(queryContext.formatContext() + "-SelectKey"));

return new KStreamHolder<>(
Expand Down
Loading

0 comments on commit ad27020

Please sign in to comment.