
[SPARK-19784][SPARK-25403][SQL] Refresh the table even table stats is empty #22721

Closed
wants to merge 14 commits

Conversation

wangyum
Member

@wangyum wangyum commented Oct 15, 2018

What changes were proposed in this pull request?

Since SPARK-21237 we invalidate the table relation cache once table data is changed. But there is one situation where we do not invalidate it (spark.sql.statistics.size.autoUpdate.enabled=false and table.stats.isEmpty):

def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
  val catalog = sparkSession.sessionState.catalog
  if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
    val newTable = catalog.getTableMetadata(table.identifier)
    val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
    val newStats = CatalogStatistics(sizeInBytes = newSize)
    catalog.alterTableStats(table.identifier, Some(newStats))
  } else if (table.stats.nonEmpty) {
    catalog.alterTableStats(table.identifier, None)
  }
}
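The branch gap above can be illustrated with a minimal, hypothetical Python model (not Spark code; the names relation_cache and update_table_stats are made up for illustration): the cache is invalidated in the first two branches, but when auto size update is off and the stats are empty, neither branch runs.

```python
# Hypothetical model of the updateTableStats branch gap (not Spark code).
relation_cache = {"t": "stale relation"}

def update_table_stats(table, auto_size_update_enabled):
    if auto_size_update_enabled:
        table["stats"] = {"sizeInBytes": 123}
        relation_cache.pop(table["name"], None)  # cache invalidated
    elif table["stats"]:
        table["stats"] = None
        relation_cache.pop(table["name"], None)  # cache invalidated
    # else: auto update off and stats empty -> cache is NOT invalidated

t = {"name": "t", "stats": None}
update_table_stats(t, auto_size_update_enabled=False)
print("t" in relation_cache)  # True: the stale relation is still cached
```

In this sketch the stale entry survives exactly when the real code takes neither branch, which is the case this PR adds an else branch for.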

This will introduce some issues, e.g. SPARK-19784, SPARK-19845, SPARK-25403, SPARK-25332 and SPARK-28413.

Here is an example that reproduces SPARK-19784:

val path = "/tmp/spark/parquet"
spark.sql("CREATE TABLE t (a INT) USING parquet")
spark.sql("INSERT INTO TABLE t VALUES (1)")
spark.range(5).toDF("a").write.parquet(path)
spark.sql(s"ALTER TABLE t SET LOCATION '${path}'")
spark.table("t").count() // return 1
spark.sql("refresh table t")
spark.table("t").count() // return 5

This PR fixes the issue by invalidating the table relation cache in this case as well (spark.sql.statistics.size.autoUpdate.enabled=false and table.stats.isEmpty).

How was this patch tested?

unit tests

@SparkQA

SparkQA commented Oct 15, 2018

Test build #97371 has finished for PR 22721 at commit 8a7f4af.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Oct 15, 2018

retest this please

@SparkQA

SparkQA commented Oct 15, 2018

Test build #97374 has finished for PR 22721 at commit 8a7f4af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Oct 15, 2018

retest this please

@SparkQA

SparkQA commented Oct 15, 2018

Test build #97377 has finished for PR 22721 at commit 8a7f4af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 15, 2018

Test build #97387 has finished for PR 22721 at commit 37fed41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Oct 17, 2018

cc @cloud-fan

@cloud-fan
Contributor

What's the impact on end users? Wrong statistics?

@wangyum
Member Author

wangyum commented Oct 18, 2018

The answer is here: #22758 (comment)

@SparkQA

SparkQA commented Oct 18, 2018

Test build #97521 has finished for PR 22721 at commit 983c5a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

I think it's reasonable to follow InsertIntoHiveTable, but it's better to provide more details about what changes in InsertIntoHadoopFsRelationCommand:

  1. What's refreshed? Previously we refreshed the data cache via the path and also refreshed the file index, but the plan cache stayed around. Now we refresh the plan cache. Since the file index lives inside the plan, refreshing the plan cache also covers it, but the data cache still needs to be refreshed separately.
  2. What's the performance impact? The plan cache is very useful when reading partitioned tables, since it avoids listing files repeatedly. But this seems OK, because we already refreshed the file index before this change, so we had to re-list files after an insertion anyway.
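The two cache layers described above can be sketched as a small, hypothetical Python model (the names plan_cache and data_cache are made up for illustration; this is not Spark's implementation): dropping a plan-cache entry takes the embedded file index with it, while the data cache is keyed by path and must be refreshed on its own.

```python
# Hypothetical model of the plan cache vs. data cache distinction.
# The plan cache embeds the file index; the data cache is keyed by path.
plan_cache = {"t": {"plan": "parquet scan", "file_index": ["part-00000"]}}
data_cache = {"/tmp/spark/parquet": "cached rows"}

def refresh_plan_cache(table: str) -> None:
    # Dropping the plan also drops the file index embedded in it.
    plan_cache.pop(table, None)

def refresh_data_cache(path: str) -> None:
    data_cache.pop(path, None)

refresh_plan_cache("t")
# The plan (and its file index) is gone, but the data cache entry
# remains until it is refreshed separately.
```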

@SparkQA

SparkQA commented Oct 19, 2018

Test build #97579 has finished for PR 22721 at commit 6c8a73f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Oct 19, 2018

retest this please

@SparkQA

SparkQA commented Oct 19, 2018

Test build #97597 has finished for PR 22721 at commit 6c8a73f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Oct 29, 2018

Test build #98181 has finished for PR 22721 at commit 6c8a73f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Oct 29, 2018

Retest this please.

@SparkQA

SparkQA commented Oct 29, 2018

Test build #98186 has finished for PR 22721 at commit 6c8a73f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum changed the title [SPARK-25403][SQL] Refreshes the table after inserting the table [SPARK-19784][SPARK-25403][SQL] Refresh the table even table stats is empty Nov 12, 2018
@wangyum
Member Author

wangyum commented Sep 10, 2019

cc @cloud-fan

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110863 has finished for PR 22721 at commit c6a1a7d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Sep 18, 2019

retest this please

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110884 has finished for PR 22721 at commit c6a1a7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

github-actions bot commented Jan 5, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 5, 2020
@wangyum wangyum closed this Jan 5, 2020
@cloud-fan
Contributor

does the problem still exist? I think we need to merge this PR.

@wangyum
Member Author

wangyum commented Jan 6, 2020

Yes. It still exists:

scala> spark.version
res0: String = 3.0.0-preview2

scala> val path = "/tmp/spark/parquet"
path: String = /tmp/spark/parquet

scala> spark.sql("CREATE TABLE t (a INT) USING parquet")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("INSERT INTO TABLE t VALUES (1)")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.range(5).toDF("a").write.parquet(path)

scala> spark.sql(s"ALTER TABLE t SET LOCATION '${path}'")
res4: org.apache.spark.sql.DataFrame = []

scala> spark.table("t").count() // return 1
res5: Long = 1

scala> spark.sql("refresh table t")
res6: org.apache.spark.sql.DataFrame = []

scala> spark.table("t").count() // return 5
res7: Long = 5

@wangyum wangyum reopened this Jan 6, 2020
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@SparkQA

SparkQA commented Jan 6, 2020

Test build #116137 has finished for PR 22721 at commit c6a1a7d.

  • This patch fails build dependency tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@@ -50,6 +50,9 @@ object CommandUtils extends Logging {
catalog.alterTableStats(table.identifier, Some(newStats))
} else if (table.stats.nonEmpty) {
catalog.alterTableStats(table.identifier, None)
} else {
// In other cases, we still need to invalidate the table relation cache.
Contributor

to confirm: does catalog.alterTableStats refresh the relation cache?

Member Author

Yes.

/**
 * Alter Spark's statistics of an existing metastore table identified by the provided table
 * identifier.
 */
def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
  val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(identifier.table)
  val tableIdentifier = TableIdentifier(table, Some(db))
  requireDbExists(db)
  requireTableExists(tableIdentifier)
  externalCatalog.alterTableStats(db, table, newStats)
  // Invalidate the table relation cache
  refreshTable(identifier)
}

@SparkQA

SparkQA commented Jan 6, 2020

Test build #116140 has finished for PR 22721 at commit 817d2de.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Jan 6, 2020

retest this please

@SparkQA

SparkQA commented Jan 6, 2020

Test build #116179 has finished for PR 22721 at commit 817d2de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions github-actions bot closed this Jan 7, 2020
@dongjoon-hyun
Member

Hi, @wangyum. According to @nchammas and @HeartSaVioR, it seems that you need to remove the Stale label to prevent the automatic GitHub Actions bot from closing this again.

@cloud-fan cloud-fan removed the Stale label Jan 7, 2020
@cloud-fan cloud-fan reopened this Jan 7, 2020
@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 17881a4 Jan 7, 2020
@wangyum wangyum deleted the SPARK-25403 branch January 7, 2020 03:46
@dongjoon-hyun
Member

Thank you, @wangyum and @cloud-fan .
Can we have this at Apache Spark 2.4.5?

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116205 has finished for PR 22721 at commit 817d2de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -50,6 +50,9 @@ object CommandUtils extends Logging {
catalog.alterTableStats(table.identifier, Some(newStats))
} else if (table.stats.nonEmpty) {
catalog.alterTableStats(table.identifier, None)
} else {
// In other cases, we still need to invalidate the table relation cache.
Member

Could you explain why we need to refresh the table while updating stats, please? In some cases we would do the same work twice. See:

  1. InsertIntoHiveTable:
    sparkSession.sessionState.catalog.refreshTable(table.identifier)
    CommandUtils.updateTableStats(sparkSession, table)
  2. LoadDataCommand :
    catalog.refreshTable(targetTable.identifier)
    CommandUtils.updateTableStats(sparkSession, targetTable)
  3. AlterTableDropPartitionCommand:
    sparkSession.catalog.refreshTable(table.identifier.quotedString)
    CommandUtils.updateTableStats(sparkSession, table)

Member Author

Not all commands have refresh-table logic. AlterTableSetLocationCommand, for example, only does:

CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
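The point above is why the fix lives inside updateTableStats: commands that call only updateTableStats get a cache refresh only if every branch of it performs one. A hypothetical Python sketch (not Spark code; names are made up for illustration), mirroring the fact that alterTableStats ends with refreshTable and the new else branch invalidates the cache directly:

```python
# Hypothetical model: with the else branch added by this PR, the relation
# cache is invalidated no matter which branch updateTableStats takes.
relation_cache = {"t": "stale relation"}

def alter_table_stats(name):
    # mirrors SessionCatalog.alterTableStats, which ends with refreshTable
    relation_cache.pop(name, None)

def update_table_stats(name, stats, auto_update):
    if auto_update:
        alter_table_stats(name)
    elif stats:
        alter_table_stats(name)
    else:
        # the branch this PR adds: invalidate the cache even with empty stats
        relation_cache.pop(name, None)

update_table_stats("t", stats=None, auto_update=False)
# "t" is no longer cached, even though no explicit refreshTable was called
```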
