Skip to content

Commit ec9e904

Browse files
Add support for timestamps with timezone in iceberg type converter
1 parent dd1893c commit ec9e904

19 files changed

+291
-48
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

+4
Original file line numberDiff line numberDiff line change
@@ -1775,6 +1775,10 @@ Map of Iceberg types to the relevant PrestoDB types:
17751775
- ``TIME``
17761776
* - ``TIMESTAMP``
17771777
- ``TIMESTAMP``
1778+
* - ``TIMESTAMPTZ``
1779+
- ``TIMESTAMP WITH TIME ZONE``
1780+
* - ``STRING``
1781+
- ``VARCHAR``
17781782
* - ``UUID``
17791783
- ``UUID``
17801784
* - ``LIST``

presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@
4343
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
4444
import org.apache.parquet.format.Statistics;
4545
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
46+
import org.apache.parquet.schema.LogicalTypeAnnotation;
4647
import org.apache.parquet.schema.MessageType;
47-
import org.apache.parquet.schema.MessageTypeParser;
4848
import org.apache.parquet.schema.PrimitiveType;
49+
import org.apache.parquet.schema.Types;
4950
import org.joda.time.DateTimeZone;
5051
import org.testng.annotations.BeforeClass;
5152
import org.testng.annotations.Test;
@@ -999,8 +1000,12 @@ public void testDecimalBackedByINT32()
9991000
public void testTimestampMicrosBackedByINT64()
10001001
throws Exception
10011002
{
1002-
org.apache.parquet.schema.MessageType parquetSchema =
1003-
MessageTypeParser.parseMessageType("message ts_micros { optional INT64 test (TIMESTAMP_MICROS); }");
1003+
LogicalTypeAnnotation annotation = LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS);
1004+
MessageType parquetSchema = Types.buildMessage()
1005+
.primitive(PrimitiveType.PrimitiveTypeName.INT64, OPTIONAL)
1006+
.as(annotation)
1007+
.named("test")
1008+
.named("ts_micros");
10041009
ContiguousSet<Long> longValues = longsBetween(1_000_000, 1_001_000);
10051010
ImmutableList.Builder<SqlTimestamp> expectedValues = new ImmutableList.Builder<>();
10061011
for (Long value : longValues) {

presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java

+6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.facebook.presto.common.type.RowType;
3131
import com.facebook.presto.common.type.TimeType;
3232
import com.facebook.presto.common.type.TimestampType;
33+
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
3334
import com.facebook.presto.common.type.Type;
3435
import com.facebook.presto.common.type.UuidType;
3536
import com.facebook.presto.common.type.VarbinaryType;
@@ -47,6 +48,7 @@
4748
import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE;
4849
import static com.facebook.presto.common.predicate.Marker.Bound.BELOW;
4950
import static com.facebook.presto.common.predicate.Marker.Bound.EXACTLY;
51+
import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
5052
import static com.facebook.presto.iceberg.IcebergColumnHandle.getPushedDownSubfield;
5153
import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield;
5254
import static com.facebook.presto.parquet.ParquetTypeUtils.columnPathFromSubfield;
@@ -203,6 +205,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
203205
return MILLISECONDS.toMicros((Long) marker.getValue());
204206
}
205207

208+
if (type instanceof TimestampWithTimeZoneType) {
209+
return MILLISECONDS.toMicros(unpackMillisUtc((Long) marker.getValue()));
210+
}
211+
206212
if (type instanceof VarcharType) {
207213
return ((Slice) marker.getValue()).toStringUtf8();
208214
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

-7
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@
154154
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
155155
import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema;
156156
import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode;
157-
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
158157
import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
159158
import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm;
160159
import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields;
@@ -693,10 +692,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
693692

694693
Type columnType = toIcebergType(column.getType());
695694

696-
if (columnType.equals(Types.TimestampType.withZone())) {
697-
throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", columnType));
698-
}
699-
700695
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
701696
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns added");
702697
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
@@ -754,8 +749,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
754749
Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
755750
validateTableMode(session, icebergTable);
756751

757-
verifyTypeSupported(icebergTable.schema());
758-
759752
return beginIcebergTableInsert(session, table, icebergTable);
760753
}
761754

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

-3
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@
124124
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
125125
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
126126
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
127-
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
128127
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
129128
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
130129
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -307,8 +306,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
307306

308307
Schema schema = toIcebergSchema(tableMetadata.getColumns());
309308

310-
verifyTypeSupported(schema);
311-
312309
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
313310

314311
MetastoreContext metastoreContext = getMetastoreContext(session);

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java

-3
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
6868
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
6969
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
70-
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
7170
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
7271
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
7372
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -308,8 +307,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
308307

309308
Schema schema = toIcebergSchema(tableMetadata.getColumns());
310309

311-
verifyTypeSupported(schema);
312-
313310
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
314311
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
315312

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

-7
Original file line numberDiff line numberDiff line change
@@ -457,13 +457,6 @@ public static void validateTableMode(ConnectorSession session, org.apache.iceber
457457
}
458458
}
459459

