
feat: remove restriction that key columns must be called ROWKEY
big-andy-coates committed Mar 2, 2020
1 parent ce74cf8 commit 5e9cb56
Showing 82 changed files with 1,170 additions and 567 deletions.
2 changes: 1 addition & 1 deletion docs-md/concepts/collections/inserting-events.md
@@ -34,7 +34,7 @@ INSERT INTO all_publications (author, title) VALUES ('C.S. Lewis', 'The Silver C

Any column that doesn't get a value explicitly is set to `null`. If no columns
are specified, a value for every column is expected in the same order as the
schema, with `ROWKEY` as the first column. If columns are specified, the order
schema, with the key columns first. If columns are specified, the order
doesn’t matter. You can specify `ROWTIME` as an explicit column, but it’s not
required when you omit the column specifications.
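
A sketch of both forms, assuming (purely for illustration) that the `all_publications` stream above has `author` as its key column:

```sql
-- Explicit columns: order doesn't matter; unlisted columns are set to null.
INSERT INTO all_publications (title, author) VALUES ('The Silver Chair', 'C.S. Lewis');

-- No column list: a value for every column, in schema order, key column(s) first.
INSERT INTO all_publications VALUES ('C.S. Lewis', 'The Silver Chair');
```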

12 changes: 7 additions & 5 deletions docs-md/concepts/events.md
@@ -47,10 +47,12 @@ to {{ site.ak }}, they also describe the time at which the event was true.

ksqlDB aims to raise the abstraction from working with a lower-level stream
processor. Usually, an event is called a "row", as if it were a row in a
relational database. Each row is composed of a series of columns. Most columns
represent fields in the value of an event, but there are a few extra columns.
In particular, there are the `ROWKEY` and `ROWTIME` columns that represent the
key and time of the event. These system columns are present on every row.
In addition, windowed sources have `WINDOWSTART` and `WINDOWEND` columns.
relational database. Each row is composed of a series of columns.
ksqlDB currently supports only a single key column and multiple value columns.
These columns represent fields in the key and value of an event, respectively.
There are also a few extra system columns. The `ROWTIME` column represents the
time of the event, in milliseconds. This system column is present on every row.
In addition, windowed sources have `WINDOWSTART` and `WINDOWEND` columns, which
represent the bounds on the window, in milliseconds.
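
For example, a query can reference these system columns directly. A sketch against a hypothetical windowed source named `hourly_sales` (the source and value columns are illustrative):

```sql
-- ROWTIME, WINDOWSTART, and WINDOWEND are all in milliseconds since the epoch.
SELECT ROWTIME, WINDOWSTART, WINDOWEND, ITEM, TOTAL
FROM hourly_sales
EMIT CHANGES;
```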

Page last revised on: {{ git_revision_date }}
11 changes: 5 additions & 6 deletions docs-md/concepts/queries/pull.md
@@ -34,10 +34,9 @@ Pull query features and limitations
- Pull queries are currently available only against materialized aggregate
tables, which means tables that are created by using a persistent query
with a GROUP BY clause.
- For non-windowed aggregations, pull queries only support looking up events
by key.
- WHERE clauses can only have `ROWKEY=x`-style bounds for non-windowed tables.
- Windowed tables support bounds on WINDOWSTART using operators
- For non-windowed tables, pull queries only support looking up events
by key; that is, the WHERE clause can only have `<key-column>=x`-style filters.
- Windowed tables support additional bounds on WINDOWSTART using operators
`<=`, `<`, `=`, `>`, `>=`.
- JOIN, PARTITION BY, GROUP BY and WINDOW clauses aren't supported.
- SELECT statements can contain column arithmetic and function calls.
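
For example, a pull query against a hypothetical non-windowed aggregate table `user_totals`, keyed on `userId`, is a simple key lookup (the names are illustrative):

```sql
SELECT * FROM user_totals
  WHERE userId = 'user19r7t33';
```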
@@ -48,12 +47,12 @@ Example pull query

The following pull query gets all events for the specified user that have a
timestamp within the specified time window. The WHERE clause must specify a
single value of `ROWKEY` to retrieve and may optionally include bounds on
single value for the key column, `userId` in this case, and may optionally include bounds on
WINDOWSTART if the materialized table is windowed.

```sql
SELECT * FROM user_location
WHERE ROWKEY = 'user19r7t33'
WHERE userId = 'user19r7t33'
AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16';
```

15 changes: 7 additions & 8 deletions docs-md/concepts/schemas.md
@@ -17,18 +17,17 @@ underlying {{ site.ak }} topic.
A column is defined by a combination of its [name](#valid-identifiers), its [SQL data type](#sql-data-types),
and possibly a namespace.

Key columns have a `KEY` namespace suffix. Key columns have the following restrictions:
* There can only be a single key column, currently.
* The key column must be named `ROWKEY` in the KSQL schema.
Key columns have a `KEY` namespace suffix. There can currently be only a single key column, and the
key column can have any [valid name](#valid-identifiers).

Value columns have no namespace suffix. There can be one or more value columns and the value columns
can have any name.
Value columns have no namespace suffix. There can be one or more value columns and the value columns
can have any [valid name](#valid-identifiers).

For example, the following declares a schema with a single `INT` key column and several value
columns:
For example, the following declares a schema with a single `INT` key column, containing an ID, and
several value columns:

```sql
ROWKEY INT KEY, ID BIGINT, NAME STRING, ADDRESS ADDRESS_TYPE
ID INT KEY, NAME STRING, ADDRESS ADDRESS_TYPE
```
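
Such a schema might be declared as part of a statement like the following sketch (the topic name, format, and `ADDRESS_TYPE` struct are illustrative):

```sql
CREATE TABLE users (ID INT KEY, NAME STRING, ADDRESS ADDRESS_TYPE)
  WITH (kafka_topic='users', value_format='json');
```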

## Valid Identifiers
5 changes: 3 additions & 2 deletions docs-md/concepts/stream-processing.md
@@ -27,9 +27,10 @@ collection by using the `SELECT` statement on an existing collection. The
result of the inner `SELECT` feeds into the outer declared collection. You
don't need to declare a schema when deriving a new collection, because ksqlDB
infers the column names and types from the inner `SELECT` statement. The
`ROWKEY` of the row remains the same, unless the query includes either a
key columns of the row remain the same, unless the query includes either a
`PARTITION BY` or `GROUP BY` clause. The value of the `ROWTIME` column
sets the timestamp of the record written to {{ site.ak }}. The value of system columns
sets the timestamp of the record written to {{ site.ak }}, unless overridden by
specifying a different column in the WITH clause. The value of system columns
can not be set in the `SELECT`.
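
For example, the following sketch derives a stream whose records are written with the timestamp taken from a value column rather than `ROWTIME` (the stream and column names are illustrative):

```sql
-- Records written to the sink topic use the value of eventTime as their timestamp.
CREATE STREAM enriched_orders
  WITH (timestamp='eventTime') AS
  SELECT * FROM orders;
```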

Here are a few examples of deriving between the different collection types.
109 changes: 59 additions & 50 deletions docs-md/developer-guide/joins/partition-data.md
@@ -32,31 +32,46 @@ ksqlDB requires keys to use the `KAFKA` format. For more information, see
repartitioning, ksqlDB uses the correct format.

Because you can only use the primary key of a table as a joining column, it's
important to understand how keys are defined. For both streams and tables, the
column that represents the key has the name `ROWKEY`.

When you create a table by using a CREATE TABLE statement, the key of the
table is the same as that of the records in the underlying Kafka topic.
You must set the type of the `ROWKEY` column in the
CREATE TABLE statement to match the key data in the underlying {{ site.ak }} topic.

When you create a table by using a CREATE TABLE AS SELECT statement, the key of
the resulting table is determined as follows:

- If the FROM clause contains a stream, the statement must have a GROUP BY clause,
and the grouping columns determine the key of the resulting table.
- When grouping by a single column or expression, the type of `ROWKEY` in the
resulting stream matches the type of the column or expression.
- When grouping by multiple columns or expressions, the type of `ROWKEY` in the
resulting stream is an [SQL `STRING`](../../concepts/schemas).
- If the FROM clause contains only tables and no GROUP BY clause, the key is
copied over from the key of the table(s) in the FROM clause.
- If the FROM clause contains only tables and has a GROUP BY clause, the
grouping columns determine the key of the resulting table.
- When grouping by a single column or expression, the type of `ROWKEY` in the
resulting stream matches the type of the column or expression.
- When grouping by multiple columns or expressions, the type of `ROWKEY` in the
resulting stream is an [SQL `STRING`](../../concepts/schemas).
important to understand how keys are defined.

When you create a table or stream by using a CREATE TABLE or CREATE STREAM statement, the key of the
table or stream should be the same as that of the records in the underlying {{ site.ak }} topic.
You can set the type of the key column in the statement. Key columns are identified by the `KEY`
namespace. For example, the following statement creates a stream with a key column named `ID` of
type `BIGINT`:


```sql
CREATE STREAM USERS (ID BIGINT KEY, NAME STRING) WITH (kafka_topic='users', value_format='json');
```

When you create a stream by using a CREATE STREAM AS SELECT statement, the key of the resulting
stream is determined as follows:

- If the statement includes a PARTITION BY clause, the key type is set to the type of the
PARTITION BY expression. The key column will have the default name of `ROWKEY`.
- If the statement does not include a PARTITION BY clause, the key is copied over from the key of
the first stream in the FROM clause.

When you create a table by using a CREATE TABLE AS SELECT statement, the key of the resulting table
is determined as follows:

- If the FROM clause contains a stream, the statement must have a GROUP BY clause. The key column
will have the default name of `ROWKEY` and the grouping column(s) determine the type of the key
of the resulting table.
- When grouping by a single column or expression, the type of the key column in the
resulting table matches the type of this column or expression.
- When grouping by multiple columns or expressions, the type of the key column in the
resulting table is an [SQL `STRING`](../../concepts/schemas).
- If the FROM clause contains only tables and no GROUP BY clause, the key is copied over from the
key of the first table in the FROM clause.
- If the FROM clause contains only tables and has a GROUP BY clause, the key column will have the
default name of `ROWKEY` and the grouping column(s) determine the type of the key of the resulting
table.
- When grouping by a single column or expression, the type of the key column in the
resulting table matches the type of this column or expression.
- When grouping by multiple columns or expressions, the type of the key column in the
resulting table is an [SQL `STRING`](../../concepts/schemas).
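
A sketch of the stream-plus-GROUP-BY case, with illustrative names: grouping a `clicks` stream by the single column `userId` yields a table whose key column is named `ROWKEY` and has the type of `userId`:

```sql
-- The resulting table's key column is ROWKEY, with the BIGINT type of userId.
CREATE TABLE clicks_per_user AS
  SELECT userId, COUNT(*) AS click_count
  FROM clicks
  GROUP BY userId;
```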

The following example shows a `users` table joined with a `clicks` stream
on the `userId` column. The `users` table has the correct primary key
@@ -65,21 +80,17 @@ doesn't have a defined key, and ksqlDB must repartition it on the joining
column (`userId`) and assign the key before performing the join.

```sql
-- clicks stream, with an unknown key.
-- the schema of stream clicks is: ROWTIME BIGINT | ROWKEY STRING | USERID BIGINT | URL STRING
-- clicks stream, with an unknown key:
-- the schema of stream clicks is: USERID BIGINT | URL STRING
CREATE STREAM clicks (userId BIGINT, url STRING) WITH(kafka_topic='clickstream', value_format='json');

-- the primary key of table users is a BIGINT.
-- The userId column in the value matches the key, so can be used as an alias for ROWKEY in queries to make them more readable.
-- the schema of table users is: ROWTIME BIGINT | ROWKEY BIGINT | USERID BIGINT | FULLNAME STRING
CREATE TABLE users (ROWKEY BIGINT KEY, userId BIGINT, fullName STRING) WITH(kafka_topic='users', value_format='json', key='userId');
-- the schema of table users is: USERID BIGINT KEY | FULLNAME STRING
CREATE TABLE users (userId BIGINT KEY, fullName STRING) WITH(kafka_topic='users', value_format='json');

-- join of users table with clicks stream, joining on the table's primary key alias and the stream's userId column:
-- join of users table with clicks stream, joining on the table's primary key and the stream's userId column:
-- join will automatically repartition clicks stream:
-- the schema of the result is: USERID BIGINT KEY | CLICKS_URL STRING | USERS_FULLNAME STRING
SELECT clicks.url, users.fullName FROM clicks JOIN users ON clicks.userId = users.userId;

-- The following is equivalent and does not rely on there being a copy of the table's key within the value schema:
SELECT clicks.url, users.fullName FROM clicks JOIN users ON clicks.userId = users.ROWKEY;
```

Co-partitioning Requirements
@@ -112,29 +123,27 @@ Records with the exact same user id on both sides will be joined.
If the types of the columns you wish to join on don't match, it may be possible
to `CAST` one side to match the other. For example, if one side of the join
had an `INT` userId column, and the other a `BIGINT`, then you may choose to cast
the `INT` side to a `BIGINT`:
the `INT` side to a `BIGINT`. Note that this incurs the cost of an internal repartition of the data:

```sql
-- stream with INT userId
CREATE STREAM clicks (userId INT, url STRING) WITH(kafka_topic='clickstream', value_format='json');

-- table with BIGINT userId stored in the key:
CREATE TABLE users (ROWKEY BIGINT KEY, fullName STRING) WITH(kafka_topic='users', value_format='json');
CREATE TABLE users (userId BIGINT KEY, fullName STRING) WITH(kafka_topic='users', value_format='json');

-- Join using a CAST to convert the left side's join column to match the right side's type.
SELECT clicks.url, users.fullName FROM clicks JOIN users ON CAST(clicks.userId AS BIGINT) = users.ROWKEY;
SELECT clicks.url, users.fullName FROM clicks JOIN users ON CAST(clicks.userId AS BIGINT) = users.userId;
```

Tables or streams created on top of existing Kafka topics, for example those created with
`CREATE TABLE` or `CREATE STREAM` statements, are keyed on the data held in the key of the records
in the Kafka topic. ksqlDB expects the data to be in the `KAFKA` format.

Tables created on top of existing Kafka topics, for example those created with
a `CREATE TABLE` statement, are keyed on the data held in the key of the records
in the Kafka topic. ksqlDB presents this data in the `ROWKEY` column and expects
the data to be in the `KAFKA` format.

Tables created inside ksqlDB from other sources, for example those created with
a `CREATE TABLE AS SELECT` statement, will copy the key from their source(s)
unless there is an explicit `GROUP BY` clause, which can change what the table
is keyed on.
Tables or streams created inside ksqlDB from other sources, for example those created with
`CREATE TABLE AS SELECT` or `CREATE STREAM AS SELECT` statements, will copy the key from their
source(s) unless there is an explicit `GROUP BY` or `PARTITION BY` clause, which can change what
they are keyed on. See the [Keys](#keys) section above for more information.

!!! note
ksqlDB automatically repartitions a stream if a join requires it, but ksqlDB
@@ -145,7 +154,7 @@ is keyed on.

If you are using the same sources in more than one join that requires the data
to be repartitioned, you may prefer to repartition manually to avoid ksqlDB
repartitioning multiple times.
repartitioning the same data multiple times.

To repartition a stream, use the PARTITION BY clause. Be aware that Kafka
guarantees the relative order of any two messages from one source partition
@@ -182,10 +191,10 @@ the partition counts of the source topics, or repartition one side to match the
partition count of the other.

The following example creates a repartitioned stream, maintaining the existing
key, with the specified number of partitions.
key, `productId` in this case, with the specified number of partitions.

```sql
CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY ROWKEY;
CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY productId;
```

### Records Have the Same Partitioning Strategy
11 changes: 6 additions & 5 deletions docs-md/developer-guide/ksqldb-reference/create-stream.md
@@ -24,9 +24,11 @@ Create a new stream with the specified columns and properties. Columns
can be any of the [data types](../syntax-reference.md#ksqldb-data-types) supported by
ksqlDB.

ksqlDB adds the implicit columns `ROWTIME` and `ROWKEY` to every stream
and table, which represent the corresponding Kafka message timestamp and
message key, respectively. The timestamp has milliseconds accuracy.
To maintain backwards compatibility with older versions, ksqlDB adds an implicit `ROWKEY STRING KEY`
column if no explicit key column is provided.

ksqlDB adds the implicit column `ROWTIME` to every stream, which represents the corresponding Kafka
message timestamp. The timestamp has millisecond accuracy.
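
For example, under these rules the following two statements declare equivalent schemas (the topic and stream names are illustrative):

```sql
-- No explicit key column: ksqlDB adds an implicit ROWKEY STRING KEY column.
CREATE STREAM pageviews (page_id VARCHAR)
  WITH (kafka_topic='pageviews', value_format='json');

-- The same, with the key column declared explicitly:
CREATE STREAM pageviews_explicit (ROWKEY STRING KEY, page_id VARCHAR)
  WITH (kafka_topic='pageviews', value_format='json');
```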

The WITH clause supports the following properties:

@@ -37,7 +39,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a STREAM without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. You can only use this if the key format in Kafka is `VARCHAR` or `STRING`. Do not use this hint if the message key format in Kafka is `AVRO` or `JSON`. See [Key Requirements](../syntax-reference.md#key-requirements) for more information. |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as a named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
@@ -58,7 +59,7 @@ Example
-------

```sql
CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, page_id VARCHAR)
CREATE STREAM pageviews (userId BIGINT KEY, viewtime BIGINT, page_id VARCHAR)
WITH (VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'my-pageviews-topic');
```