feat: cli to show tombstones in transient query output (#6462)
fixes: #3616

The output from transient queries that return tables will now include tombstones when run in the CLI or using the REST API directly.

Tombstones indicate an existing row has been deleted.

CLI output now looks like:

```
ksql> SELECT * FROM SOME_TABLE EMIT CHANGES;

|  ID   |   COL0      |   COL1      |
+-------+-------------+-------------+
| 10    | A           | B           |
| 20    | E           | F           |
| 10    | <TOMBSTONE> | <TOMBSTONE> | <-- previous row '10' has been deleted.
| 11    | X           | Y           |
```

To facilitate this, the response from the REST API needed to evolve. The query response includes a single `header`, followed by `row`s of data. Previously, this looked like:

```
{"header":{"queryId":"X","schema":"`ID` BIGINT, `COL0` STRING, `COL1` STRING"}},
{"row":{"columns":[10,A,B]}},
{"row":{"columns":[20,E,F]}},
{"row":{"columns":[11,X,Y]}},
```

For queries returning table rows, the result now looks like:

```
{"header":{"queryId":"X","schema":"`ID` BIGINT, `COL0` STRING, `COL1` STRING"}},
{"row":{"columns":[10,A,B]}},
{"row":{"columns":[20,E,F]}},
{"row":{"columns":[10,null,null],"tombstone":true}},
{"row":{"columns":[11,X,Y]}}
```

Note how a row can now be flagged as a `tombstone`. Such rows have all non-key columns set to `null` (which may be all columns, if no key columns are in the projection).
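
For illustration, here is a minimal client-side sketch of how a consumer might act on the new flag. It assumes Jackson for JSON parsing; the hard-coded chunk strings are abbreviated versions of the example response above, and the class name is hypothetical:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TombstoneAwareConsumer {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(final String[] args) throws Exception {
    // Two chunks as they might arrive from the /query endpoint.
    final String[] chunks = {
        "{\"row\":{\"columns\":[10,\"A\",\"B\"]}}",
        "{\"row\":{\"columns\":[10,null,null],\"tombstone\":true}}"
    };

    for (final String chunk : chunks) {
      final JsonNode row = MAPPER.readTree(chunk).get("row");
      if (row == null) {
        continue; // header, finalMessage or errorMessage chunk
      }
      // The flag is only present on deletions, so treat a missing field as an upsert.
      final boolean tombstone = row.path("tombstone").asBoolean(false);
      System.out.println((tombstone ? "DELETE " : "UPSERT ") + row.get("columns"));
    }
  }
}
```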

**NOTE**: the old WebSocket query API has not yet been updated to support tombstones (#6439), and the same goes for the new client API (#6438).

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
big-andy-coates authored Nov 3, 2020
1 parent 8f84e41 commit ef3039a
Showing 92 changed files with 2,176 additions and 529 deletions.
4 changes: 3 additions & 1 deletion docs/developer-guide/api.md
@@ -38,7 +38,9 @@ Accept: application/vnd.ksql.v1+json

The less specific `application/json` content type is also permitted.
However, this is only for compatibility and ease of use, and you should
-use the versioned value if possible.
+use the versioned value where possible. `application/json` maps to the latest
+versioned content type, meaning the response may change after upgrading the server to
+a later version.

The server also supports content negotiation, so you may include
multiple, weighted preferences:
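
For example, a client that prefers the versioned content type but can fall back to plain JSON might send something like the following (an illustrative header, using standard HTTP quality weights):

```
Accept: application/vnd.ksql.v1+json; q=0.9, application/json; q=0.5
```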
59 changes: 53 additions & 6 deletions docs/developer-guide/ksqldb-rest-api/query-endpoint.md
@@ -28,18 +28,32 @@ Each response chunk is a JSON object with the following format:

Response JSON Object:

- **header** (object): Information about the result.
- **header.queryId**: (string): the unique id of the query. This can be useful when debugging.
For example, when looking in the logs or processing log for errors or issues.
- **header.schema**: (string): the list of columns being returned. This defines the schema for
the data returned later in **row.columns**.
- **row** (object): A single row being returned. This will be null if an error is being returned.
-- **row.columns** (array): The values contained in the row.
-- **row.columns[i]** (?): The value contained in a single column for the row. The value type depends on the type of the column.
-- **finalMessage** (string): If this field is non-null, it contains a final message from the server. No additional rows will be returned and the server will end the response.
-- **errorMessage** (string): If this field is non-null, an error has been encountered while running the statement. No additional rows are returned and the server will end the response.
+- **row.columns** (array): The values of the columns requested. The schema of the columns was
+  already supplied in **header.schema**.
+- **row.tombstone** (boolean): Whether the row is a deletion of a previous row.
+  It is recommended that you include all columns within the primary key in the projection
+  so that you can determine _which_ previous row was deleted.
+- **finalMessage** (string): If this field is non-null, it contains a final message from the server.
+  This signifies successful completion of the query.
+  No additional rows will be returned and the server will end the response.
+- **errorMessage** (string): If this field is non-null, an error has been encountered while running
+  the statement.
+  This signifies unsuccessful completion of the query.
+  No additional rows are returned and the server will end the response.
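
For illustration, a hypothetical response for a query that completes normally might interleave these fields as follows (the query id, schema, and exact final message text are made up for this sketch):

```
{"header":{"queryId":"xyz123","schema":"`ID` BIGINT"}}
{"row":{"columns":[10]}}
{"finalMessage":"Limit Reached"}
```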

## Examples

### Example curl command

```bash
curl -X "POST" "http://<ksqldb-host-name>:8088/query" \
-H "Accept: application/vnd.ksql.v1+json" \
-d $'{
"ksql": "SELECT * FROM USERS;",
"streamsProperties": {}
@@ -62,15 +76,48 @@ Content-Type: application/vnd.ksql.v1+json
}
```

-### Example response
+### Example stream response

If the query result is a stream, the response will not include the **row.tombstone** field, as
streams don't have primary keys or the concept of a deletion.


```http
HTTP/1.1 200 OK
Content-Type: application/vnd.ksql.v1+json
Transfer-Encoding: chunked
...
{"row":{"columns":[1524760769983,"1",1524760769747,"alice","home"]},"errorMessage":null}
{"header":{"queryId":"_confluent_id_19",schema":"`ROWTIME` BIGINT, `NAME` STRING, `AGE` INT"}}
{"row":{"columns":[1524760769983,"1",1524760769747,"alice","home"]}}
...
```

### Example table response

If the query result is a table, the response may include deleted rows, as identified by the
**row.tombstone** field.

Rows within the table are identified by their primary key. Rows may be inserted, updated, or deleted.
Rows without the **tombstone** field set indicate upserts (inserts or updates).
Rows with the **tombstone** field set indicate deletes.

Including the primary key columns in the query's projection is not required. However, any use case
that materializes the table, or needs to correlate later updates and deletes with previous rows,
will generally need all primary key columns in the projection.
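
For example, assuming a `USERS` table keyed on an `ID` column, a query that keeps the primary key in the projection might look like:

```sql
-- ID is the primary key, so later tombstones can be correlated with earlier rows.
SELECT ID, NAME, AGE FROM USERS EMIT CHANGES;
```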

In the example response below, the `ID` column is the primary key of the table. The second row in
the response deletes the first.

```http
HTTP/1.1 200 OK
Content-Type: application/vnd.ksql.v1+json
Transfer-Encoding: chunked
...
{"header":{"queryId":"_confluent_id_34",schema":"`ROWTIME` BIGINT, `NAME` STRING, `ID` INT"}}
{"row":{"columns":[1524760769983,"alice",10]}},
{"row":{"columns":[null,null,10],"tombstone":true}}
...
```
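
As a worked example of the upsert and delete semantics above, here is a minimal sketch (a hypothetical helper, not part of the ksqlDB API) of how a client might materialize such a changelog into a map keyed by primary key:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableMaterializer {

  // Latest row per primary key; tombstones remove the entry.
  private final Map<Object, List<Object>> table = new HashMap<>();

  public void onRow(final List<Object> columns, final boolean tombstone, final int keyIndex) {
    final Object key = columns.get(keyIndex);
    if (tombstone) {
      table.remove(key);       // deletion of a previous row
    } else {
      table.put(key, columns); // upsert: insert or update
    }
  }

  public static void main(final String[] args) {
    final TableMaterializer materializer = new TableMaterializer();
    // The two rows from the example response above; ID (index 2) is the primary key.
    materializer.onRow(Arrays.asList(1524760769983L, "alice", 10), false, 2);
    materializer.onRow(Arrays.asList(null, null, 10), true, 2);
    System.out.println(materializer.table); // prints {} because the tombstone deleted row 10
  }
}
```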
@@ -20,6 +20,7 @@
import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.TransientQueryQueue;
+import io.confluent.ksql.util.KeyValue;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
@@ -30,7 +31,7 @@
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;

-public class BlockingQueryPublisherVerificationTest extends PublisherVerification<GenericRow> {
+public class BlockingQueryPublisherVerificationTest extends PublisherVerification<KeyValue<List<?>, GenericRow>> {

private final Vertx vertx;
private final WorkerExecutor workerExecutor;
@@ -44,21 +45,21 @@ public BlockingQueryPublisherVerificationTest() {
}

@Override
-public Publisher<GenericRow> createPublisher(long elements) {
+public Publisher<KeyValue<List<?>, GenericRow>> createPublisher(long elements) {
final Context context = vertx.getOrCreateContext();
BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
final TestQueryHandle queryHandle = new TestQueryHandle(elements);
publisher.setQueryHandle(queryHandle);
if (elements < Integer.MAX_VALUE) {
for (long l = 0; l < elements; l++) {
-queryHandle.queue.acceptRow(generateRow(l));
+queryHandle.queue.acceptRow(null, generateRow(l));
}
}
return publisher;
}

@Override
-public Publisher<GenericRow> createFailedPublisher() {
+public Publisher<KeyValue<List<?>, GenericRow>> createFailedPublisher() {
return null;
}

1 change: 0 additions & 1 deletion ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java
@@ -74,7 +74,6 @@
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
39 changes: 21 additions & 18 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import io.confluent.ksql.GenericRow;
import io.confluent.ksql.cli.console.CliConfig.OnOff;
import io.confluent.ksql.cli.console.KsqlTerminal.HistoryEntry;
import io.confluent.ksql.cli.console.KsqlTerminal.StatusClosable;
@@ -64,7 +63,6 @@
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KafkaTopicsListExtended;
import io.confluent.ksql.rest.entity.KsqlEntity;
-import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.rest.entity.KsqlWarning;
@@ -80,13 +78,14 @@
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceDescriptionList;
import io.confluent.ksql.rest.entity.StreamedRow;
+import io.confluent.ksql.rest.entity.StreamedRow.DataRow;
+import io.confluent.ksql.rest.entity.StreamedRow.Header;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.entity.TopicDescription;
import io.confluent.ksql.rest.entity.TypeList;
import io.confluent.ksql.rest.entity.VariablesList;
import io.confluent.ksql.rest.entity.WarningEntity;
-import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.CmdLineUtil;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap1;
@@ -209,7 +208,7 @@ private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter

public interface RowCaptor {

-void addRow(GenericRow row);
+void addRow(DataRow row);

void addRows(List<List<String>> fields);
}
@@ -342,12 +341,12 @@ public void printStreamedRow(final StreamedRow row) {

row.getFinalMessage().ifPresent(finalMsg -> writer().println(finalMsg));

-row.getHeader().ifPresent(header -> printRowHeader(header.getSchema()));
+row.getHeader().ifPresent(this::printRowHeader);

if (row.getRow().isPresent()) {
switch (outputFormat) {
case JSON:
-printAsJson(row.getRow().get().values());
+printAsJson(row.getRow().get());
break;
case TABULAR:
printAsTable(row.getRow().get());
@@ -384,15 +383,16 @@ public void printKsqlEntityList(final List<KsqlEntity> entityList) {
}
}

-private void printRowHeader(final LogicalSchema schema) {
+private void printRowHeader(final Header header) {
switch (outputFormat) {
case JSON:
printAsJson(header);
break;
case TABULAR:
writer().println(
TabularRow.createHeader(
getWidth(),
-schema.columns(),
+header.getSchema().columns(),
config.getString(CliConfig.WRAP_CONFIG).equalsIgnoreCase(OnOff.ON.toString()),
config.getInt(CliConfig.COLUMN_WIDTH_CONFIG)
)
@@ -441,14 +441,24 @@ private Optional<CliCmdExecutor> getCliCommand(final String line) {
.findFirst();
}

-private void printAsTable(final GenericRow row) {
+private void printAsTable(final DataRow row) {
rowCaptor.addRow(row);

final boolean tombstone = row.getTombstone().orElse(false);

final List<?> columns = tombstone
? row.getColumns().stream()
.map(val -> val == null ? "<TOMBSTONE>" : val)
.collect(Collectors.toList())
: row.getColumns();

writer().println(TabularRow.createRow(
getWidth(),
-row,
+columns,
config.getString(CliConfig.WRAP_CONFIG).equalsIgnoreCase(OnOff.ON.toString()),
config.getInt(CliConfig.COLUMN_WIDTH_CONFIG))
);

flush();
}

@@ -882,13 +892,6 @@ private static String splitLongLine(final String input, final int maxLineLength)
}

private void printAsJson(final Object o) {
-if (!((o instanceof PropertiesList || (o instanceof KsqlEntityList)))) {
-  log.warn(
-      "Unexpected result class: '{}' found in printAsJson",
-      o.getClass().getCanonicalName()
-  );
-}
-
try {
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValue(writer(), o);
writer().println();
@@ -901,7 +904,7 @@ static class NoOpRowCaptor implements RowCaptor {
static class NoOpRowCaptor implements RowCaptor {

@Override
-public void addRow(final GenericRow row) {
+public void addRow(final DataRow row) {
}

@Override
@@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.VariablesList;
-
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
5 changes: 3 additions & 2 deletions ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
@@ -63,6 +63,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.ServerInfo;
+import io.confluent.ksql.rest.entity.StreamedRow.DataRow;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
@@ -1361,8 +1362,8 @@ public void addRows(final List<List<String>> rows) {
}

@Override
-public void addRow(final GenericRow row) {
-  addRow(row.values());
+public void addRow(final DataRow row) {
+  addRow(row.getColumns());
}

private void addRow(final List<?> row) {
@@ -124,7 +124,7 @@ public class ConsoleTest {
private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.keyColumn(ColumnName.of("foo"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of("bar"), SqlTypes.STRING)
-.build();
+.build().withPseudoAndKeyColsInValue(false);

private final TestTerminal terminal;
private final Console console;
@@ -187,17 +187,35 @@ public void after() {
}

@Test
-public void testPrintGenericStreamedRow() {
+public void testPrintDataRow() {
// Given:
-final StreamedRow row = StreamedRow.row(genericRow("col_1", "col_2"));
+final StreamedRow row = StreamedRow.pushRow(genericRow("col_1", "col_2"));

// When:
console.printStreamedRow(row);

// Then:
assertThat(terminal.getOutputString(), containsString("col_1"));
assertThat(terminal.getOutputString(), containsString("col_2"));
}

@Test
public void testPrintTableTombstone() {
// Given:
console.printStreamedRow(StreamedRow.header(new QueryId("id"), SCHEMA));

final StreamedRow row = StreamedRow.tombstone(genericRow(null, "v_0", null));

// When:
console.printStreamedRow(row);

// Then:
assertThat(terminal.getOutputString(), containsString("v_0"));

if (console.getOutputFormat() == OutputFormat.TABULAR) {
assertThat(terminal.getOutputString(), containsString("col_1"));
assertThat(terminal.getOutputString(), containsString("col_2"));
assertThat(terminal.getOutputString(), containsString("<TOMBSTONE>"));
} else {
assertThat(terminal.getOutputString(), containsString("\"tombstone\" : true"));
}
}
