[SPARK-19965][SS] DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output #17346
Conversation
Test build #74819 has finished for PR 17346 at commit
Test build #74820 has finished for PR 17346 at commit
@zsxwing would you take a look at this? Thanks!
Test build #75336 has finished for PR 17346 at commit
Test build #75565 has finished for PR 17346 at commit
Rebased to master to resolve conflicts
Test build #76081 has finished for PR 17346 at commit
Test build #76293 has started for PR 17346 at commit
Test build #76301 has finished for PR 17346 at commit
Jenkins retest this please
@zsxwing would you take a look at your convenience? Thanks!
Test build #76348 has finished for PR 17346 at commit
Sorry for the delay. Looks pretty good. Just some nits.
 * - ancestorIsMetadataDirectory(/a/b/c) => false
 */
def ancestorIsMetadataDirectory(path: Path): Boolean = {
  require(path.isAbsolute, s"$path is required to be absolute")
I'm wondering if we can call `makeQualified` instead.
switched to makeQualified
require(path.isAbsolute, s"$path is required to be absolute")
var currentPath = path
var finished = false
while (!finished) {
How about changing it to `currentPath != null`? Then you don't need `finished`.
fixed. good point!
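For illustration, the loop shape the reviewer suggests (stop when the parent runs out, rather than tracking a `finished` flag) can be sketched in Python. This is a simplification: the real helper operates on `org.apache.hadoop.fs.Path`, while plain POSIX strings are used here.

```python
import posixpath

# Name of the metadata dir written by FileStreamSink (from the PR).
METADATA_DIR = "_spark_metadata"

def ancestor_is_metadata_directory(path: str) -> bool:
    """Return True if `path` or any of its ancestors is named `_spark_metadata`.

    Mirrors the suggested loop: walk up until the root is reached,
    with no separate `finished` flag.
    """
    if not path.startswith("/"):
        raise ValueError(f"{path} is required to be absolute")
    current = posixpath.normpath(path)
    while current and current != "/":
        if posixpath.basename(current) == METADATA_DIR:
            return True
        current = posixpath.dirname(current)
    return False
```

With this shape the loop condition itself expresses termination, which is what the review comment was after.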
val inputData = MemoryStream[Int]
val ds = inputData.toDS()

val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
nit: use `withTempDir` to create the temp dir instead
done
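The `withTempDir` test helper the reviewer points to creates a temporary directory for the test body and cleans it up afterwards even on failure. A rough Python analogue of that pattern (the name `with_temp_dir` is illustrative, not Spark's API):

```python
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def with_temp_dir(prefix: str = "stream.output"):
    # Create a temp dir, hand it to the caller, and always delete it,
    # even if the body raises -- the point of the reviewer's nit.
    path = tempfile.mkdtemp(prefix=prefix)
    try:
        yield path
    finally:
        shutil.rmtree(path, ignore_errors=True)
```

Compared to a bare `createTempDir`, the scoped helper guarantees cleanup and keeps the test body free of teardown code.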
val ds = inputData.toDS()

val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
nit: same as above
// or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
// such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
// is the output of a streaming query.
override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
Just to confirm one thing: for files in `rootPaths` or their sub dirs, they will be dropped by `InMemoryFileIndex.shouldFilterOut`. Right?
Yea that's quite correct! They will be filtered by `InMemoryFileIndex.shouldFilterOut`.
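To summarize the two layers this exchange describes: root paths under `_spark_metadata` are dropped by the `filterNot` above, while hidden entries met during listing are dropped by `InMemoryFileIndex.shouldFilterOut` (which, roughly, skips names starting with `_` or `.`). A simplified Python sketch of the combined effect, with both checks reimplemented inline for illustration:

```python
import posixpath

METADATA_DIR = "_spark_metadata"

def ancestor_is_metadata_directory(path: str) -> bool:
    # Simplified: True if any path component is `_spark_metadata`.
    return METADATA_DIR in posixpath.normpath(path).split("/")

def should_filter_out(name: str) -> bool:
    # Rough approximation of InMemoryFileIndex.shouldFilterOut:
    # hidden files/dirs (leading "_" or ".") are skipped during listing.
    return name.startswith("_") or name.startswith(".")

# Globbing "basePath/*" over a streaming sink's output might return:
globbed = [
    "/out/part=0",
    "/out/part=1",
    "/out/_spark_metadata",  # dropped by the root-path ancestor check
]
root_paths = [p for p in globbed if not ancestor_is_metadata_directory(p)]
```

With the metadata dir gone from `root_paths`, partition inference only ever sees the `part=...` directories, which is exactly what the failing test needed.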
Test build #76408 has finished for PR 17346 at commit
Comments have been addressed -- @zsxwing it'd be great if you could take another look
LGTM. Thanks! Merging to master and 2.2. |
… when reading FileStreamSink's output

## The Problem

Right now DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output:

```
[info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
[info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
[info]   ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
[info]   ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
[info]
[info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
[info]   at scala.Predef$.assert(Predef.scala:170)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
[info]   at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
[info]   at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
[info]   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
[info]   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
[info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
[info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
```

## What changes were proposed in this pull request?

This patch alters `InMemoryFileIndex` to filter out those `basePath`s whose ancestor is the streaming metadata dir (`_spark_metadata`). E.g., the following and other similar dirs or files will be filtered out:

- (introduced by globbing `basePath/*`)
  - `basePath/_spark_metadata`
- (introduced by globbing `basePath/*/*`)
  - `basePath/_spark_metadata/0`
  - `basePath/_spark_metadata/1`
  - ...

## How was this patch tested?

Added unit tests

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17346 from lw-lin/filter-metadata.

(cherry picked from commit 6b9e49d)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
thank you @zsxwing