
Add Table properties to Qbeast #124

Merged
merged 51 commits into Qbeast-io:main from 42-table-writes-catalog on Nov 25, 2022

Conversation

osopardo1
Member

@osopardo1 osopardo1 commented Aug 3, 2022

Description

Solves issue #42.

The problem with saveAsTable() goes beyond a simple method override. It requires substantial reworking of the design and implementation of QbeastDataSource and its related classes.

A DataSource is the main entry point for writing and reading the Qbeast format through Spark. Apache Spark has two different versions of this API:

  • DataSourceV1: the first version developed to adapt and extend Apache Spark sources, introduced in version 1.3. Its limitations include dependencies on the SparkSession, lack of support for reading optimizations, and no partition or sorting information. V1 understands the data as Relations (BaseRelation, HadoopFsRelation...) and can be extended through RelationProvider and CreatableRelationProvider.
  • DataSourceV2: released a couple of years ago with version 2.3. It has better compatibility with Java, more flexible extension points, and no dependencies on the high-level APIs. V2 understands the underlying data as a Table and can be extended through TableProvider and Table, implemented with traits like SupportsWrite, SupportsRead, SupportsOverwrite...

This separation of APIs has been beneficial in terms of optimization, but it lacks consistency: some SQL statements and operations are implemented for V1 but not for V2, and vice versa.

There's a nice series of blog posts on this topic that you can read to get the full picture: http://blog.madhukaraphatak.com/categories/datasource-v2-series/
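
To make the V2 vocabulary concrete, here is a minimal sketch of the extension points mentioned above (illustrative only, not the actual Qbeast classes): a TableProvider that returns a Table, which declares its capabilities and mixes in SupportsWrite.

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Entry point Spark instantiates for a V2 data source.
class ExampleTableProvider extends TableProvider {

  // A real source would derive the schema from existing data or the options.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = new StructType()

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = new ExampleTable(schema)
}

// The V2 representation of the data: a Table that supports batch writes.
class ExampleTable(tableSchema: StructType) extends Table with SupportsWrite {

  override def name(): String = "example_table"

  override def schema(): StructType = tableSchema

  override def capabilities(): util.Set[TableCapability] =
    util.Collections.singleton(TableCapability.BATCH_WRITE)

  // The WriteBuilder produced here is the role QbeastWriteBuilder plays in this PR.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    throw new UnsupportedOperationException("sketch only")
}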

Type of change

This change consists of:

  • Refinement of QbeastTableImpl. This class extends Table and adds writing capabilities to the V2 Qbeast DataSource.
  • Add QbeastWriteBuilder. This class is in charge of building the write process. It extends V1WriteBuilder, which makes the conversion easier.
  • Add QbeastCatalog.

QbeastCatalog is an extension of CatalogExtension with SupportsNamespaces (supporting creation, renaming, and deletion of namespaces) and StagingTableCatalog. A StagingTableCatalog is used to stage the creation of a table before committing any metadata along with the content of CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT operations. From the Spark documentation, we can observe the following:

It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first drop the table via TableCatalog.dropTable(Identifier), then create the table via TableCatalog.createTable(Identifier, StructType, Transform[], Map), and then perform the write via SupportsWrite.newWriteBuilder(LogicalWriteInfo). However, if the write operation fails, the catalog will have already dropped the table, and the planner cannot roll back the dropping of the table.

If the catalog implements this plugin, the catalog can implement the methods to "stage" the creation and the replacement of a table. After the table's BatchWrite.commit(WriterCommitMessage[]) is called, StagedTable.commitStagedChanges() is called, at which point the staged table can complete both the data write and the metadata swap operation atomically.
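
In code, the contract quoted above boils down to the StagedTable interface: a staged table behaves like a regular Table, but nothing becomes visible until commitStagedChanges() is called. The following is a simplified sketch of the Spark interface involved, not the QbeastStagedTableImpl implementation:

import java.util

import org.apache.spark.sql.connector.catalog.{StagedTable, Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Simplified staged table: data may already be written while staging, but the
// table is only published when commitStagedChanges() runs; abortStagedChanges()
// is the rollback hook if the write fails.
class ExampleStagedTable(underlying: Table) extends StagedTable {

  override def name(): String = underlying.name()
  override def schema(): StructType = underlying.schema()
  override def capabilities(): util.Set[TableCapability] = underlying.capabilities()

  override def commitStagedChanges(): Unit = {
    // Atomically publish the metadata and the data written so far
    // (for Qbeast, this is where the underlying log would be created).
  }

  override def abortStagedChanges(): Unit = {
    // Clean up anything written while staging.
  }
}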

The QbeastCatalog can be used as the default catalog through the spark_catalog configuration:

spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog

Or as an external catalog that you can combine with other catalogs:

spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir

// Write data with qbeast_catalog prefix
data.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("qbeast_catalog.default.qbeast_table")
  • Add QbeastStagedTableImpl. This class contains the code to commit the staged changes atomically. It creates the underlying log and saves any data that may have been processed in the AS SELECT statement.
  • Add SaveAsTableRule to make sure the columnsToIndex option is passed to the QbeastTableImpl (see the SQL sketch after this list).
  • Add QbeastAnalysis rules to read with V1 optimizations.
  • Add functionality to QbeastDataSource and rework some of its methods.
  • Separate tests into Integration tests and Correctness tests. The integration tests make sure the Spark statements work as expected. The correctness tests ensure that we don't lose information and that all the metadata is well described.
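
As a usage illustration of the staged CREATE TABLE AS SELECT path, the snippet below is a sketch that assumes the QbeastCatalog is configured as spark_catalog, that a students source table with id and age columns exists, and that columnsToIndex is accepted in the OPTIONS clause:

// Hypothetical CTAS example: the catalog stages the table, the SELECT result is
// written through the Qbeast writer, and the metadata is committed atomically.
spark.sql("""
  CREATE TABLE qbeast_students
  USING qbeast
  OPTIONS ('columnsToIndex' = 'id,age')
  AS SELECT * FROM students
""")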

Checklist:

Here is the list of things you should do before submitting this pull request:

  • New feature / bug fix has been committed following the Contribution guide.
  • Add comments to the code (make it easier for the community!).
  • Change the documentation.
  • Add tests.
  • Your branch is updated to the main branch (dependent changes have been merged).

How Has This Been Tested? (Optional)

We added a few tests to check how units of code behave.
Under the io.qbeast.spark.internal.sources.catalog package, you will find the following tests.

  • DefaultStagedTableTest. Tests the table returned by default in case Qbeast cannot operate.
  • QbeastCatalogTest. Checks all the methods called by the CatalogManager and makes sure they work as expected.
  • QbeastCatalogIntegrationTest. Checks the integration with other catalogs and the behavior of the Qbeast implementation.
  • QbeastStagedTableTest. Contains method tests for the implementation of a staged table.

Under io.qbeast.spark.utils, some tests have been refactored.

  • QbeastSparkCorrectnessTest. Checks if the results are correct (writing, reading, sampling...)
  • QbeastSparkIntegrationTest. Checks if the Spark DataFrame API is behaving properly.
  • QbeastSQLIntegrationTest. Checks if the SQL commands are performed correctly.

@osopardo1 osopardo1 requested a review from eavilaes August 4, 2022 13:14
@osopardo1
Member Author

osopardo1 commented Aug 4, 2022

Hi @eavilaes ! I have implemented a potential workaround for this issue.

A lot of things still need to be reworked; it's still a WIP. But given the timing, you can try the new SaveAsTable functionality and be the QA tester hehe.
The only thing you need to do is add some extra configuration to the Spark Session (like you did for Delta on #42):

spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog

Please feel free to suggest any other changes or report problems you face during execution. Many thanks!

@eavilaes
Contributor

eavilaes commented Aug 5, 2022

Just tested, working smoooooth in my case🥳
Thank you very much! 🐻

@codecov

codecov bot commented Aug 5, 2022

Codecov Report

Merging #124 (686a74f) into main (966b007) will increase coverage by 1.05%.
The diff coverage is 98.61%.

❗ Current head 686a74f differs from pull request most recent head 2b5c00b. Consider uploading reports for the commit 2b5c00b to get more accurate results

@@            Coverage Diff             @@
##             main     #124      +/-   ##
==========================================
+ Coverage   91.81%   92.86%   +1.05%     
==========================================
  Files          62       73      +11     
  Lines        1453     1709     +256     
  Branches      114      126      +12     
==========================================
+ Hits         1334     1587     +253     
- Misses        119      122       +3     
Impacted Files Coverage Δ
...ala/io/qbeast/spark/utils/SparkToQTypesUtils.scala 88.88% <ø> (-3.42%) ⬇️
...ain/scala/org/apache/spark/sql/V2AndV1Traits.scala 50.00% <50.00%> (ø)
...spark/internal/sources/v2/QbeastWriteBuilder.scala 87.50% <87.50%> (ø)
.../internal/sources/catalog/QbeastCatalogUtils.scala 97.05% <97.05%> (ø)
...e/src/main/scala/io/qbeast/core/model/CubeId.scala 96.36% <100.00%> (+0.11%) ⬆️
...c/main/scala/io/qbeast/core/model/QuerySpace.scala 94.73% <100.00%> (+11.40%) ⬆️
...qbeast/spark/delta/SparkDeltaMetadataManager.scala 88.88% <100.00%> (+1.38%) ⬆️
...io/qbeast/spark/index/query/QuerySpecBuilder.scala 100.00% <100.00%> (ø)
...scala/io/qbeast/spark/internal/QbeastOptions.scala 95.00% <100.00%> (+3.33%) ⬆️
...t/spark/internal/QbeastSparkSessionExtension.scala 100.00% <100.00%> (ø)
... and 13 more


@Jiaweihu08
Member

Calling INSERT OVERWRITE on a managed table (created through saveAsTable) doesn't eliminate the old data; it behaves more like an INSERT INTO. Other formats such as Delta and Parquet do eliminate the old data, and only the inserted data is left after the operation.

Calling INSERT OVERWRITE or INSERT INTO will and should change the underlying data, and this is true for both a VIEW and a TABLE with Delta or Parquet.

'INSERT INTO' operates correctly with qbeast.

When calling either INSERT OVERWRITE or INSERT INTO, the OTree algorithm is invoked, as it should be.

The following works just fine on a VIEW:

import org.apache.spark.sql.functions.col
import spark.implicits._

val targetColumns = Seq("product_id", "brand", "price", "user_id")
val initialData = loadTestData(spark).select(targetColumns.map(col): _*)
initialData.write
  .format("qbeast")
  .option("cubeSize", "5000")
  .option("columnsToIndex", "user_id,product_id")
  .save(tmpDir)

val df = spark.read.format("qbeast").load(tmpDir)
df.createOrReplaceTempView("initial")
val dataToInsert = Seq((1, "qbeast", 9.99, 1)).toDF(targetColumns: _*)
dataToInsert.createOrReplaceTempView("toInsert")

spark.sql("INSERT OVERWRITE initial TABLE toInsert")
spark.sql("SELECT * FROM initial").count() shouldBe 1

However, when a TABLE is created using saveAsTable, the behavior is unexpected because the new record is inserted without eliminating the existing data:

import org.apache.spark.sql.functions.col
import spark.implicits._

val targetColumns = Seq("product_id", "brand", "price", "user_id")

val initialData = loadTestData(spark).select(targetColumns.map(col): _*)
initialData.write
  .format("qbeast")
  .option("cubeSize", "5000")
  .option("columnsToIndex", "user_id,product_id")
  .saveAsTable("initial")

val dataToInsert = Seq((1, "qbeast", 9.99, 1)).toDF(targetColumns: _*)
dataToInsert.createOrReplaceTempView("toInsert")

spark.sql("INSERT OVERWRITE initial TABLE toInsert")

spark.sql("SELECT * FROM initial").count() shouldBe 1 + initialData.count

In the case of Delta, records are eliminated in both scenarios: a second JSON log file is written with remove entries and a single add entry.

For Qbeast, when using saveAsTable, a second JSON log file is created with a new revision, a single add entry, and NO remove entries. When using createOrReplaceTempView, on the other hand, there are in fact remove entries in the second log file, while the revision remains the same.
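
A quick way to compare the two scenarios is to read the second commit of the _delta_log directly. This is a sketch that assumes the table data lives under tmpDir and that the second commit uses Delta's default file name; for the managed table, point at its warehouse location instead:

// Inspect the actions written by the second commit of the Delta log.
val secondCommit = spark.read.json(s"$tmpDir/_delta_log/00000000000000000001.json")

// Each JSON line is one action; the `add`/`remove` columns only exist
// if the corresponding action type appears in the file.
val adds = if (secondCommit.columns.contains("add"))
  secondCommit.where("add IS NOT NULL").count() else 0L
val removes = if (secondCommit.columns.contains("remove"))
  secondCommit.where("remove IS NOT NULL").count() else 0L

println(s"add entries: $adds, remove entries: $removes")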

…ltaCatalog

We need the QbeastCatalog to be independent from other existing Catalog solutions.
@osopardo1
Member Author

osopardo1 commented Sep 19, 2022

Since we want the Catalog implementation to be independent of the underlying formats, we decided to extend CatalogExtension (option 2).

To implement all the required methods (listNamespaces(), listTables()...), we will use the delegated catalog (the one configured with the spark_catalog key) or the default Spark built-in catalog.
We can access those through the v2SessionCatalog method of the CatalogManager (spark.sessionState.catalogManager):

  /**
   * If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2
   * session catalog. Otherwise, return the default session catalog.
   *
   * This catalog is a v2 catalog that delegates to the v1 session catalog. it is used when the
   * session catalog is responsible for an identifier, but the source requires the v2 catalog API.
   * This happens when the source implementation extends the v2 TableProvider API and is not listed
   * in the fallback configuration, spark.sql.sources.useV1SourceList
   */
  private[sql] def v2SessionCatalog: CatalogPlugin = {
    conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
      catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
    }.getOrElse(defaultSessionCatalog)
  }
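
As an illustration of the delegation pattern (not how QbeastCatalog is actually wired, since this PR extends CatalogExtension directly and delegates method by method), Spark's DelegatingCatalogExtension shows the idea in its simplest form: every call is forwarded to the delegated session catalog, and only the calls of interest are intercepted.

import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

// Sketch of a catalog that relies entirely on the delegated session catalog and
// only hooks loadTable, where a Qbeast-aware catalog could return its own Table
// (the role QbeastTableImpl plays in this PR).
class DelegationSketchCatalog extends DelegatingCatalogExtension {

  override def loadTable(ident: Identifier): Table = {
    val table = super.loadTable(ident)
    // Hypothetical hook: wrap or replace `table` here when its provider is "qbeast".
    table
  }
}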

In that way, the user can configure the QbeastCatalog in two ways:

  1. Using the spark_catalog config (recommended if you don't intend to share the session with other Catalog implementations):
spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
  2. Using multiple catalog configurations:
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir

....

// Write data with qbeast_catalog prefix
data.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("qbeast_catalog.default.qbeast_table")

osopardo1 and others added 5 commits September 19, 2022 17:00
This commit also includes removal of Delta dependencies on catalog/sources classes
We delegate a lot of methods to the Session Catalog. We need to verify they work properly with unit tests.

Add some minor fixes as well
@osopardo1
Member Author

Made some changes in the tests; hope the Codecov report is satisfied this time hehe.

@osopardo1 osopardo1 marked this pull request as ready for review October 6, 2022 13:58
@osopardo1 osopardo1 marked this pull request as draft October 6, 2022 13:59
@osopardo1 osopardo1 marked this pull request as ready for review October 11, 2022 12:51
@osopardo1 osopardo1 requested a review from Jiaweihu08 October 11, 2022 12:51
@osopardo1
Member Author

Hello! I think this PR is ready to merge, since everyone has tried the SNAPSHOT version and no major errors were raised. Please, @Adricu8 @eavilaes @Jiaweihu08, when you have time, approve the changes or express your concerns before we close this. Thank you!

@osopardo1 osopardo1 requested a review from Adricu8 November 21, 2022 13:24
@osopardo1 osopardo1 merged commit d503833 into Qbeast-io:main Nov 25, 2022
@osopardo1 osopardo1 deleted the 42-table-writes-catalog branch December 12, 2022 12:41
@osopardo1 osopardo1 restored the 42-table-writes-catalog branch December 12, 2022 12:42
@osopardo1 osopardo1 deleted the 42-table-writes-catalog branch January 10, 2023 15:32
Labels
type: enhancement Improvement of existing feature or code

Successfully merging this pull request may close these issues.

'Table implementation does not support writes' while calling 'saveAsTable' from DataFrameWriter using qbeast
3 participants