[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps #16781
VectorizedColumnReader.java

@@ -18,7 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;

@@ -28,6 +30,7 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -89,11 +92,23 @@ public class VectorizedColumnReader {

private final PageReader pageReader;
private final ColumnDescriptor descriptor;
private final TimeZone storageTz;
private final TimeZone localTz = TimeZone.getDefault();

public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
    Configuration conf)
    throws IOException {
  this.descriptor = descriptor;
  this.pageReader = pageReader;
  // If the table has a timezone property, apply the correct conversions. See SPARK-12297.
  // The conf is sometimes null in tests.
  String tzString =
      conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
  if (tzString == null || tzString == "") {

Review comment: This is Java code, not Scala; you probably meant tzString.equals("") instead of tzString == "".

Review comment: Or even better, isEmpty().

    storageTz = localTz;
  } else {
    storageTz = TimeZone.getTimeZone(tzString);
  }
  this.maxDefLevel = descriptor.getMaxDefinitionLevel();

  DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
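The two review comments above point at a real bug: in Java, `==` on strings compares references rather than contents, so an empty property value read from the Configuration would not reliably be detected. A minimal sketch of the suggested fix, assuming a hypothetical `resolveStorageTz` helper used only for illustration (not part of the patch):

```java
import java.util.TimeZone;

// Hedged sketch of the reviewers' suggestion, not the patch itself.
final class StorageTzSketch {
  static TimeZone resolveStorageTz(String tzString, TimeZone localTz) {
    // Use isEmpty() (or equals("")) so the check compares string contents, not references.
    if (tzString == null || tzString.isEmpty()) {
      return localTz;
    }
    return TimeZone.getTimeZone(tzString);
  }
}
```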
@@ -276,7 +291,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
  // TODO: Convert dictionary of Binaries to dictionary of Longs
  if (!column.isNullAt(i)) {
    Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
    column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
    column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, storageTz, localTz));
  }
}
} else {

@@ -401,7 +416,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
  if (defColumn.readInteger() == maxDefLevel) {
    column.putLong(rowId + i,
      // Read 12 bytes for INT96
      ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
      ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), storageTz, localTz));
  } else {
    column.putNull(rowId + i);
  }
ParquetRowConverter.scala

@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
import java.util.TimeZone

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, Type}
@@ -117,12 +119,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater)
 * @param parquetType Parquet schema of Parquet records
 * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
 *                     types should have been expanded.
 * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion
 * @param updater An updater which propagates converted field values to the parent container
 */
private[parquet] class ParquetRowConverter(
    schemaConverter: ParquetSchemaConverter,
    parquetType: GroupType,
    catalystType: StructType,
    hadoopConf: Configuration,
    updater: ParentContainerUpdater)
  extends ParquetGroupConverter(updater) with Logging {
@@ -254,18 +258,20 @@ private[parquet] class ParquetRowConverter(

case TimestampType =>
  // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
  // If the table has a timezone property, apply the correct conversions. See SPARK-12297.
  val tzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
  val localTz = TimeZone.getDefault()
  val storageTz = if (tzString == null) {
    localTz
  } else {
    TimeZone.getTimeZone(tzString)
  }
  new ParquetPrimitiveConverter(updater) {
    // Converts nanosecond timestamps stored as INT96
    override def addBinary(value: Binary): Unit = {
      assert(
        value.length() == 12,
        "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
          s"but got a ${value.length()}-byte binary.")

      val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
      val timeOfDayNanos = buf.getLong
      val julianDay = buf.getInt
      updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
      val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, storageTz = storageTz,
        localTz = localTz)
      updater.setLong(timestamp)
    }
  }
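For reference, the INT96 value decoded here is 12 bytes, little-endian: an 8-byte nanoseconds-of-day field followed by a 4-byte Julian day, which is what DateTimeUtils.fromJulianDay turns into microseconds since the epoch. A self-contained sketch of that decoding (class and constant names below are illustrative only, not Spark's implementation):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative decoder for a 12-byte Parquet INT96 timestamp.
final class Int96Sketch {
  private static final long JULIAN_DAY_OF_EPOCH = 2440588L;       // Julian day of 1970-01-01
  private static final long MICROS_PER_DAY = 24L * 60 * 60 * 1_000_000L;

  /** Returns microseconds since the Unix epoch for a 12-byte INT96 value. */
  static long int96ToMicros(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong();  // first 8 bytes: nanos within the day
    int julianDay = buf.getInt();         // last 4 bytes: Julian day number
    return (julianDay - JULIAN_DAY_OF_EPOCH) * MICROS_PER_DAY + timeOfDayNanos / 1_000L;
  }
}
```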
@@ -295,7 +301,7 @@ private[parquet] class ParquetRowConverter(

case t: StructType =>
  new ParquetRowConverter(
    schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
    schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater {
      override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
    })
@@ -644,6 +650,7 @@ private[parquet] class ParquetRowConverter(
}

private[parquet] object ParquetRowConverter {

  def binaryToUnscaledLong(binary: Binary): Long = {
    // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
    // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
@@ -666,12 +673,26 @@ private[parquet] object ParquetRowConverter {
  unscaled
}

def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
/**
 * Converts an int96 to a SQLTimestamp, given both the storage timezone and the local timezone.
 * The timestamp is really meant to be interpreted as a "floating time", but since we
 * actually store it as micros since epoch, we have to apply a conversion when the
 * timezones change.
 * @param binary
 * @return
 */

Review comment: Can you also add descriptions for storageTz and localTz?

Review comment: Can you add …

def binaryToSQLTimestamp(binary: Binary, storageTz: TimeZone, localTz: TimeZone): SQLTimestamp = {
  assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
    s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
  val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buffer.getLong
  val julianDay = buffer.getInt
  DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
  val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
  // avoid expensive time logic if possible.
  if (storageTz.getID() != localTz.getID()) {
    DateTimeUtils.convertTz(utcEpochMicros, storageTz, localTz)
  } else {
    utcEpochMicros
  }
}
}
SQLConf.scala

@@ -674,6 +674,12 @@ object SQLConf {
  .stringConf
  .createWithDefault(TimeZone.getDefault().getID())

val PARQUET_TABLE_INCLUDE_TIMEZONE =
  buildConf("spark.sql.session.parquet.timeZone")
    .doc("""Enables inclusion of parquet timezone property in newly created parquet tables""")

Review comment: There should be a config option for writing "UTC" to the table property when creating tables, not for writing the local timezone.

    .booleanConf
    .createWithDefault(false)

object Deprecated {
  val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
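As a hedged usage sketch only: the flag name comes from the diff and is still under discussion in the comment above, and the table property it would cause to be written is whatever ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY names (not spelled out here). Enabling the proposed flag and creating a Parquet table might look like:

```java
import org.apache.spark.sql.SparkSession;

// Hypothetical usage of the proposed flag; everything except the conf key is illustrative.
public class CreateTableWithTzProperty {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("parquet-tz-sketch")
        .enableHiveSupport()
        .getOrCreate();
    // Ask Spark to record a timezone table property on newly created Parquet tables.
    spark.conf().set("spark.sql.session.parquet.timeZone", "true");
    spark.sql("CREATE TABLE ts_table (ts TIMESTAMP) STORED AS PARQUET");
    spark.stop();
  }
}
```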
Review comment: Should be case insensitive?

Review comment: Java does a case-sensitive check, which means Hive does too. I don't think we want to write out a timezone with the wrong capitalization and then have another tool throw an error.
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java#L167
We could try to auto-convert the user's timezone to the correct capitalization, but do you think that is worth it?

Review comment: Ah, I see. Let's keep current work.
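The case-sensitivity concern is easy to demonstrate: java.util.TimeZone looks IDs up case-sensitively, and an unrecognized ID silently falls back to GMT, so writing a non-canonical ID into the table property could quietly break other readers:

```java
import java.util.TimeZone;

public class TimeZoneCaseDemo {
  public static void main(String[] args) {
    // Canonical ID resolves as expected.
    System.out.println(TimeZone.getTimeZone("America/Los_Angeles").getID()); // America/Los_Angeles
    // Wrong capitalization is treated as an unknown ID and falls back to GMT.
    System.out.println(TimeZone.getTimeZone("america/los_angeles").getID()); // GMT
  }
}
```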