460-
public static void verifyTypeSupported(Schema schema)
461-
{
462-
if (schema.columns().stream().anyMatch(column -> Types.TimestampType.withZone().equals(column.type()))) {
463-
throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", Types.TimestampType.withZone()));
464-
}
465-
}
466-
467460
public static Map<String, String> createIcebergViewProperties(ConnectorSession session, String prestoVersion)
468461
{
469462
return ImmutableMap.<String, String>builder()

presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java

+5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static com.facebook.presto.common.type.RealType.REAL;
6262
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
6363
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
64+
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
6465
import static com.facebook.presto.common.type.TinyintType.TINYINT;
6566
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
6667
import static com.facebook.presto.hive.HiveType.HIVE_BINARY;
@@ -118,6 +119,10 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager
118119
case TIME:
119120
return TimeType.TIME;
120121
case TIMESTAMP:
122+
Types.TimestampType timestampType = (Types.TimestampType) type.asPrimitiveType();
123+
if (timestampType.shouldAdjustToUTC()) {
124+
return TIMESTAMP_WITH_TIME_ZONE;
125+
}
121126
return TimestampType.TIMESTAMP;
122127
case STRING:
123128
return VarcharType.createUnboundedVarcharType();

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,11 @@ public void testTimestamp()
8989
@Test
9090
public void testTimestampWithTimeZone()
9191
{
92-
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)", "Iceberg column type timestamptz is not supported");
93-
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Iceberg column type timestamptz is not supported");
94-
assertUpdate("CREATE TABLE test_timestamp_with_timezone (x timestamp)");
95-
assertQueryFails("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone", "Iceberg column type timestamptz is not supported");
92+
assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'");
93+
assertQuerySucceeds("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone");
9694
dropTable(getSession(), "test_timestamp_with_timezone");
95+
96+
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) WITH ( format = 'ORC') AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Unsupported Type: timestamp with time zone");
9797
}
9898

