Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51290][SQL] Enable filling default values in DSv2 writes #50044

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

aokolnychyi
Copy link
Contributor

What changes were proposed in this pull request?

This PR enables filling default values in DSv2 writes.

Why are the changes needed?

These changes are needed for proper support of default values for DSv2 connectors.

Does this PR introduce any user-facing change?

Users will be able to omit columns with default values. There is no impact to existing jobs.

How was this patch tested?

This patch comes with tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Feb 21, 2025
@@ -3534,7 +3534,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
expected = v2Write.table.output, queryOutput = v2Write.query.output)
val projection = TableOutputResolver.resolveOutputColumns(
v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf,
supportColDefaultValue = true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is value in validating if the catalog defines SUPPORT_COLUMN_DEFAULT_VALUE in capabilities during writes. If a connector includes default value metadata in its schema, it should be enough to fill default values. The flag exists for ALTER and CREATE/REPLACE statements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea true, Spark fills the default values during table writing and it works for all catalogs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean supportColDefaultValue is true or false doesn't matter for v2 here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. You mean to check for the flag SUPPORT_COLUMN_DEFAULT_VALUE here for the catalog.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I don't see value in checking SUPPORT_COLUMN_DEFAULT_VALUE here.

@@ -718,6 +724,11 @@ private class BufferedRowsReader(
schema: StructType,
row: InternalRow): Any = {
val index = schema.fieldIndex(field.name)

if (index >= row.numFields) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for support for adding columns with default values to the end.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is method extractFieldValue. Looks like it is only used by get. Why this is for adding columns?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to read data inserted prior to adding columns to the schema. If that happens, there would be extra columns in the schema and we have to default new columns using the existence default value.

@@ -423,8 +423,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
assertNotResolved(parsedPlan)
assertAnalysisErrorCondition(
parsedPlan,
expectedErrorCondition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`")
expectedErrorCondition = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because of spark.sql.defaultColumn.useNullsForMissingDefaultValues and is aligned with V1 writes.

@aokolnychyi
Copy link
Contributor Author

@@ -328,7 +336,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
}

test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD/ALTER COLUMN") {
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") {
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format,$catalog") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this conf affect the testing v2 in-memory catalog? I thought it's only for v1 file source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya is correct. We previously passed "" as provider in the in-memory connector, which required workarounds like this. No longer needed as we pass the catalog name as provider. Simplifies testing.

@@ -122,7 +122,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
val table = loadTable(ident).asInstanceOf[InMemoryTable]
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, Some(name), "ALTER TABLE")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add memory table provider to DEFAULT_COLUMN_ALLOWED_PROVIDERS?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nvm, the name is given when initializing the catalog.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

@aokolnychyi aokolnychyi closed this Mar 1, 2025
@aokolnychyi aokolnychyi reopened this Mar 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants