[SPARK-51281][SQL] DataFrameWriterV2 should respect the path option
### What changes were proposed in this pull request?

Unlike `DataFrameWriter.saveAsTable`, which explicitly reads the "path" option and treats it as the table location, `DataFrameWriterV2` treats "path" as a normal write option, so it has no real effect.

This PR fixes that and adds a legacy config to restore the old behavior.
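
For illustration, a minimal sketch of the fixed behavior (the table name and path below are made up):

```scala
// After this fix, the "path" option is honored as the table location
// for file source tables (illustrative table name and path):
spark.range(10)
  .writeTo("t")
  .using("json")
  .option("path", "/tmp/t_location")
  .create()
// The JSON files land under /tmp/t_location rather than the default
// warehouse location for table "t".
```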

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

Yes, now `DataFrameWriterV2` can correctly write data to the specified path for file source tables.
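
For example (a sketch, reusing the illustrative path from above), the written files can now be read back directly from that path:

```scala
// Sketch: files written via the "path" option land at the given location.
val readBack = spark.read.json("/tmp/t_location")
assert(readBack.count() == 10)
```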

### How was this patch tested?

A new test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50040 from cloud-fan/prop.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Feb 27, 2025
1 parent 412da42 commit a3671e5
Showing 3 changed files with 53 additions and 20 deletions.
@@ -5566,6 +5566,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
.internal()
.doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always write data " +
"to the default table location.")
.version("3.5.6")
.booleanConf
.createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
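Users can opt back into the old behavior by enabling this new config; a minimal sketch (the config is internal, but settable like any other):

```scala
// Sketch: restore the pre-fix behavior, where the "path" option is ignored
// and data always goes to the default table location.
spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")
```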
@@ -28,10 +28,12 @@ import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

/**
@@ -146,25 +148,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])

/** @inheritdoc */
override def create(): Unit = {
val tableSpec = UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
optionExpression = OptionList(Seq.empty),
location = None,
comment = None,
collation = None,
serde = None,
external = false)
runCommand(
CreateTableAsSelect(
UnresolvedIdentifier(tableName),
partitioning.getOrElse(Seq.empty) ++ clustering,
logicalPlan,
tableSpec,
buildTableSpec(),
options.toMap,
false))
}

private def buildTableSpec(): UnresolvedTableSpec = {
val ignorePathOption = sparkSession.sessionState.conf.getConf(
SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
optionExpression = OptionList(Seq.empty),
location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
comment = None,
collation = None,
serde = None,
external = false)
}

/** @inheritdoc */
override def replace(): Unit = {
internalReplace(orCreate = false)
@@ -212,20 +219,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}

private def internalReplace(orCreate: Boolean): Unit = {
val tableSpec = UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
optionExpression = OptionList(Seq.empty),
location = None,
comment = None,
collation = None,
serde = None,
external = false)
runCommand(ReplaceTableAsSelect(
UnresolvedIdentifier(tableName),
partitioning.getOrElse(Seq.empty) ++ clustering,
logicalPlan,
tableSpec,
buildTableSpec(),
writeOptions = options.toMap,
orCreate = orCreate))
}
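Note that the location lookup in `buildTableSpec` above goes through `CaseInsensitiveMap`, so the option key matches regardless of how the caller spelled it; a standalone sketch of that utility's behavior:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Sketch: CaseInsensitiveMap resolves keys case-insensitively, so an
// option set as "PATH" is still found by .get("path").
val opts = CaseInsensitiveMap(Map("PATH" -> "/tmp/t_location"))
assert(opts.get("path").contains("/tmp/t_location"))
```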
@@ -839,4 +839,30 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter
condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
parameters = Map("methodName" -> "`writeTo`"))
}

test("SPARK-51281: DataFrameWriterV2 should respect the path option") {
def checkResults(df: DataFrame): Unit = {
checkAnswer(df, spark.range(10).toDF())
}

Seq(true, false).foreach { ignorePath =>
withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
withTable("t1", "t2") {
spark.range(10).writeTo("t1").using("json").create()
checkResults(spark.table("t1"))

withTempPath { p =>
val path = p.getCanonicalPath
spark.range(10).writeTo("t2").using("json").option("path", path).create()
checkResults(spark.table("t2"))
if (ignorePath) {
assert(!p.exists())
} else {
checkResults(spark.read.json(path))
}
}
}
}
}
}
}
