diff --git a/CHANGES.md b/CHANGES.md index 40c0e1771e9..9318e85d477 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)). * Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676)) +* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)). ## New Features / Improvements diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index daa8b5e1b92..52d520a0a19 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -111,7 +111,8 @@ * {@link TableSchema.TypeName#BOOL} {@link Schema.TypeName#BOOLEAN} * * - * Nullable row columns are supported through Nullable type in ClickHouse. + * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is + * supported through LowCardinality DataType in ClickHouse. * *

Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}. diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj index 830499d3207..abe29aff3f8 100644 --- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj +++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj @@ -73,31 +73,32 @@ TOKEN : TOKEN : { - < ARRAY : "ARRAY" > - | < DATE : "DATE" > - | < DATETIME : "DATETIME" > - | < ENUM8 : "ENUM8" > - | < ENUM16 : "ENUM16" > - | < FIXEDSTRING : "FIXEDSTRING" > - | < FLOAT32 : "FLOAT32" > - | < FLOAT64 : "FLOAT64" > - | < STRING : "STRING" > - | < INT8 : "INT8" > - | < INT16 : "INT16" > - | < INT32 : "INT32" > - | < INT64 : "INT64" > - | < UINT8 : "UINT8" > - | < UINT16 : "UINT16" > - | < UINT32 : "UINT32" > - | < UINT64 : "UINT64" > - | < NULLABLE : "NULLABLE" > - | < LPAREN : "(" > - | < RPAREN : ")" > - | < CAST : "CAST" > - | < AS : "AS" > - | < COMMA : "," > - | < EQ : "=" > - | < BOOL : "BOOL" > + < ARRAY : "ARRAY" > + | < DATE : "DATE" > + | < DATETIME : "DATETIME" > + | < ENUM8 : "ENUM8" > + | < ENUM16 : "ENUM16" > + | < FIXEDSTRING : "FIXEDSTRING" > + | < FLOAT32 : "FLOAT32" > + | < FLOAT64 : "FLOAT64" > + | < STRING : "STRING" > + | < INT8 : "INT8" > + | < INT16 : "INT16" > + | < INT32 : "INT32" > + | < INT64 : "INT64" > + | < UINT8 : "UINT8" > + | < UINT16 : "UINT16" > + | < UINT32 : "UINT32" > + | < UINT64 : "UINT64" > + | < NULLABLE : "NULLABLE" > + | < LPAREN : "(" > + | < RPAREN : ")" > + | < CAST : "CAST" > + | < AS : "AS" > + | < COMMA : "," > + | < EQ : "=" > + | < BOOL : "BOOL" > + | < LOWCARDINALITY : "LOWCARDINALITY" > } public ColumnType columnType() : @@ -111,6 +112,7 @@ public ColumnType columnType() : | ct = enum_() | ct = array() | ct = nullable() + | ct = lowcardenality() ) { return ct; @@ -278,3 +280,13 @@ private 
ColumnType enum_() : } ) } + +/* LowCardinality(T): consumes the LOWCARDINALITY keyword and parentheses and yields the wrapped primitive type T as the column type. */ private ColumnType lowcardenality() : +{ + ColumnType ct; +} +{ + ( + ( < LOWCARDINALITY > < LPAREN > (ct = primitive()) < RPAREN > ) { return ct; } + ) +} \ No newline at end of file diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java index 8d4f9ab041c..33fe9467d45 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java @@ -160,7 +160,8 @@ public void testPrimitiveTypes() throws Exception { Schema.Field.of("f15", FieldType.STRING), Schema.Field.of("f16", FieldType.BYTES), Schema.Field.of("f17", FieldType.logicalType(FixedBytes.of(3))), - Schema.Field.of("f18", FieldType.BOOLEAN)); + Schema.Field.of("f18", FieldType.BOOLEAN), + Schema.Field.of("f19", FieldType.STRING)); Row row1 = Row.withSchema(schema) .addValue(new DateTime(2030, 10, 1, 0, 0, 0, DateTimeZone.UTC)) @@ -182,6 +183,7 @@ public void testPrimitiveTypes() throws Exception { .addValue(new byte[] {'a', 's', 'd'}) .addValue(new byte[] {'z', 'x', 'c'}) .addValue(true) + .addValue("lowcardenality") .build(); executeSql( @@ -204,7 +206,8 @@ public void testPrimitiveTypes() throws Exception { + "f15 FixedString(3)," + "f16 FixedString(3)," + "f17 FixedString(3)," - + "f18 Bool" + + "f18 Bool," + + "f19 LowCardinality(String)" + ") ENGINE=Log"); pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_primitive_types")); @@ -233,6 +236,7 @@ public void testPrimitiveTypes() throws Exception { assertArrayEquals(new byte[] {'a', 's', 'd'}, rs.getBytes("f16")); assertArrayEquals(new byte[] {'z', 'x', 'c'}, rs.getBytes("f17")); assertEquals("true", rs.getString("f18")); + assertEquals("lowcardenality", rs.getString("f19")); } }