
int96 support in parquet #1138

Closed
thesquelched opened this issue Jun 25, 2020 · 23 comments · Fixed by #1184 or #1323

Comments

@thesquelched

When writing parquet, Spark wants to write timestamps as int96 for compatibility with hive/presto/impala/etc. Iceberg writes timestamps as the parquet timestamp logical type over int64. While this is less than ideal, the real problem is that int96 data is not supported at all, making it impossible to use iceberg with existing parquet data files without first rewriting the data.

Ideally, iceberg would add support for int96-as-timestamp, similar to how spark handles them. However, at minimum, iceberg should be able to understand the int96 type.

@gustavoatt
Contributor

It seems like the current spec does require timestamps to be stored as epoch microseconds in an int64.

However I do think that supporting int96 timestamps on the read path would be really handy when migrating tables to iceberg to avoid having to rewrite data.

@rdblue
Contributor

rdblue commented Jun 27, 2020

To be clear, when you use Iceberg with Spark, Iceberg's timestamp type works just fine. So this is only necessary when working with other systems. And I believe Spark can already write the INT64 timestamps that Iceberg uses.

Iceberg is strict about the files that it writes and uses the timestamps specified in Parquet. The INT96 timestamps have a ton of problems, so it's really unlikely that it would ever make sense for Iceberg to write them.

Iceberg could read data files with those timestamps, but they would have to be existing data files imported by converting a Parquet/Hive table over to Iceberg. I guess if someone needed to implement that, we'd review it.

@gustavoatt
Contributor

Thanks @rdblue! We would most likely need this to migrate some of our biggest tables to Iceberg since we were writing timestamps using int96.

I can take this on as only read-support, do you have any pointers where the change would need to be made? I'm looking at /parquet/ColumnIterator and /parquet/PageIterator since that is where we have parquet read support.

@aokolnychyi
Contributor

aokolnychyi commented Jun 29, 2020

Is the INT96 timestamp representation in query engines like Hive, Impala, Spark identical? In other words, will we have to understand which system wrote that INT96 timestamp in order to read it in Iceberg? I remember different query engines did different workarounds.

@rdblue
Contributor

rdblue commented Jun 29, 2020

Off the top of my head, we'd need to support:

  1. A value reader for INT96 that converts values to micros
  2. Type conversion from Parquet schemas to Iceberg (timestamp or timestamptz?)

We would not need to modify any of the Parquet filter classes because INT96 timestamps have broken min/max stats. We would probably want to ignore them entirely in the Parquet metrics code as well.
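For reference, here is a minimal sketch of that conversion (illustrative only, not Iceberg's actual value reader; the class and method names are made up). An INT96 timestamp is 12 little-endian bytes: 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative helper only; not Iceberg's actual reader class.
class Int96ToMicros {
  private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L; // Julian day of 1970-01-01
  private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

  // An INT96 value is 12 little-endian bytes: 8 bytes of nanoseconds within
  // the day, followed by a 4-byte Julian day number.
  static long toMicros(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong();
    long julianDay = Integer.toUnsignedLong(buf.getInt());
    return (julianDay - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + nanosOfDay / 1_000L;
  }
}

Julian day 2440588 with zero nanoseconds of day decodes to 0 microseconds, i.e. the Unix epoch (1970-01-01T00:00:00Z).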

@rdblue
Contributor

rdblue commented Jun 30, 2020

Also, we should find out more about differences between engines so we can document how our implementation works. I think there were some bugs there. Thanks for pointing it out, @aokolnychyi.

@gustavoatt
Contributor

Here is the spark code that writes int96 timestamps.

There is some discussion on SPARK-12297 about making Spark int96 timestamps work correctly with Hive & Impala.

@gustavoatt
Contributor

Sent a PR to add int96 timestamp support in #1184. I was able to verify the behavior for spark.

I think Hive writes them in the exact same way, but I would need to verify that to be sure.

@thesquelched
Author

I think there's a little bit more work to be done. I get the following when using org.apache.iceberg.parquet.ParquetUtil.fileMetrics:

java.lang.UnsupportedOperationException: Cannot convert unknown primitive type: optional int96 created_at
  at org.apache.iceberg.parquet.MessageTypeToType.primitive(MessageTypeToType.java:163)
  at org.apache.iceberg.parquet.MessageTypeToType.primitive(MessageTypeToType.java:41)
  at org.apache.iceberg.parquet.ParquetTypeVisitor.visit(ParquetTypeVisitor.java:41)
  at org.apache.iceberg.parquet.ParquetTypeVisitor.visitFields(ParquetTypeVisitor.java:168)
  at org.apache.iceberg.parquet.ParquetTypeVisitor.visit(ParquetTypeVisitor.java:38)
  at org.apache.iceberg.parquet.ParquetSchemaUtil.convert(ParquetSchemaUtil.java:47)
  at org.apache.iceberg.parquet.ParquetUtil.footerMetrics(ParquetUtil.java:90)
  at org.apache.iceberg.parquet.ParquetUtil.fileMetrics(ParquetUtil.java:74)
  at getMetrics(<console>:43)
  ... 53 elided
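
The failure above is in the Parquet-to-Iceberg schema conversion, which has no case for INT96. A rough sketch of the missing mapping, assuming INT96 columns are treated as timestamptz (the surrounding visitor is heavily simplified; only the INT96 branch is the point here):

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

// Simplified sketch; the real conversion lives in Iceberg's Parquet type visitor
// and handles many more primitive and logical types.
class Int96SchemaMapping {
  static Type toIcebergType(PrimitiveType primitive) {
    if (primitive.getPrimitiveTypeName() == PrimitiveTypeName.INT96) {
      // Treat legacy INT96 columns as timestamps with time zone.
      return Types.TimestampType.withZone();
    }
    throw new UnsupportedOperationException("Cannot convert unknown primitive type: " + primitive);
  }
}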

@aokolnychyi
Contributor

I think we will need a test where we create a Parquet table with int96 timestamps, migrate it to Iceberg, and read it back to make sure this works end-to-end.

@gustavoatt
Contributor

Agreed! I'll add a test for that and make sure that, end to end, iceberg-parquet can read tables with int96 timestamps.

The current tests only verify that we can read the parquet file, not whether it can be read once it is migrated to Iceberg.

@aokolnychyi
Contributor

We will have to be careful with stats as well. I think Parquet does not keep stats for int96, though.

@rdblue
Contributor

rdblue commented Aug 7, 2020

Yeah, looks like a problem with import. @aokolnychyi is right that stats for int96 columns are unreliable. That makes the fix easy: we just ignore int96 columns in the import path.
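A sketch of how that could look when inspecting the footer schema during import, using only the Parquet schema API (the class and method names here are illustrative, not Iceberg's actual metrics code):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;

// Illustrative only: collect the names of top-level fields stored as INT96 so
// their unreliable min/max stats can be dropped when building Iceberg metrics.
class Int96MetricsFilter {
  static List<String> int96FieldNames(MessageType schema) {
    return schema.getFields().stream()
        .filter(Type::isPrimitive)
        .filter(field -> field.asPrimitiveType().getPrimitiveTypeName() == PrimitiveTypeName.INT96)
        .map(Type::getName)
        .collect(Collectors.toList());
  }
}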

@aokolnychyi
Contributor

@gustavoatt @rdblue quick question on int96 representation in different query engines.

Here is what I see in Spark:

  val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp")
    .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
      "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
      "provide compatibility with these systems.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
    .doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
      "converting to timestamps, for data written by Impala.  This is necessary because Impala " +
      "stores INT96 data with a different timezone offset than Hive & Spark.")
    .booleanConf
    .createWithDefault(false)

I remember there were changes/plans on Impala side. Does anybody know the current state?

@rdblue
Contributor

rdblue commented Aug 17, 2020

I think that the first option, int96 as timestamp, isn't very useful. I'm not aware of any system that writes int96 data that is not a timestamp. If such a system exists, we can figure out how to support it later.

For the second option, it appears that there are some adjustments made for tables written by other systems. I think that we can do better than a global runtime flag by adding a table property that supports an adjustment for int96 data. New timestamp data will necessarily be written using int64/microseconds and will not have the problem. But we could support an adjustment to add to all int96 values.

@aokolnychyi
Contributor

I think we already cover points 1 & 2, except potentially for some systems like Impala. My question is whether the current fallback scenario we have for INT96 will lead to incorrect results when reading Impala data. I tested Spark/Hive only and that works.

@rdblue
Contributor

rdblue commented Aug 17, 2020

Currently, there is no adjustment to int96 timestamp values. And I believe that any adjustment Spark makes is based on the current session time zone. If there needs to be an adjustment to imported int96 timestamps for Impala or any other writer that was incorrect, then I think it makes sense to add a static offset for all int96 values, assuming that all of them were written the same way.

I'm happy to not add this if no one needs it, but I think it is the remaining piece to solve any problems that might come up.
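
As a sketch of what that could look like (the table property name below is hypothetical; nothing was agreed on in this thread), a fixed offset in microseconds would simply be added after each INT96 value is decoded:

import java.util.Map;

// Hypothetical: apply a table-level static adjustment to decoded INT96 values.
// The property name "read.parquet.int96-offset-micros" is made up for illustration.
class Int96OffsetAdjustment {
  private final long offsetMicros;

  Int96OffsetAdjustment(Map<String, String> tableProperties) {
    this.offsetMicros = Long.parseLong(
        tableProperties.getOrDefault("read.parquet.int96-offset-micros", "0"));
  }

  long adjust(long micros) {
    return micros + offsetMicros;
  }
}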

@gustavoatt
Contributor

> Currently, there is no adjustment to int96 timestamp values. And I believe that any adjustment Spark makes is based on the current session time zone. If there needs to be an adjustment to imported int96 timestamps for Impala or any other writer that was incorrect, then I think it makes sense to add a static offset for all int96 values, assuming that all of them were written the same way.
>
> I'm happy to not add this if no one needs it, but I think it is the remaining piece to solve any problems that might come up.

Yes, Spark makes the adjustment based on the current session timezone:

// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file.  We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
  footerFileMetaData.getCreatedBy().startsWith("parquet-mr")

val convertTz =
  if (timestampConversion && !isCreatedByParquetMr) {
    Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
  } else {
    None
  }

On our end we don't really need to add this offset, since we only have to deal with int96 written by either Spark or Hive.

@Ambarish-29

Ambarish-29 commented Mar 12, 2025

@rdblue Can we write timestamp fields as int96 instead of int64 from Iceberg? Is there an option like that? When we write as int64 from Iceberg, Hive displays the time in UTC, and I can't insert data from Impala.

Previously, in my Hive table, the timestamps were stored as int96, and every engine interpreted the timestamp correctly and displayed the local time zone. Now, when I use timestamp with timezone in Iceberg, Spark and Impala show the local time correctly, while Hive shows UTC with a Z suffix, and I also can't insert data from Impala.

If I go with timestamp without timezone, I can write from all engines, but in Impala and Hive the time is displayed in UTC.

@Fokko
Contributor

Fokko commented Mar 12, 2025

The physical types in Parquet are defined by the spec, and I would oppose writing something that's not in line with the specification. INT96 was marked as deprecated a while ago, and engines like Spark are also moving away from INT96.

I'm surprised that Impala isn't able to insert any data, cc @gaborkaszab

@Ambarish-29

@Fokko @gaborkaszab

I am getting an error when trying to insert data from Impala into an Iceberg table that has a timestamptz (timestamp with time zone) column.

The error is AnalysisException: The Iceberg table has a TimestampZ column that impala cannot write.

@Ambarish-29

There is a Cloudera documentation regarding this impala issue.

Doc: https://docs.cloudera.com/runtime/7.3.1/iceberg-how-to/topics/iceberg-data-types.html#:~:text=Impala%20is%20unable%20to%20write%20to%20Iceberg%20tables,all%20the%20engines%20must%20be%20running%20in%20UTC.

Cloudera mentions that TimestampZ is a read-only type and that we cannot write it from Impala.

@gaborkaszab
Collaborator

> I am getting an error when trying to insert data from Impala into an Iceberg table that has a timestamptz (timestamp with time zone) column.

Impala doesn't support timestamp with timezone type currently, unfortunately. There are discussions to add support, but I'm not aware of much progress on this.

Weird to see Cloudera docs on an Iceberg github conversation :)
