Updating Docs version and adding information about Catalogs
osopardo1 authored and osopardo1 committed Dec 12, 2022
1 parent 9bd3305 commit 83e3e9a
Showing 3 changed files with 93 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -93,7 +93,7 @@ export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.0.0
```

### 2. Indexing a dataset
50 changes: 50 additions & 0 deletions docs/AdvancedConfiguration.md
@@ -2,6 +2,56 @@

There are different index configurations that can affect read performance or the writing process. Here is a summary of some of them.

## Catalogs

We designed the `QbeastCatalog` to work as an **entry point for other formats' catalogs** as well.

However, you can also handle different catalogs simultaneously.

### 1. Unified Catalog

```bash
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
```

Using the `spark_catalog` configuration, you can write **qbeast** and **delta** (or upcoming formats ;)) tables into the `default` namespace.

```scala
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.saveAsTable("qbeast_table")

df.write
.format("delta")
.saveAsTable("delta_table")
```

### 2. Secondary catalog

To use **more than one catalog in the same session**, you can register the `QbeastCatalog` under a different name.

```bash
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
```

Notice that the `QbeastCatalog` conf parameter is no longer `spark_catalog`; it now has a custom name such as `qbeast_catalog`. Each table written using the **qbeast** implementation should be prefixed with `qbeast_catalog`.

For example:

```scala
// DataFrame API
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.saveAsTable("qbeast_catalog.default.qbeast_table")

// SQL
spark.sql("CREATE TABLE qbeast_catalog.default.qbeast_table USING qbeast AS SELECT * FROM ecommerce")
```



## ColumnsToIndex

These are the columns you want to index. Try to choose the ones that are most relevant for your queries or your data pipelines.
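
For illustration, a minimal sketch of passing the indexed columns at write time, assuming a DataFrame `df` is already loaded; the column names are just the ones from the ecommerce example used elsewhere in these docs:

```scala
// Comma-separated list of the columns to index; pick columns your queries filter on.
df.write
  .format("qbeast")
  .option("columnsToIndex", "user_id,product_id")
  .save("/tmp/qbeast-table")
```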
58 changes: 42 additions & 16 deletions docs/Quickstart.md
@@ -16,13 +16,8 @@ Inside the project folder, launch a spark-shell with the required **dependencies**:
```bash
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,\
io.delta:delta-core_2.12:1.0.0,\
com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.2.0
```
As an **_extra configuration_**, you can also change two global parameters of the index:

@@ -37,26 +32,28 @@ As an **_extra configuration_**, you can also change two global parameters of the index:
```
Consult the [Qbeast-Spark advanced configuration](AdvancedConfiguration.md) for more information.
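
As a sketch of how those two global parameters are typically passed when launching the shell; the exact configuration keys shown here are assumptions based on the Advanced Configuration page, so verify them there before relying on them:

```bash
# Assumed keys -- check AdvancedConfiguration.md for the authoritative names and defaults
--conf spark.qbeast.index.defaultCubeSize=100000 \
--conf spark.qbeast.index.cubeWeightsBufferCapacity=100000
```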

Read the ***store_sales*** public dataset from `TPC-DS`; the table has **23** columns in total and was generated with a `scaleFactor` of 1. Check [The Making of TPC-DS](http://www.tpc.org/tpcds/presentations/the_making_of_tpcds.pdf) for more details on the dataset.

Read the ***ecommerce*** test dataset from [Kaggle](https://www.kaggle.com/code/adilemrebilgic/e-commerce-analytics/data).
```scala
val parquetTablePath = "s3a://qbeast-public-datasets/store_sales"

val parquetDf = spark.read.format("parquet").load(parquetTablePath).na.drop()
val ecommerce = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/test/resources/ecommerce100K_2019_Oct.csv")
```

Indexing the data with the desired columns, in this case `ss_cdemo_sk` and `ss_cdemo_sk`.
Indexing the data with the desired columns, in this case `user_id` and `product_id`.
```scala
val qbeastTablePath = "/tmp/qbeast-test-data/qtable"

(parquetDf.write
(ecommerce.write
.mode("overwrite")
.format("qbeast") // Saving the dataframe in a qbeast datasource
.option("columnsToIndex", "ss_cdemo_sk,ss_cdemo_sk") // Indexing the table
.option("cubeSize", 300000) // The desired number of records of the resulting files/cubes. Default is 100000
.option("columnsToIndex", "user_id,product_id") // Indexing the table
.option("cubeSize", "500") // The desired number of records of the resulting files/cubes. Default is 5M
.save(qbeastTablePath))
```


## Sampling

The sample operator is pushed down to the data source when sampling, reducing I/O and computational cost.
@@ -80,6 +77,35 @@ qbeastDf.sample(0.1).explain()

Notice that the sample operator is no longer present in the physical plan. It is converted into a `Filter (qbeast_hash)` instead, which is used to select files during data scanning (`DataFilters` from `FileScan`). This way we skip reading many files, involving less I/O.
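
As a minimal sketch of the full flow (assuming the `qbeastTablePath` written above), `qbeastDf` can be obtained by reading that path back through the qbeast datasource:

```scala
// Read the indexed table back and sample 10% of it.
// The sample becomes a qbeast_hash filter, so only a fraction of the files is scanned.
val qbeastTablePath = "/tmp/qbeast-test-data/qtable"
val qbeastDf = spark.read.format("qbeast").load(qbeastTablePath)

qbeastDf.sample(0.1).explain() // look for Filter (qbeast_hash) in the physical plan
qbeastDf.sample(0.1).count()   // executes the pruned scan
```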

## SQL

Thanks to the `QbeastCatalog`, you can use plain SQL statements such as `CREATE TABLE` or `INSERT INTO` with the qbeast format.

To check the different catalog configurations, please see the [Advanced Configuration](AdvancedConfiguration.md).

```scala
ecommerce.createOrReplaceTempView("ecommerce_october")

spark.sql("CREATE OR REPLACE TABLE ecommerce_qbeast USING qbeast AS SELECT * FROM ecommerce_october")

// OR

val ecommerceNovember = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./src/test/resources/ecommerce100K_2019_Nov.csv")

ecommerceNovember.createOrReplaceTempView("ecommerce_november")

spark.sql("INSERT INTO ecommerce_qbeast SELECT * FROM ecommerce_november")
```
Sampling also has a SQL operator, `TABLESAMPLE`, which can be expressed in terms of either rows or a percentage.

```scala
spark.sql("SELECT avg(price) FROM ecommerce_qbeast TABLESAMPLE(10 PERCENT)").show()
```
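
For the row-based variant, a quick sketch using standard Spark SQL syntax (the row count and selected columns are arbitrary examples):

```scala
// Sample a fixed number of rows instead of a percentage.
spark.sql("SELECT user_id, price FROM ecommerce_qbeast TABLESAMPLE (1000 ROWS)").show()
```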


## Analyze and Optimize
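
A rough sketch of how analyze and optimize are usually invoked, assuming the `QbeastTable` API described in the project README; verify the exact calls against the version you are running:

```scala
import io.qbeast.spark.QbeastTable

// Assumed API from the project README: forPath, analyze() and optimize().
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast-test-data/qtable")

qbeastTable.analyze()  // announces the cubes that would benefit from optimization
qbeastTable.optimize() // replicates/rewrites the data in the announced cubes
```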

