From 83e3e9a4bebf7411fd4cb276be9cf3e5834cd756 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 12 Dec 2022 15:23:02 +0100
Subject: [PATCH] Updating Docs version and adding information about Catalogs

---
 README.md                     |  2 +-
 docs/AdvancedConfiguration.md | 50 ++++++++++++++++++++++++++++++
 docs/Quickstart.md            | 58 +++++++++++++++++++++++++----------
 3 files changed, 93 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 545ddf3c8..888504cd8 100644
--- a/README.md
+++ b/README.md
@@ -93,7 +93,7 @@ export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
 $SPARK_HOME/bin/spark-shell \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
 --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
---packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
+--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.0.0
 ```
 
 ### 2. Indexing a dataset
diff --git a/docs/AdvancedConfiguration.md b/docs/AdvancedConfiguration.md
index 0a1e3d1e2..1ca38bec7 100644
--- a/docs/AdvancedConfiguration.md
+++ b/docs/AdvancedConfiguration.md
@@ -2,6 +2,56 @@ There's different configurations for the index that can affect the performance on read or the writing process. Here is a resume of some of them.
 
+## Catalogs
+
+We designed the `QbeastCatalog` to work as an **entry point for other formats' Catalogs** as well.
+
+However, you can also handle different Catalogs simultaneously.
+
+### 1. Unified Catalog
+
+```bash
+--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
+```
+
+Using the `spark_catalog` configuration, you can write **qbeast** and **delta** (or upcoming formats) tables into the `default` namespace.
+
+```scala
+df.write
+  .format("qbeast")
+  .option("columnsToIndex", "user_id,product_id")
+  .saveAsTable("qbeast_table")
+
+df.write
+  .format("delta")
+  .saveAsTable("delta_table")
+```
+### 2. Secondary Catalog
+
+To use **more than one Catalog in the same session**, you can register the `QbeastCatalog` under a separate name.
+
+```bash
+--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
+--conf spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
+```
+
+Notice that the `QbeastCatalog` configuration key is no longer `spark_catalog`; it now uses a custom name such as `qbeast_catalog`. Every table written with the **qbeast** implementation should then carry the `qbeast_catalog` prefix.
+
+For example:
+
+```scala
+// DataFrame API
+df.write
+  .format("qbeast")
+  .option("columnsToIndex", "user_id,product_id")
+  .saveAsTable("qbeast_catalog.default.qbeast_table")
+
+// SQL
+spark.sql("CREATE TABLE qbeast_catalog.default.qbeast_table USING qbeast AS SELECT * FROM ecommerce")
+```
+
+
 ## ColumnsToIndex
 
 These are the columns you want to index. Try to find those which are interesting for your queries, or your data pipelines.
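For illustration, here is a minimal sketch of how `columnsToIndex` is passed through the DataFrame writer; the column names and output path are placeholders taken from the examples above, not a prescribed schema:

```scala
// Sketch: index the columns that appear most often in your filters and joins.
// Assumes a spark-shell session, where `spark` and its implicits are available.
import spark.implicits._

val df = Seq((1L, 15L), (2L, 30L)).toDF("user_id", "product_id")

df.write
  .format("qbeast")
  .option("columnsToIndex", "user_id,product_id") // comma-separated list of columns to index
  .save("/tmp/qbeast-columns-to-index-example")   // illustrative output path
```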
diff --git a/docs/Quickstart.md b/docs/Quickstart.md
index 29c39bf6e..73b3ea92a 100644
--- a/docs/Quickstart.md
+++ b/docs/Quickstart.md
@@ -16,13 +16,8 @@ Inside the project folder, launch a spark-shell with the required **dependencies
 ```bash
 $SPARK_HOME/bin/spark-shell \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
---conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
---packages io.qbeast:qbeast-spark_2.12:0.2.0,\
-io.delta:delta-core_2.12:1.0.0,\
-com.amazonaws:aws-java-sdk:1.12.20,\
-org.apache.hadoop:hadoop-common:3.2.0,\
-org.apache.hadoop:hadoop-client:3.2.0,\
-org.apache.hadoop:hadoop-aws:3.2.0
+--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
+--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.2.0
 ```
 
 As an **_extra configuration_**, you can also change two global parameters of the index:
@@ -37,26 +32,28 @@ As an **_extra configuration_**, you can also change two global parameters of th
 ```
 Consult the [Qbeast-Spark advanced configuration](AdvancedConfiguration.md) for more information.
 
-Read the ***store_sales*** public dataset from `TPC-DS`, the table has with **23** columns in total and was generated with a `scaleFactor` of 1. Check [The Making of TPC-DS](http://www.tpc.org/tpcds/presentations/the_making_of_tpcds.pdf) for more details on the dataset.
-
+Read the ***ecommerce*** test dataset from [Kaggle](https://www.kaggle.com/code/adilemrebilgic/e-commerce-analytics/data):
 ```scala
-val parquetTablePath = "s3a://qbeast-public-datasets/store_sales"
-
-val parquetDf = spark.read.format("parquet").load(parquetTablePath).na.drop()
+val ecommerce = spark.read
+  .format("csv")
+  .option("header", "true")
+  .option("inferSchema", "true")
+  .load("src/test/resources/ecommerce100K_2019_Oct.csv")
 ```
 
-Indexing the data with the desired columns, in this case `ss_cdemo_sk` and `ss_cdemo_sk`.
+Index the data with the desired columns, in this case `user_id` and `product_id`:
 
 ```scala
 val qbeastTablePath = "/tmp/qbeast-test-data/qtable"
 
-(parquetDf.write
+(ecommerce.write
 .mode("overwrite")
 .format("qbeast") // Saving the dataframe in a qbeast datasource
-.option("columnsToIndex", "ss_cdemo_sk,ss_cdemo_sk") // Indexing the table
-.option("cubeSize", 300000) // The desired number of records of the resulting files/cubes. Default is 100000
+.option("columnsToIndex", "user_id,product_id") // Indexing the table
+.option("cubeSize", "500") // The desired number of records of the resulting files/cubes. Default is 5M
 .save(qbeastTablePath))
 ```
+
 ## Sampling
 
 Allow the sample operator to be pushed down to the source when sampling, reducing i/o and computational cost.
@@ -80,6 +77,35 @@ qbeastDf.sample(0.1).explain()
 
 Notice that the sample operator is no longer present in the physical plan. It's converted into a `Filter (qbeast_hash)` instead and is used to select files during data scanning(`DataFilters` from `FileScan`). We skip reading many files in this way, involving less I/O.
 
+## SQL
+
+Thanks to the `QbeastCatalog`, you can use plain SQL to `CREATE TABLE` or `INSERT INTO` tables in qbeast format.
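+
+These SQL statements assume that the session was started with the `QbeastCatalog` configuration shown at the top of this guide. If you build the `SparkSession` programmatically instead of passing `--conf` flags to spark-shell, a minimal sketch of the equivalent setup (reusing the same extension and catalog class names; the app name is an arbitrary placeholder) could look like this:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Sketch: register the Qbeast session extension and catalog when the session is created.
+// These are the same class names passed to spark-shell earlier in this guide.
+val spark = SparkSession.builder()
+  .appName("qbeast-quickstart")
+  .config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")
+  .config("spark.sql.catalog.spark_catalog", "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
+  .getOrCreate()
+```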
+
+To check the different Catalog configurations, please go to [Advanced Configuration](AdvancedConfiguration.md).
+
+```scala
+ecommerce.createOrReplaceTempView("ecommerce_october")
+
+spark.sql("CREATE OR REPLACE TABLE ecommerce_qbeast USING qbeast AS SELECT * FROM ecommerce_october")
+
+// OR insert new data into the existing table
+
+val ecommerceNovember = spark.read
+  .format("csv")
+  .option("header", "true")
+  .option("inferSchema", "true")
+  .load("./src/test/resources/ecommerce100K_2019_Nov.csv")
+
+ecommerceNovember.createOrReplaceTempView("ecommerce_november")
+
+spark.sql("INSERT INTO ecommerce_qbeast SELECT * FROM ecommerce_november")
+```
+Sampling also has a SQL operator, `TABLESAMPLE`, which can be expressed in terms of either rows or a percentage.
+
+```scala
+spark.sql("SELECT avg(price) FROM ecommerce_qbeast TABLESAMPLE(10 PERCENT)").show()
+```
+
 ## Analyze and Optimize
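For reference, optimization is typically triggered through the `QbeastTable` API; below is a minimal sketch under that assumption (treat the exact calls, `forPath`, `analyze()` and `optimize()`, as assumptions to verify against the qbeast-spark version you use):

```scala
import io.qbeast.spark.QbeastTable

// Sketch (assumed API): wrap the indexed path and run analyze/optimize on it.
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast-test-data/qtable")

qbeastTable.analyze()  // announces the cubes that would benefit from optimization
qbeastTable.optimize() // rewrites/replicates data for the announced cubes
```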