Skip to content

Commit

Permalink
fix: schema column order returned by websocket pull query (#4012)
Browse files Browse the repository at this point in the history
The order of columns was unintentionally getting changed so that key columns can before value columns. This is incorrect.  A user can choose which order the columns are returned:

```sql
SELECT COUNT, ROWKEY from Foo...
```
  • Loading branch information
big-andy-coates authored Dec 2, 2019
1 parent 5ec3bb0 commit 85fef09
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.confluent.ksql.schema.ksql.types.SqlMap;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -36,10 +35,9 @@ private EntityUtil() {
}

public static List<FieldInfo> buildSourceSchemaEntity(final LogicalSchema schema) {
final List<FieldInfo> allFields = new ArrayList<>();
allFields.addAll(getFields(schema.metadata(), "meta"));
allFields.addAll(getFields(schema.key(), "key"));
allFields.addAll(getFields(schema.value(), "value"));
final List<FieldInfo> allFields = schema.columns().stream()
.map(EntityUtil::toFieldInfo)
.collect(Collectors.toList());

if (allFields.isEmpty()) {
throw new IllegalArgumentException("Root schema should contain columns: " + schema);
Expand All @@ -48,17 +46,15 @@ public static List<FieldInfo> buildSourceSchemaEntity(final LogicalSchema schema
return allFields;
}

private static List<FieldInfo> getFields(final List<Column> columns, final String type) {
return columns.stream()
.map(col -> SqlTypeWalker.visit(
Field.of(col.ref().aliasedFieldName(), col.type()), new Converter()))
.collect(Collectors.toList());
}

public static SchemaInfo schemaInfo(final SqlType type) {
return SqlTypeWalker.visit(type, new Converter());
}

private static FieldInfo toFieldInfo(final Column column) {
return SqlTypeWalker.visit(
Field.of(column.ref().aliasedFieldName(), column.type()), new Converter());
}

private static final class Converter implements SqlTypeWalker.Visitor<SchemaInfo, FieldInfo> {

public SchemaInfo visitType(final SqlType schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ public void shouldExecutePullQueryOverWebSocketWithJsonContentType() {
assertValidJsonMessages(messages);
assertThat(messages.get(0),
is("["
+ "{\"name\":\"ROWKEY\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}},"
+ "{\"name\":\"COUNT\",\"schema\":{\"type\":\"BIGINT\",\"fields\":null,\"memberSchema\":null}}"
+ "{\"name\":\"COUNT\",\"schema\":{\"type\":\"BIGINT\",\"fields\":null,\"memberSchema\":null}},"
+ "{\"name\":\"ROWKEY\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}"
+ "]"));
assertThat(messages.get(1),
is("{\"row\":{\"columns\":[1,\"USER_1\"]}}"));
Expand All @@ -276,7 +276,7 @@ public void shouldExecutePullQueryOverWebSocketWithJsonContentType() {
public void shouldReturnCorrectSchemaForPullQueryWithOnlyKeyInSelect() {
// When:
final Supplier<List<String>> call = () -> makeWebSocketRequest(
"SELECT * from " + AGG_TABLE + " WHERE ROWKEY='" + AN_AGG_KEY + "';",
"SELECT ROWKEY from " + AGG_TABLE + " WHERE ROWKEY='" + AN_AGG_KEY + "';",
MediaType.APPLICATION_JSON_TYPE,
MediaType.APPLICATION_JSON_TYPE
);
Expand All @@ -286,11 +286,10 @@ public void shouldReturnCorrectSchemaForPullQueryWithOnlyKeyInSelect() {
assertValidJsonMessages(messages);
assertThat(messages.get(0),
is("["
+ "{\"name\":\"ROWKEY\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}},"
+ "{\"name\":\"COUNT\",\"schema\":{\"type\":\"BIGINT\",\"fields\":null,\"memberSchema\":null}}"
+ "{\"name\":\"ROWKEY\",\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}"
+ "]"));
assertThat(messages.get(1),
is("{\"row\":{\"columns\":[\"USER_1\",1]}}"));
is("{\"row\":{\"columns\":[\"USER_1\"]}}"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,26 @@ public void shouldSupportSchemasWithKeyColumns() {
assertThat(fields.get(0).getSchema().getTypeName(), equalTo("INTEGER"));
}

@Test
public void shouldMaintainColumnOrder() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.noImplicitColumns()
.valueColumn(ColumnName.of("field0"), SqlTypes.DOUBLE)
.keyColumn(ColumnName.of("field1"), SqlTypes.INTEGER)
.build();

// When:
final List<FieldInfo> fields = EntityUtil.buildSourceSchemaEntity(schema);

// Then:
assertThat(fields, hasSize(2));
assertThat(fields.get(0).getName(), equalTo("field0"));
assertThat(fields.get(0).getSchema().getTypeName(), equalTo("DOUBLE"));
assertThat(fields.get(1).getName(), equalTo("field1"));
assertThat(fields.get(1).getSchema().getTypeName(), equalTo("INTEGER"));
}

private static void shouldBuildCorrectPrimitiveField(
final SqlType primitiveSchema,
final String schemaName
Expand Down

0 comments on commit 85fef09

Please sign in to comment.