9999
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.type.TimestampType;
18+
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
19+
import com.facebook.presto.common.type.Type;
20+
import com.facebook.presto.testing.MaterializedResult;
21+
import com.facebook.presto.testing.MaterializedRow;
22+
import com.facebook.presto.testing.QueryRunner;
23+
import com.facebook.presto.tests.AbstractTestQueryFramework;
24+
import com.google.common.collect.ImmutableMap;
25+
import org.testng.annotations.DataProvider;
26+
import org.testng.annotations.Test;
27+
28+
import java.util.List;
29+
30+
import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
31+
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
32+
import static org.testng.Assert.assertEquals;
33+
import static org.testng.Assert.assertTrue;
34+
35+
public class TestIcebergTypes
36+
extends AbstractTestQueryFramework
37+
{
38+
protected QueryRunner createQueryRunner() throws Exception
39+
{
40+
return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of());
41+
}
42+
43+
@DataProvider(name = "testTimestampWithTimezone")
44+
public Object[][] createTestTimestampWithTimezoneData()
45+
{
46+
return new Object[][] {
47+
{Session.builder(getSession())
48+
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true")
49+
.build()},
50+
{Session.builder(getSession())
51+
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false")
52+
.build()}
53+
};
54+
}
55+
56+
@Test(dataProvider = "testTimestampWithTimezone")
57+
public void testTimestampWithTimezone(Session session)
58+
{
59+
QueryRunner runner = getQueryRunner();
60+
String timestamptz = "TIMESTAMP '1984-12-08 00:10:00 America/Los_Angeles'";
61+
String timestamp = "TIMESTAMP '1984-12-08 00:10:00'";
62+
63+
dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz");
64+
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)");
65+
66+
String row = "(" + timestamptz + ", " + timestamp + ", " + timestamptz + ")";
67+
for (int i = 0; i < 10; i++) {
68+
assertUpdate(session, "INSERT INTO test_timestamptz values " + row, 1);
69+
}
70+
71+
MaterializedResult initialRows = runner.execute(session, "SELECT * FROM test_timestamptz");
72+
73+
List<Type> types = initialRows.getTypes();
74+
assertTrue(types.get(0) instanceof TimestampWithTimeZoneType);
75+
assertTrue(types.get(1) instanceof TimestampType);
76+
77+
List<MaterializedRow> rows = initialRows.getMaterializedRows();
78+
for (int i = 0; i < 10; i++) {
79+
assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString());
80+
}
81+
82+
dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_partition");
83+
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_partition(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE) " +
84+
"WITH (PARTITIONING = ARRAY['b'])");
85+
assertUpdate(session, "INSERT INTO test_timestamptz_partition (a, b, c) SELECT a, b, c FROM test_timestamptz", 10);
86+
87+
MaterializedResult partitionRows = runner.execute(session, "SELECT * FROM test_timestamptz");
88+
89+
List<Type> partitionTypes = partitionRows.getTypes();
90+
assertTrue(partitionTypes.get(0) instanceof TimestampWithTimeZoneType);
91+
assertTrue(partitionTypes.get(1) instanceof TimestampType);
92+
93+
rows = partitionRows.getMaterializedRows();
94+
for (int i = 0; i < 10; i++) {
95+
assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString());
96+
}
97+
98+
String earlyTimestamptz = "TIMESTAMP '1980-12-08 00:10:00 America/Los_Angeles'";
99+
dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_filter");
100+
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)");
101+
102+
for (int i = 0; i < 5; i++) {
103+
assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")", 1);
104+
}
105+
for (int i = 0; i < 5; i++) {
106+
assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + timestamptz + ")", 1);
107+
}
108+
109+
MaterializedResult lateRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a > " + earlyTimestamptz);
110+
assertEquals(lateRows.getMaterializedRows().size(), 5);
111+
112+
MaterializedResult lateRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + timestamptz);
113+
com.facebook.presto.testing.assertions.Assert.assertEquals(lateRows, lateRowsFromEquals);
114+
115+
MaterializedResult earlyRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a < " + timestamptz);
116+
assertEquals(earlyRows.getMaterializedRows().size(), 5);
117+
118+
MaterializedResult earlyRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + earlyTimestamptz);
119+
com.facebook.presto.testing.assertions.Assert.assertEquals(earlyRows, earlyRowsFromEquals);
120+
}
121+
}

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestNestedFieldConverter.java

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
3939
import static com.facebook.presto.common.type.IntegerType.INTEGER;
4040
import static com.facebook.presto.common.type.RealType.REAL;
41+
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
42+
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
4143
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
4244
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
4345
import static com.facebook.presto.iceberg.NestedFieldConverter.toIcebergNestedField;
@@ -176,6 +178,12 @@ protected static PrestoIcebergNestedField prestoIcebergNestedField(
176178
case "date":
177179
prestoType = DATE;
178180
break;
181+
case "timestamp":
182+
prestoType = TIMESTAMP;
183+
break;
184+
case "timestamptz":
185+
prestoType = TIMESTAMP_WITH_TIME_ZONE;
186+
break;
179187
case "nested":
180188
prestoType = RowType.from(ImmutableList.of(
181189
RowType.field("int", INTEGER),
@@ -239,6 +247,12 @@ protected static Types.NestedField nestedField(int id, String name)
239247
case "date":
240248
icebergType = Types.DateType.get();
241249
break;
250+
case "timestamp":
251+
icebergType = Types.TimestampType.withoutZone();
252+
break;
253+
case "timestamptz":
254+
icebergType = Types.TimestampType.withZone();
255+
break;
242256
case "nested":
243257
icebergType = nested();
244258
break;

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSchemaConverter.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ protected static PrestoIcebergSchema prestoIcebergSchema(TypeManager typeManager
9292
prestoIcebergNestedField(9, "varchar", typeManager),
9393
prestoIcebergNestedField(10, "varbinary", typeManager),
9494
prestoIcebergNestedField(11, "row", typeManager),
95-
prestoIcebergNestedField(12, "date", typeManager)));
95+
prestoIcebergNestedField(12, "date", typeManager),
96+
prestoIcebergNestedField(13, "timestamp", typeManager),
97+
prestoIcebergNestedField(14, "timestamptz", typeManager)));
9698

9799
Map<String, Integer> columnNameToIdMapping = getColumnNameToIdMapping();
98100

@@ -114,11 +116,13 @@ private static Map<String, Integer> getColumnNameToIdMapping()
114116
columnNameToIdMapping.put("varbinary", 10);
115117
columnNameToIdMapping.put("row", 11);
116118
columnNameToIdMapping.put("date", 12);
117-
columnNameToIdMapping.put("array.element", 13);
118-
columnNameToIdMapping.put("map.key", 14);
119-
columnNameToIdMapping.put("map.value", 15);
120-
columnNameToIdMapping.put("row.int", 16);
121-
columnNameToIdMapping.put("row.varchar", 17);
119+
columnNameToIdMapping.put("timestamp", 13);
120+
columnNameToIdMapping.put("timestamptz", 14);
121+
columnNameToIdMapping.put("array.element", 15);
122+
columnNameToIdMapping.put("map.key", 16);
123+
columnNameToIdMapping.put("map.value", 17);
124+
columnNameToIdMapping.put("row.int", 18);
125+
columnNameToIdMapping.put("row.varchar", 19);
122126

123127
return columnNameToIdMapping;
124128
}
@@ -137,7 +141,9 @@ protected static Schema schema()
137141
nestedField(9, "varchar"),
138142
nestedField(10, "varbinary"),
139143
nestedField(11, "row"),
140-
nestedField(12, "date")));
144+
nestedField(12, "date"),
145+
nestedField(13, "timestamp"),
146+
nestedField(14, "timestamptz")));
141147

142148
Type schemaAsStruct = Types.StructType.of(fields);
143149
AtomicInteger nextFieldId = new AtomicInteger(1);

0 commit comments

Comments
 (0)