'Table implementation does not support writes' while calling 'saveAsTable' from DataFrameWriter using qbeast #42

Closed
Jiaweihu08 opened this issue Nov 16, 2021 · 2 comments · Fixed by #124

@Jiaweihu08 (Member)

What went wrong?
The write fails with 'Table implementation does not support writes' while attempting to save a DataFrame to the metastore_db using DataFrameWriter's saveAsTable method.

How to reproduce?

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("QbeastDataSource")
  .withExtensions(new QbeastSparkSessionExtension())
  .getOrCreate()

// The source file is a CSV, so the variable is named accordingly
val csvPath = "src/test/resources/ecommerce100K_2019_Oct.csv"

val df = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load(csvPath)

df.write
  .format("qbeast")
  .option("columnsToIndex", "product_id,category_id,price,user_id")
  .saveAsTable("default.qbeast_table")
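
For contrast, a plain path-based write appears to avoid the failing catalog code path — a hedged sketch that is not part of the original report, with an illustrative output path:

df.write
  .format("qbeast")
  .option("columnsToIndex", "product_id,category_id,price,user_id")
  .save("/tmp/qbeast_table") // save() writes to a path and does not go through the metastore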
  1. Branch and commit id:
    Fork of qbeast-spark from @osopardo1, branch 28-dimension-filtering, commit id 4d642af

  2. Spark version:
    Spark 3.1.1

  3. Hadoop version:
    Hadoop 3.2.0

  4. Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?
    Standalone mode

  5. Stack trace:

21/11/16 10:50:18 ERROR Utils: Aborting task
org.apache.spark.SparkException: Table implementation does not support writes: default.qbeast_table
	at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:489)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable(WriteToDataSourceV2Exec.scala:465)
	at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable$(WriteToDataSourceV2Exec.scala:460)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:62)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:84)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:686)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:623)
	at io.qbeast.spark.sql.sources.saveAsTableTest.$anonfun$new$1(saveAsTableTest.scala:49)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1684)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1682)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1694)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1694)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1676)
	at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1752)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1752)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1751)
	at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1797)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1797)
	at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1795)
	at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)


@Jiaweihu08 Jiaweihu08 added the type: bug Something isn't working label Nov 16, 2021
@osopardo1 osopardo1 added the low label Dec 21, 2021
@osopardo1 osopardo1 added the good first issue Good for newcomers label Mar 7, 2022
@eavilaes (Contributor) commented Jul 21, 2022

I ran into the same error while doing something different: I'm running SQL statements using spark.sql(), and I've done some tests to figure out what is happening. Here is what I found:

Note: The cluster has delta-core 1.0.0, qbeast-spark 0.2.0 and qbeast-spark-keeper 0.1.0-a2.

  • spark-shell with no --conf flags (i.e., without setting the SessionExtensions or the Catalog). Running it gives the same result you got, with both the qbeast and delta formats:
scala> spark.sql("""CREATE OR REPLACE TABLE eric.test
     | USING delta
     | LOCATION 's3://my-bucket/tmp-eric/tests/delta-table'
     | AS SELECT * FROM eric.original_table;""")
22/07/21 15:34:23 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `eric`.`test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
22/07/21 15:34:24 ERROR Utils: Aborting task
org.apache.spark.SparkException: Table implementation does not support writes: eric.test
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:489)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable(WriteToDataSourceV2Exec.scala:465)
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable$(WriteToDataSourceV2Exec.scala:460)
        at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:132)
        at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:162)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:46)
        at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:230)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)

I found out that I forgot to set the SessionExtension and configs for the Catalog. So, I opened a spark-shell like this:

spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore

This way I have been able to write in delta format (even with QbeastSparkSessionExtension set). But I still cannot write in qbeast format:

scala> spark.sql("""CREATE OR REPLACE TABLE eric.test1
     | USING delta
     | LOCATION 's3://my-bucket/tmp-eric/tests/delta-with-confs'
     | AS SELECT * FROM eric.original_table;""")
22/07/21 15:30:53 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `eric`.`test1` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
22/07/21 15:30:53 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("""CREATE OR REPLACE TABLE eric.test2
     | USING qbeast
     | LOCATION 's3://my-bucket/tmp-eric/tests/qbeast-with-confs'
     | AS SELECT * FROM eric.original_table;""")
22/07/21 15:31:03 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider qbeast. Persisting data source table `eric`.`test2` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
22/07/21 15:31:04 ERROR Utils: Aborting task
org.apache.spark.sql.AnalysisException: Table implementation does not support writes: eric.test2
        at org.apache.spark.sql.delta.catalog.DeltaCatalog$BestEffortStagedTable.newWriteBuilder(DeltaCatalog.scala:554)
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:472)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)

I suspect it has something to do with the --conf settings for the Catalog, but I wonder how that should be done.
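
For reference, registering a catalog implementation for the session catalog would look roughly like this — a hedged sketch: a Qbeast catalog class did not exist when this comment was written (it is what the fix eventually introduces), so the class path below is illustrative rather than something available at the time:

spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog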

@osopardo1 (Member) commented Jul 22, 2022

Thank you, @eavilaes, for noticing! Once we finish with the #116 PR, we will get hands-on with it.

As you guessed, we need some kind of catalog support in Qbeast. We will take a look at DeltaCatalog and see what to include.
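
For anyone digging into this: Spark raises this error when the Table instance returned by the catalog does not implement the DataSource V2 SupportsWrite interface, which is what a Qbeast-aware catalog would have to return. Below is a minimal Scala sketch of that contract against the Spark 3.1 API — illustrative only, not Qbeast's actual implementation; the class name and the elided WriteBuilder are assumptions:

import java.util
import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: a table the catalog could return so that
// saveAsTable / CTAS succeeds. Without SupportsWrite, Spark throws
// "Table implementation does not support writes".
class WritableQbeastTable(tableName: String, tableSchema: StructType)
    extends SupportsWrite {

  override def name(): String = tableName

  override def schema(): StructType = tableSchema

  // Declare which write paths the table supports
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE, TableCapability.TRUNCATE)

  // Would delegate to the qbeast writer; elided in this sketch
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    throw new UnsupportedOperationException("sketch only")
}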

@osopardo1 osopardo1 added high and removed low labels Jul 22, 2022
@eavilaes eavilaes linked a pull request Aug 3, 2022 that will close this issue
@osopardo1 osopardo1 removed the good first issue Good for newcomers label Aug 30, 2022
@osopardo1 osopardo1 self-assigned this Aug 30, 2022