Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSQL should not filter out stream rows with null key #4747

Closed
big-andy-coates opened this issue Mar 10, 2020 · 4 comments · Fixed by #4823
Closed

KSQL should not filter out stream rows with null key #4747

big-andy-coates opened this issue Mar 10, 2020 · 4 comments · Fixed by #4823
Assignees

Comments

@big-andy-coates
Copy link
Contributor

It makes sense that tables ignore any records in their changelog logs (or sources) would null keys. A table requires a primary key, and hence a null is invalid.

However, streams have no such restriction. Yet, ksql will filter out rows where the key is null, e.g. the following fails:

{
      "name": "partition by with null value",
      "statements": [
        "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) with (kafka_topic='test_topic', value_format = 'json');",
        "CREATE STREAM REPARTITIONED AS select * from TEST partition by ID;"
      ],
      "inputs": [
        {"topic": "test_topic", "key": 0, "value": null},
        {"topic": "test_topic", "key": 0, "value": {}},
        {"topic": "test_topic", "key": 0, "value": {"ID": 10}}
      ],

      "outputs": [
        {"topic": "REPARTITIONED", "key":  null, "value": {}},
        {"topic": "REPARTITIONED", "key":  null, "value": {}}
        {"topic": "REPARTITIONED", "key": 10, "value": {"ID": 10}}
      ]
    },
    

To get it to pass one must remove the output rows with null keys:

{
      "name": "partition by with null value",
      "statements": [
        "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) with (kafka_topic='test_topic', value_format = 'json');",
        "CREATE STREAM REPARTITIONED AS select * from TEST partition by ID;"
      ],
      "inputs": [
        {"topic": "test_topic", "key": 0, "value": null},
        {"topic": "test_topic", "key": 0, "value": {}},
        {"topic": "test_topic", "key": 0, "value": {"ID": 10}}
      ],

      "outputs": [
        {"topic": "REPARTITIONED", "key": 10, "value": {"ID": 10}}
      ]
    },
    
@blueedgenick
Copy link
Contributor

blueedgenick commented Mar 10, 2020 via email

@mjsax
Copy link
Member

mjsax commented Mar 11, 2020

That is an interesting case. Note that a query like this would be "non-deterministic" with regard the output topic partitioning, because for the null-key record a random partition would be selected.

Hence, I am wondering if this might be an issue or not?

@big-andy-coates
Copy link
Contributor Author

Yikes! This seems like a pretty serious bug - can you tell if this had always been the case or it was a regression somewhere along the line?

I think this has always been the case.

@big-andy-coates
Copy link
Contributor Author

That is an interesting case. Note that a query like this would be "non-deterministic" with regard the output topic partitioning, because for the null-key record a random partition would be selected.

Hence, I am wondering if this might be an issue or not?

The non-determinism may well be an issue. If it is, then I'm guessing we'll need to introduce a SQL NULL object of some kind, which will serialize to something other than null bytes. This would ensure consistent partitioning.

However, I feel the introduction of a NULL object is potentially best left to a different ticket. Initially, we can just ensure we're not filtering out these rows.

big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 19, 2020
Fixes: confluentinc#4747

This commit removes the filter that was excluding any rows where the `PARTITION BY` clause resolved to a `null` value, i.e. either because the result was `null` or because an error occurred evaluating the expression.

This change will only affect new queries started. Pre-existing queries will continue to run as before.
big-andy-coates added a commit that referenced this issue Mar 23, 2020
* fix: fix repartition semantics

Fixes: #4749

##### Background

This change fixes an issue with our repartition semantics.

Old style query semantics for partition by are broken:

S1: ROWKEY => B, C  (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: ROWKEY => B, C

As you can see the schema of S2 is still the same.  However, the old data in the key has been lost as its been overridden with the data from B, and the key now duplicates the data stored in B.

This loss of data on a `SELECT * .. PARTITION BY` needed fixing.

Secondly, with new primitive key [work to remove the restriction on key column naming](#3536), the same query semantics do not work. e.g.

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

S2: B => B, C

This fails as the `B` value column clashes with the `B` key column!

##### The fix

This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;
```

Results in the schema: S2: B => C, A.

If a PARTITION BY is an expression other than a column reference, then ksql will synthesis a new column name. For example,

S1: A => B, C

```sql
CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT);
```

Results in the schema: S2: KSQL_COL_0 => B, C, A.

[This github issue](#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.

* fix: do not filter out rows where PARTITION BY resolves to null

Fixes: #4747

This commit removes the filter that was excluding any rows where the `PARTITION BY` clause resolved to a `null` value, i.e. either because the result was `null` or because an error occurred evaluating the expression.

This change will only affect new queries started. Pre-existing queries will continue to run as before.

* docs: call out limitation of partiiton by NULL
@big-andy-coates big-andy-coates self-assigned this Mar 30, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants