Skip to content

Commit

Permalink
fix: support GROUP BY with no source columns used (#5644)
Browse files Browse the repository at this point in the history
fixes:

Adds support for a `GROUP BY` query where there are no source columns used in the query, e.g.

```sql
CREATE TABLE OUTPUT as SELECT 1 as k, count(1) AS ID FROM INPUT group by 1;
```

This was previously failing as it ran into an issue deserializing the query plan due to `nonAggColumns` not being present in the plan.

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
  • Loading branch information
2 people authored and agavra committed Jun 19, 2020
1 parent 11bee27 commit a8e6630
Show file tree
Hide file tree
Showing 26 changed files with 1,625 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
Expand Down Expand Up @@ -52,6 +54,10 @@ public StreamAggregate(
ImmutableList.copyOf(requireNonNull(nonAggregateColumns, "nonAggregateColumns"));
this.aggregationFunctions = ImmutableList.copyOf(
requireNonNull(aggregationFunctions, "aggregationFunctions"));

if (aggregationFunctions.isEmpty()) {
throw new IllegalArgumentException("Need at least one aggregate function");
}
}

@Override
Expand All @@ -73,6 +79,7 @@ public Formats getInternalFormats() {
return internalFormats;
}

/**
 * Returns the columns of this aggregate step that are not produced by an aggregate function.
 *
 * <p>{@code Include.NON_NULL} makes Jackson write this property whenever the value is
 * non-null, which includes an empty list. NOTE(review): presumably this overrides a
 * mapper-level default that omits empty collections, so that the property is always present
 * in the serialized plan - confirm against the plan mapper's configuration.
 *
 * @return the non-aggregate column names held by this step.
 */
@JsonInclude(Include.NON_NULL)
public List<ColumnName> getNonAggregateColumns() {
  return this.nonAggregateColumns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public StreamFlatMap(
this.properties = Objects.requireNonNull(props, "props");
this.source = Objects.requireNonNull(source, "source");
this.tableFunctions = ImmutableList.copyOf(Objects.requireNonNull(tableFunctions));

// A flat-map step without any table function is rejected at construction time.
if (tableFunctions.isEmpty()) {
  throw new IllegalArgumentException("Need at least one table function");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public StreamGroupBy(
this.internalFormats = requireNonNull(internalFormats, "internalFormats");
this.source = requireNonNull(source, "source");
this.groupByExpressions = ImmutableList.copyOf(requireNonNull(groupBys, "groupBys"));

if (groupByExpressions.isEmpty()) {
throw new IllegalArgumentException("Need at least one grouping expression");
}
}

public List<Expression> getGroupByExpressions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public StreamSelect(
this.source = requireNonNull(source, "source");
this.keyColumnNames = ImmutableList.copyOf(keyColumnNames);
this.selectExpressions = ImmutableList.copyOf(selectExpressions);

if (selectExpressions.isEmpty()) {
throw new IllegalArgumentException("Need at least one select expression");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
Expand Down Expand Up @@ -59,6 +61,10 @@ public StreamWindowedAggregate(
this.aggregationFunctions = ImmutableList.copyOf(
requireNonNull(aggregationFunctions, "aggregationFunctions"));
this.windowExpression = requireNonNull(windowExpression, "windowExpression");

if (aggregationFunctions.isEmpty()) {
throw new IllegalArgumentException("Need at least one aggregate function");
}
}

@Override
Expand All @@ -80,6 +86,7 @@ public Formats getInternalFormats() {
return internalFormats;
}

/**
 * Returns the columns of this windowed aggregate step that are not produced by an aggregate
 * function.
 *
 * <p>{@code Include.NON_NULL} makes Jackson write this property whenever the value is
 * non-null, which includes an empty list. NOTE(review): presumably this overrides a
 * mapper-level default that omits empty collections, so that the property is always present
 * in the serialized plan - confirm against the plan mapper's configuration.
 *
 * @return the non-aggregate column names held by this step.
 */
@JsonInclude(Include.NON_NULL)
public List<ColumnName> getNonAggregateColumns() {
  return this.nonAggregateColumns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
Expand Down Expand Up @@ -51,6 +53,10 @@ public TableAggregate(
= ImmutableList.copyOf(requireNonNull(nonAggregateColumns, "nonAggregatecolumns"));
this.aggregationFunctions = ImmutableList
.copyOf(requireNonNull(aggregationFunctions, "aggValToFunctionMap"));

if (aggregationFunctions.isEmpty()) {
throw new IllegalArgumentException("Need at least one aggregate function");
}
}

@Override
Expand All @@ -72,6 +78,7 @@ public List<FunctionCall> getAggregationFunctions() {
return aggregationFunctions;
}

/**
 * Returns the columns of this table aggregate step that are not produced by an aggregate
 * function.
 *
 * <p>{@code Include.NON_NULL} makes Jackson write this property whenever the value is
 * non-null, which includes an empty list. NOTE(review): presumably this overrides a
 * mapper-level default that omits empty collections, so that the property is always present
 * in the serialized plan - confirm against the plan mapper's configuration.
 *
 * @return the non-aggregate column names held by this step.
 */
@JsonInclude(Include.NON_NULL)
public List<ColumnName> getNonAggregateColumns() {
  return this.nonAggregateColumns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public TableGroupBy(
this.source = requireNonNull(source, "source");
this.internalFormats = requireNonNull(internalFormats, "internalFormats");
this.groupByExpressions = ImmutableList.copyOf(requireNonNull(groupBys, "groupBys"));

if (groupByExpressions.isEmpty()) {
throw new IllegalArgumentException("Need at least one grouping expression");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public TableSelect(
this.source = requireNonNull(source, "source");
this.keyColumnNames = ImmutableList.copyOf(keyColumnNames);
this.selectExpressions = ImmutableList.copyOf(selectExpressions);

if (selectExpressions.isEmpty()) {
throw new IllegalArgumentException("Need at least one select expression");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (VALUE INTEGER) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`VALUE` INTEGER",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE OUTPUT AS SELECT\n 1 K,\n LATEST_BY_OFFSET(INPUT.VALUE) VALUE,\n COUNT(1) ID\nFROM INPUT INPUT\nGROUP BY 1\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "OUTPUT",
"schema" : "`K` INTEGER KEY, `VALUE` INTEGER, `ID` BIGINT",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "tableSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "Aggregate/Project"
},
"source" : {
"@type" : "streamAggregateV1",
"properties" : {
"queryContext" : "Aggregate/Aggregate"
},
"source" : {
"@type" : "streamGroupByV1",
"properties" : {
"queryContext" : "Aggregate/GroupBy"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Aggregate/Prepare"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`VALUE` INTEGER"
},
"selectExpressions" : [ "VALUE AS VALUE", "1 AS KSQL_INTERNAL_COL_1" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"groupByExpressions" : [ "1" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"nonAggregateColumns" : [ "VALUE" ],
"aggregationFunctions" : [ "LATEST_BY_OFFSET(VALUE)", "COUNT(KSQL_INTERNAL_COL_1)" ]
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS VALUE", "KSQL_AGG_VARIABLE_1 AS ID" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CTAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Loading

0 comments on commit a8e6630

Please sign in to comment.