[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based on ORC 1.4.1 #19651
Conversation
Test build #83382 has finished for PR 19651 at commit
retest this please
Test build #83407 has started for PR 19651 at commit
Thank you, @HyukjinKwon!
Test build #83431 has finished for PR 19651 at commit
Retest this please
Test build #83433 has finished for PR 19651 at commit
Hi, @cloud-fan and @gatorsmile.
object OrcUtils extends Logging {
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
This is moved from OrcFileOperator in sql/hive.
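For context, here is the moved helper stitched together from the diff fragments quoted elsewhere in this conversation; treat it as a sketch rather than the exact body of the PR. SparkHadoopUtil is Spark-internal, so this only compiles inside Spark's own source tree (hence the package declaration).

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

object OrcFileListingSketch {
  // Sketch: list ORC data files under a path, skipping directories and
  // hidden/metadata files (names starting with "_" or ".").
  def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
    val origPath = new Path(pathStr)
    val fs = origPath.getFileSystem(conf)
    val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
      .filterNot(_.isDirectory)
      .map(_.getPath)
      .filterNot(_.getName.startsWith("_"))
      .filterNot(_.getName.startsWith("."))
    paths
  }
}
```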
@@ -67,4 +67,11 @@ object OrcOptions {
  "snappy" -> "SNAPPY",
  "zlib" -> "ZLIB",
  "lzo" -> "LZO")
// The extensions for ORC compression codecs
val extensionsForCompressionCodecNames = Map(
This is moved from object ORCFileFormat in sql/hive.
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))
paths
def setRequiredColumns(
This is moved from object ORCFileFormat inside sql/hive.
private[orc] def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
    : Option[StructType] = {
  val conf = sparkSession.sparkContext.hadoopConfiguration
sparkSession.sessionState.newHadoopConf
Sure!
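A minimal sketch of the suggested change, assuming the code sits inside Spark's sql package (SparkSession.sessionState is package-private there): take the Hadoop configuration from the session state, which layers per-session SQL options on top of the SparkContext-wide configuration. The object name is hypothetical, used only for this sketch.

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

private[sql] object OrcConfHelper {
  // The reviewer's suggestion: derive the Hadoop conf from the session state,
  // which applies per-session SQL options, instead of using
  // sparkSession.sparkContext.hadoopConfiguration directly.
  def hadoopConf(sparkSession: SparkSession): Configuration =
    sparkSession.sessionState.newHadoopConf()
}
```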
private[orc] def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
    : Option[StructType] = {
  val conf = sparkSession.sparkContext.hadoopConfiguration
  files.map(_.getPath).flatMap(readSchema(_, conf)).headOption.map { schema =>
shouldn't we do schema merging?
Later, I will implement schema merging in a parallel manner like Parquet.
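To make the trade-off concrete, here is a toy illustration (not Spark's actual merging code) of the difference between the "first file wins" headOption approach in the diff above and a merged schema that unions fields across files:

```scala
import org.apache.spark.sql.types.StructType

object SchemaMergeSketch {
  // "First file wins": the approach in the diff above.
  def firstSchema(schemas: Seq[StructType]): Option[StructType] = schemas.headOption

  // Toy merge: union fields by name, keeping the first-seen type. Spark's real
  // schema merging (as done for Parquet) also reconciles types and nullability.
  def naiveMerge(schemas: Seq[StructType]): Option[StructType] =
    schemas.reduceOption { (left, right) =>
      val known = left.fieldNames.toSet
      StructType(left.fields ++ right.fields.filterNot(f => known(f.name)))
    }
}
```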
true
}
override def buildReaderWithPartitionValues(
we should override buildReader and return GenericInternalRow here. Then the parent class will merge the partition values and output UnsafeRow. This is what the current OrcFileFormat does, so let's keep that behavior for now.
Yep, I see. It was because I preferred to be consistent with ParquetFileFormat here.
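A self-contained sketch of the division of labor the reviewer describes, deliberately simplified (this is not Spark's real FileFormat trait): the subclass's buildReader yields data-only rows, and the parent's buildReaderWithPartitionValues appends the partition values to each row before handing them back.

```scala
object FileFormatSketch {
  type Row = Seq[Any] // stand-in for InternalRow in this toy model

  trait FileFormat {
    // Subclasses produce rows containing only the data columns of one file.
    def buildReader(file: String): Iterator[Row]

    // The parent appends partition values to every data row, the way the real
    // FileFormat.buildReaderWithPartitionValues does for OrcFileFormat.
    def buildReaderWithPartitionValues(file: String, partitionValues: Row): Iterator[Row] =
      buildReader(file).map(_ ++ partitionValues)
  }

  // A toy ORC-like format that only implements the data-reading half.
  object ToyOrcFormat extends FileFormat {
    def buildReader(file: String): Iterator[Row] =
      Iterator(Seq(1, "a"), Seq(2, "b")) // pretend these rows were read from `file`
  }

  def main(args: Array[String]): Unit =
    ToyOrcFormat.buildReaderWithPartitionValues("part=2017/file.orc", Seq("2017"))
      .foreach(println)
}
```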
val convertibleFilters = for {
  filter <- filters
  _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
why call this function inside a loop? Can we put it at the beginning?
This is a two-step approach which validates that each individual filter is convertible. I'll add back the comment from SPARK-12218.
@@ -67,4 +67,11 @@ object OrcOptions {
  "snappy" -> "SNAPPY",
  "zlib" -> "ZLIB",
  "lzo" -> "LZO")
// The extensions for ORC compression codecs
val extensionsForCompressionCodecNames = Map(
this doesn't belong to OrcOptions, maybe OrcUtils?
It's moved to OrcUtils.
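For reference, a sketch of how such a mapping might look once it lives in OrcUtils; the codec-to-extension entries here are assumptions based on the ORC codecs listed in the diff above, not a copy of the PR.

```scala
object OrcCompressionSketch {
  // Hypothetical shape of the map after the move to OrcUtils: ORC compression
  // codec name -> file-name extension used for written .orc files.
  val extensionsForCompressionCodecNames: Map[String, String] = Map(
    "NONE" -> "",
    "SNAPPY" -> ".snappy",
    "ZLIB" -> ".zlib",
    "LZO" -> ".lzo")

  // Example: data files written with ZLIB would end in ".zlib.orc".
  def fileExtension(codec: String): String =
    extensionsForCompressionCodecNames.getOrElse(codec.toUpperCase, "") + ".orc"
}
```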
override def write(row: InternalRow): Unit = {
  recordWriter.write(
    NullWritable.get,
    OrcUtils.convertInternalRowToOrcStruct(
ideally we should make it into a function and use it in write, like the old OrcOutputWriter did.
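A structural sketch of what the reviewer is asking for; the class name and the serialize member are placeholders standing in for the factored-out conversion (whatever OrcUtils.convertInternalRowToOrcStruct does in the PR), not the PR's actual code.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.RecordWriter
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.catalyst.InternalRow

// Structural sketch only: write() no longer inlines the row-to-OrcStruct
// conversion; it delegates to a single serialize() helper.
abstract class OrcOutputWriterSketch {
  protected def recordWriter: RecordWriter[NullWritable, OrcStruct]
  protected def serialize(row: InternalRow): OrcStruct

  def write(row: InternalRow): Unit =
    recordWriter.write(NullWritable.get, serialize(row))
}
```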
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.types.StructType
private[hive] object OrcFileOperator extends Logging {
shall we merge this class into OrcUtils?
OrcFileOperator defines functions that depend on Hive. We cannot merge these functions into sql/core.
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
* Convert Apache ORC OrcStruct to Apache Spark InternalRow.
* If internalRow is not None, fill into it. Otherwise, create a SpecificInternalRow and use it.
*/
private[orc] def convertOrcStructToInternalRow(
like the old ORC format, can we create an OrcSerializer to encapsulate this serialization logic?
Thanks. It's done.
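To illustrate the kind of encapsulation being discussed, here is a deliberately tiny serializer for a fixed struct&lt;id:int,name:string&gt; schema, built on ORC's mapred OrcStruct API. It is a sketch only; the PR's OrcSerializer handles all Catalyst types, nulls, and nested structures.

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// Toy serializer: owns the InternalRow -> OrcStruct conversion for one schema.
class ToyOrcSerializer(schema: TypeDescription) {
  def serialize(row: InternalRow): OrcStruct = {
    val struct = new OrcStruct(schema)
    struct.setFieldValue(0, new IntWritable(row.getInt(0)))
    struct.setFieldValue(1, new Text(row.getUTF8String(1).toString))
    struct
  }
}

object ToyOrcSerializer {
  def main(args: Array[String]): Unit = {
    val schema = TypeDescription.fromString("struct<id:int,name:string>")
    val serializer = new ToyOrcSerializer(schema)
    val struct = serializer.serialize(InternalRow(1, UTF8String.fromString("spark")))
    println(struct) // e.g. {1, spark}
  }
}
```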
Thank you so much for review, @cloud-fan . I'll try to update the PR tonight. |
@dongjoon-hyun, btw, if I understood correctly,
we don't necessarily remove the old (I said this because I'd like to keep the blame easy to track if possible). |
Right. @HyukjinKwon . I'll follow the final decision on this PR. |
Test build #83543 has finished for PR 19651 at commit
The PR is updated according to your advice. Thank you again, @cloud-fan!
Hi, @cloud-fan and @gatorsmile.
Retest this please.
Test build #83669 has finished for PR 19651 at commit
Retest this please.
private[this] val valueWrappers = requiredSchema.fields.map(f => getValueWrapper(f.dataType))
def deserialize(writable: OrcStruct): InternalRow = {
  convertOrcStructToInternalRow(writable, dataSchema, requiredSchema,
can you follow the code style in OrcFileFormat.unwrapOrcStructs? Basically create an unwrapper for each field, where an unwrapper is a (Any, InternalRow, Int) => Unit.
your implementation here doesn't consider boxing for primitive types at all.
We use a valueWrapper for each field here. Do you mean changing the name?
Your wrapper returns a value, while the old implementation's wrapper sets the value into the InternalRow, which avoids boxing.
Oh. I see.
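To make the boxing point concrete, here is a self-contained toy (not the PR's code): a value-returning wrapper must return Any, so a primitive result is boxed and stored boxed, whereas a setter-style unwrapper of type (Any, Row, Int) => Unit writes through a primitive setter and never re-boxes the converted value.

```scala
object UnwrapperSketch {
  // A toy mutable row with a primitive setter, standing in for Spark's InternalRow.
  final class ToyRow(n: Int) {
    private val ints = new Array[Int](n)
    def setInt(i: Int, v: Int): Unit = ints(i) = v
    def getInt(i: Int): Int = ints(i)
  }

  // Old style in this PR: a wrapper that *returns* the converted value.
  // Returning Any forces the primitive result to be boxed on every call.
  val intWrapper: Any => Any = raw => raw.asInstanceOf[java.lang.Integer].intValue()

  // Suggested style: an unwrapper that writes straight into the row,
  // so the Int goes through a primitive setter instead of being returned boxed.
  val intUnwrapper: (Any, ToyRow, Int) => Unit =
    (raw, row, ordinal) => row.setInt(ordinal, raw.asInstanceOf[java.lang.Integer].intValue())

  def main(args: Array[String]): Unit = {
    val row = new ToyRow(1)
    intUnwrapper(Integer.valueOf(42), row, 0)
    println(row.getInt(0)) // 42
  }
}
```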
val broadcastedConf =
  sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val resolver = sparkSession.sessionState.conf.resolver
nit: we can use sparkSession.sessionState.conf.isCaseSensitive here, as it's much cheaper than serializing a function.
Sure!
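A small sketch of the nit, with the driver-side accessor left as an assumption (whichever Boolean flag the reviewer refers to on SQLConf): capture the case-sensitivity setting as a plain Boolean before building the reader closure, instead of closing over and serializing the Resolver function.

```scala
object CaseSensitivitySketch {
  // Driver side (assumption): read the flag once from the SQL conf, e.g.
  // val caseSensitive: Boolean = sparkSession.sessionState.conf.caseSensitiveAnalysis

  // Executor side: a cheap Boolean captured in the closure replaces the Resolver.
  def columnMatches(caseSensitive: Boolean)(required: String, physical: String): Boolean =
    if (caseSensitive) required == physical else required.equalsIgnoreCase(physical)

  def main(args: Array[String]): Unit = {
    println(columnMatches(caseSensitive = false)("ID", "id")) // true
    println(columnMatches(caseSensitive = true)("ID", "id"))  // false
  }
}
```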
for {
  conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
  builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
do you mean, even if each individual filter is convertible, the final filter (combining filters by And) may be un-convertible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your previous question was about line 40.
why call this function inside a loop? Can we put it at the beginning?
+ val convertibleFilters = for {
+ filter <- filters
+ _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
Here, it seems you are asking a different question.
ah you are just following the previous code:
// First, tries to convert each filter individually to see whether it's convertible, and then
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
  filter <- filters
  _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
} yield filter

for {
  // Combines all convertible filters using `And` to produce a single conjunction
  conjunction <- convertibleFilters.reduceOption(And)
  // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
  builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
} yield builder.build()
can you add back those comments?
Sure!
Thank you so much, @cloud-fan.
* builder methods mentioned above can only be found in test code, where all tested filters are
* known to be convertible.
*/
private[orc] object OrcFilters {
I didn't review it carefully; I just assume it's the same as the old version, with the API updated.
Yes, it's logically the same as the old version. Only the API usage is updated here.
Test build #84378 has finished for PR 19651 at commit
Test build #84393 has finished for PR 19651 at commit
great, all tests pass! Let's restore the old ORC implementation and merge it.
Sure, @cloud-fan.
Now, this PR has only the new OrcFileFormat-related additions: 1009 insertions(+), 2 deletions(-).
Test build #84396 has finished for PR 19651 at commit
@cloud-fan, it passed Jenkins again. Could you take another look?
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
  .filterNot(_.isDirectory)
  .map(_.getPath)
  .filterNot(_.getName.startsWith("_"))
nit: How about combining the two filterNot calls into one filterNot by creating one condition with two startsWith checks?
@kiszk . This comes from the existing code, OrcFileOperator.scala. This PR keeps the original function because I don't want to introduce any possibility of a behavior difference. We had better do that kind of improvement later in a separate PR.
Thank you for your explanation, got it.
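For what it's worth, the deferred nit would look something like the following sketch (not part of this PR): a single filterNot with a combined predicate instead of two chained calls.

```scala
import org.apache.hadoop.fs.Path

object VisibleFilesSketch {
  // Single-pass variant of the two chained filterNot calls above:
  // drop files whose names start with "_" or "." in one predicate.
  def dataFilesOnly(paths: Seq[Path]): Seq[Path] =
    paths.filterNot { p =>
      val name = p.getName
      name.startsWith("_") || name.startsWith(".")
    }

  def main(args: Array[String]): Unit = {
    val paths = Seq(new Path("/t/part-00000.orc"), new Path("/t/_SUCCESS"), new Path("/t/.hidden"))
    dataFilesOnly(paths).foreach(println) // only /t/part-00000.orc
  }
}
```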
thanks, merging to master! followups:
Thank you so much for making ORC move forward, @cloud-fan!
…a sources

## What changes were proposed in this pull request?
After [SPARK-20682](#19651), Apache Spark 2.3 is able to read ORC files with Unicode schema. Previously, it raises `org.apache.spark.sql.catalyst.parser.ParseException`. This PR adds a Unicode schema test for CSV/JSON/ORC/Parquet file-based data sources. Note that the TEXT data source only has [a single column with a fixed name 'value'](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L71).

## How was this patch tested?
Pass the newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #20266 from dongjoon-hyun/SPARK-23072.

(cherry picked from commit a0aedb0)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Since SPARK-2883, Apache Spark supports Apache ORC inside the sql/hive module with a Hive dependency. This PR aims to add a new ORC data source inside sql/core and to replace the old ORC data source eventually. This PR resolves the following three issues.
How was this patch tested?
Pass the Jenkins with all the existing tests and the new tests for SPARK-15474 and SPARK-21791.