
Add Table properties to Qbeast #124

Merged
51 commits merged on Nov 25, 2022
Changes from all commits (51 commits)
b627bab
Wip on adding datasourceV2 compatibility
osopardo1 Jul 27, 2022
99adfd7
WIP on merging V1 and V2 methods
osopardo1 Aug 3, 2022
037a290
Add tests
osopardo1 Aug 3, 2022
7baadd2
Fix SQL in tests
osopardo1 Aug 3, 2022
6812a85
Add ReadSupport and TODOs on the code
osopardo1 Aug 4, 2022
e7fb0dd
Add headers
osopardo1 Aug 4, 2022
c73d4b5
Amend some commits and create new test class for QbeastSparkIntegra…
osopardo1 Aug 4, 2022
74f70de
Moved Correctness tests to new class
osopardo1 Aug 4, 2022
f6188ff
Extend DeltaCatalog and modify buildScan() method
osopardo1 Aug 5, 2022
35b7143
Rollback QbeastCatalog
osopardo1 Aug 5, 2022
c49d20d
Add select statement to SQLIntegrationTest
osopardo1 Aug 5, 2022
983e5ce
Fix Table Properties
osopardo1 Aug 5, 2022
275eca1
Return rdd in buildScan()
osopardo1 Aug 5, 2022
569f96f
Add new spark rule for fallback reads to V1 implementation
osopardo1 Aug 5, 2022
6f29bdd
Create headers
osopardo1 Aug 5, 2022
1a82a17
Add path to getTable for tests
osopardo1 Aug 5, 2022
2cec710
Rollback to old createRelation implementation
osopardo1 Aug 5, 2022
2772bfe
Add comments to the code and rename QbeastScanRDD to QbeastScan
osopardo1 Aug 6, 2022
d1e4db4
Remove spark val from QbeastTableImpl
osopardo1 Aug 6, 2022
7fa0bb1
Add INSERT INTO tests using more statements, add INSERT OVERWRITE tes…
Jiaweihu08 Aug 8, 2022
de3242d
Add comments
Jiaweihu08 Aug 8, 2022
5467986
Use existing method to read csv data
Jiaweihu08 Aug 9, 2022
f23dbdd
Add INSERT INTO test for managed table
Jiaweihu08 Aug 10, 2022
7b60c1b
Add 'INSERT OVERWRITE'-awareness to managed tables
Jiaweihu08 Aug 11, 2022
8894e91
Add test for INSERT OVERWRITE on managed tables
Jiaweihu08 Aug 11, 2022
6fbd37a
Merge branch 'main' into 42-table-writes-catalog
osopardo1 Sep 1, 2022
10f1f41
Use same test data
osopardo1 Sep 5, 2022
9650ab1
Merge branch 'main' into 42-table-writes-catalog
osopardo1 Sep 6, 2022
88dd7f7
Adapting QbeastWriteBuilder to version 3.2.2 of Spark
osopardo1 Sep 6, 2022
13f86b9
Harmonize tests
osopardo1 Sep 6, 2022
88087fa
Add staging properties to the table and change QbeastCatalog to Qbeas…
osopardo1 Sep 13, 2022
7f03e96
Renamed QbeastCatalog to QbeastCatalogUtils
osopardo1 Sep 13, 2022
4923773
Add scaladocs and comments
osopardo1 Sep 13, 2022
e2448ed
Rename Catalog
osopardo1 Sep 14, 2022
8f9218b
Remove duplication of CreateTable
osopardo1 Sep 14, 2022
9276b59
Resolving build errors
osopardo1 Sep 14, 2022
1b5bebf
Extend CatalogExtension instead of DelegatingCatalogExtension or De…
osopardo1 Sep 19, 2022
b355478
Update readme
osopardo1 Sep 19, 2022
3b10dee
Fix creation of empty tables and add more test
osopardo1 Sep 20, 2022
737efda
Update readme and add SupportsTruncate
osopardo1 Sep 20, 2022
93d4e7b
Shorten code comment
Jiaweihu08 Sep 28, 2022
d0f84cb
Add basic test for Catalog and DefaultStagedTable
Oct 5, 2022
de86bd8
Test for QbeastStagedTableTest and other exceptions tests
Oct 6, 2022
c46ffbd
Add test on commitStagedChanges
Oct 6, 2022
0aa8e59
Fixed bug on newWriteBuilder() creation and fixed tests
Oct 6, 2022
92f8988
Remove SparkTransformations and add tests for QbeastTableImpl, Qbeast…
Oct 11, 2022
f0dc97b
Add Adria as Developer in pomExtra
Oct 13, 2022
6501112
Add test without location
Oct 14, 2022
7f58c4d
Add different method inputs for isQbeastProvider method
Oct 25, 2022
509aa02
Add different method inputs for isQbeastProvider
Oct 25, 2022
2b5c00b
Add rule for passing properties on ReplaceAsTableStatement
Oct 25, 2022
34 changes: 19 additions & 15 deletions README.md
@@ -92,6 +92,7 @@ export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
```bash
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
```
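The same two settings can also be supplied programmatically when building the session. A minimal sketch (the app name and master are placeholders, and the qbeast and delta jars must still be on the classpath):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("qbeast-example") // placeholder
  .master("local[*]") // placeholder
  .config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .config(
    "spark.sql.catalog.spark_catalog",
    "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()
```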

@@ -118,6 +119,24 @@ csv_df.write
.save(tmp_dir)
```

#### SQL Syntax
You can also create a Qbeast table using SQL, with the help of the `QbeastCatalog`.

```scala
spark.sql(
  "CREATE TABLE student (id INT, name STRING, age INT) " +
    "USING qbeast OPTIONS ('columnsToIndex'='id')")
```

Use **`INSERT INTO`** to add records to the new table; the index is updated **dynamically** as new data is inserted.

```scala
spark.sql("INSERT INTO table student SELECT * FROM visitor_students")
```
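`CREATE TABLE ... AS SELECT` also works with the qbeast provider; the `SaveAsTableRule` added in this PR copies the write options into the table properties so that `columnsToIndex` reaches the catalog. A hedged example (the target table name is illustrative):

```scala
spark.sql(
  "CREATE TABLE student_copy USING qbeast " +
    "OPTIONS ('columnsToIndex'='id') " +
    "AS SELECT * FROM student")
```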

### 3. Load the dataset
Load the newly indexed dataset.
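For instance, a path-based table written with the DataFrame API can be read back with the same format. A minimal sketch, assuming the `tmp_dir` used in the write example above:

```scala
val qbeastDf = spark.read.format("qbeast").load(tmp_dir)
qbeastDf.show()
```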

@@ -151,21 +170,6 @@ qbeastTable.getIndexMetrics()
qbeastTable.analyze()
```

The format supports **Spark SQL** syntax.
It also updates the index in a **dynamic** fashion when new data is inserted.

```scala
val newData = Seq(1, 2, 3, 4).toDF("value")

newData.createOrReplaceTempView("newTable")

spark.sql("insert into table myTable select * from newTable")

spark.sql("insert into table myTable (value) values (4)")


```

Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information.

# Dependencies and Version Compatibility
5 changes: 5 additions & 0 deletions build.sbt
@@ -146,6 +146,11 @@ ThisBuild / pomExtra :=
<name>Pol Santamaria</name>
<url>https://github.com/polsm91</url>
</developer>
<developer>
<id>Adricu8</id>
<name>Adria Correas</name>
<url>https://github.com/Adricu8</url>
</developer>
</developers>

// Scalafmt settings
13 changes: 13 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -63,4 +63,17 @@ trait MetadataManager[DataSchema, FileDescriptor] {
knownAnnounced: Set[CubeId],
oldReplicatedSet: ReplicatedSet): Boolean

/**
* Checks if there's an existing log directory for the table
* @param tableID the table ID
* @return true if the log exists, false otherwise
*/
def existsLog(tableID: QTableID): Boolean

/**
* Creates an initial log directory
* @param tableID the table ID
*/
def createLog(tableID: QTableID): Unit

}
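For reference, a hedged sketch of how a caller might combine the two new methods, creating the log only when the table has none yet (the helper itself is illustrative, not part of this PR):

```scala
import io.qbeast.core.model.{MetadataManager, QTableID}

// Illustrative helper: create the log directory only when the table has no log yet.
def ensureLog(manager: MetadataManager[_, _], tableID: QTableID): Unit = {
  if (!manager.existsLog(tableID)) {
    manager.createLog(tableID)
  }
}
```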
src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala
@@ -62,4 +62,23 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction]
diff.nonEmpty
}

/**
* Checks if there's an existing log directory for the table
*
* @param tableID the table ID
* @return true if the log exists, false otherwise
*/
override def existsLog(tableID: QTableID): Boolean = {
loadDeltaQbeastLog(tableID).deltaLog.tableExists
}

/**
* Creates an initial log directory
*
* @param tableID the table ID
*/
override def createLog(tableID: QTableID): Unit = {
loadDeltaQbeastLog(tableID).deltaLog.createLogDirectory()
}

}
16 changes: 16 additions & 0 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -3,6 +3,7 @@
*/
package io.qbeast.spark.internal

import io.qbeast.core.model.QTableID
import io.qbeast.spark.index.ColumnsToIndex
import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
import org.apache.spark.sql.AnalysisExceptionFactory
@@ -20,6 +21,7 @@ case class QbeastOptions(columnsToIndex: Seq[String], cubeSize: Int)
object QbeastOptions {
val COLUMNS_TO_INDEX = "columnsToIndex"
val CUBE_SIZE = "cubeSize"
val PATH = "path"

private def getColumnsToIndex(options: Map[String, String]): Seq[String] = {
val encodedColumnsToIndex = options.getOrElse(
@@ -49,4 +51,18 @@ object QbeastOptions {
QbeastOptions(columnsToIndex, desiredCubeSize)
}

def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
new QTableID(
parameters.getOrElse(
PATH, {
throw AnalysisExceptionFactory.create("'path' is not specified")
}))
}

def checkQbeastProperties(parameters: Map[String, String]): Unit = {
require(
parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
}

}
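A short, hedged sketch of how the two new helpers might be exercised (the parameter map is illustrative):

```scala
import io.qbeast.spark.internal.QbeastOptions

// Illustrative parameters, mirroring what a writer would pass along.
val parameters = Map(
  "path" -> "/tmp/qbeast/student",
  "columnsToIndex" -> "id",
  "cubeSize" -> "5000")

// Throws an AnalysisException when 'columnsToIndex' is missing.
QbeastOptions.checkQbeastProperties(parameters)

// Builds the QTableID from the 'path' entry, or throws if it is absent.
val tableID = QbeastOptions.loadTableIDFromParameters(parameters)
```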
src/main/scala/io/qbeast/spark/internal/QbeastSparkSessionExtension.scala
@@ -4,7 +4,7 @@
package io.qbeast.spark.internal

import io.delta.sql.DeltaSparkSessionExtension
import io.qbeast.spark.internal.rules.{SampleRule}
import io.qbeast.spark.internal.rules.{QbeastAnalysis, SampleRule, SaveAsTableRule}
import org.apache.spark.sql.SparkSessionExtensions

/**
@@ -16,9 +16,17 @@ class QbeastSparkSessionExtension extends DeltaSparkSessionExtension {

super.apply(extensions)

extensions.injectResolutionRule { session =>
new QbeastAnalysis(session)
}

extensions.injectOptimizerRule { session =>
new SampleRule(session)
}

extensions.injectOptimizerRule { session =>
new SaveAsTableRule(session)
}
}

}
41 changes: 41 additions & 0 deletions src/main/scala/io/qbeast/spark/internal/rules/QbeastAnalysis.scala
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.internal.rules

import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
* Analyzes and resolves the Spark Plan before Optimization
* @param spark the SparkSession
*/
class QbeastAnalysis(spark: SparkSession) extends Rule[LogicalPlan] {

/**
* Returns the V1Relation from a V2Relation
* @param dataSourceV2Relation the V2Relation
* @param table the underlying table
* @return the LogicalRelation
*/
private def toV1Relation(
dataSourceV2Relation: DataSourceV2Relation,
table: QbeastTableImpl): LogicalRelation = {

val underlyingRelation = table.toBaseRelation
LogicalRelation(underlyingRelation, dataSourceV2Relation.output, None, isStreaming = false)

}

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// This rule is a hack to return a V1 relation for reading,
// because we haven't implemented SupportsRead on QbeastTableImpl yet
case v2Relation @ DataSourceV2Relation(t: QbeastTableImpl, _, _, _, _) =>
toV1Relation(v2Relation, t)
}

}
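Once the rule is injected, a plain read of a catalog-managed Qbeast table exercises this fallback. A hedged example, assuming the `student` table from the README section:

```scala
// Resolved through QbeastCatalog, `student` becomes a DataSourceV2Relation over
// QbeastTableImpl, which the rule above rewrites into a V1 LogicalRelation for reading.
val students = spark.sql("SELECT * FROM student")
students.show()
```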
src/main/scala/io/qbeast/spark/internal/rules/SaveAsTableRule.scala
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.internal.rules

import io.qbeast.spark.internal.sources.catalog.QbeastCatalogUtils.isQbeastProvider
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{
CreateTableAsSelect,
LogicalPlan,
ReplaceTableAsSelect
}
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Rule that ensures all write options are passed to the table implementation
* @param spark the SparkSession
*/
class SaveAsTableRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {

override def apply(plan: LogicalPlan): LogicalPlan = {
// When a CreateTableAsSelect statement targets a qbeast provider,
// we pass the writeOptions as properties at table creation
// to make sure columnsToIndex is present
plan transformDown {
case saveAsSelect: CreateTableAsSelect if isQbeastProvider(saveAsSelect.properties) =>
val options = saveAsSelect.writeOptions
val finalProperties = saveAsSelect.properties ++ options
saveAsSelect.copy(properties = finalProperties)
case replaceAsSelect: ReplaceTableAsSelect
if isQbeastProvider(replaceAsSelect.properties) =>
val options = replaceAsSelect.writeOptions
val finalProperties = replaceAsSelect.properties ++ options
replaceAsSelect.copy(properties = finalProperties)
}
}

}
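The DataFrame writer path that produces such a `CreateTableAsSelect` plan, as a hedged sketch (the data and table name are illustrative):

```scala
import spark.implicits._

// Illustrative data; any DataFrame works.
val df = Seq((1, "Alice", 20), (2, "Bob", 22)).toDF("id", "name", "age")

// saveAsTable plans a CreateTableAsSelect whose writeOptions carry 'columnsToIndex';
// the rule above copies those options into the table properties.
df.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("qbeast_students")
```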
src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala
@@ -3,33 +3,25 @@
*/
package io.qbeast.spark.internal.sources

import io.qbeast.core.model.{QTableID}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.InsertableRelation

import org.apache.spark.sql.{SQLContext}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import io.qbeast.spark.delta.OTreeIndex
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import io.qbeast.spark.table.IndexedTable
import io.qbeast.context.QbeastContext
import org.apache.hadoop.fs.{Path}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

/**
* Companion object for QbeastBaseRelation
*/
object QbeastBaseRelation {

/**
* Creates a QbeastBaseRelation instance
* @param tableID the identifier of the table
* @return the QbeastBaseRelation
*/

/**
* Returns a HadoopFsRelation that contains all of the data present
* in the table. This relation will be continually updated
@@ -39,48 +31,72 @@ object QbeastBaseRelation {
* @param sqlContext the SQLContext
* @return the HadoopFsRelation
*/
def createRelation(sqlContext: SQLContext, table: IndexedTable): BaseRelation = {
def createRelation(
sqlContext: SQLContext,
table: IndexedTable,
options: Map[String, String]): BaseRelation = {

val spark = SparkSession.active
val tableID = table.tableID
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())
if (snapshot.isInitial) {
// If the Table is initial, read empty relation
// This could happen if we CREATE/REPLACE TABLE without inserting data
// In this case, we use the options variable
new HadoopFsRelation(
OTreeIndex(spark, new Path(tableID.id)),
partitionSchema = StructType(Seq.empty[StructField]),
dataSchema = schema,
bucketSpec = None,
new ParquetFileFormat(),
options)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, options, append = !overwrite)
}
}
} else {
// If the table contains data, initialize it
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())

val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()
val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()

new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
dataSchema = schema,
bucketSpec = bucketSpec,
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
dataSchema = schema,
bucketSpec = bucketSpec,
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
}
}
}
}

/**
* Function that can be called from a QbeastBaseRelation object to create a
* new QbeastBaseRelation for the given table.
* @param indexedTable the indexed table
* @return BaseRelation for the table in Qbeast format
*/
def forQbeastTable(tableID: QTableID, indexedTable: IndexedTable): BaseRelation = {
def forQbeastTable(indexedTable: IndexedTable): BaseRelation = {
forQbeastTableWithOptions(indexedTable, Map.empty)
}

def forQbeastTableWithOptions(
indexedTable: IndexedTable,
withOptions: Map[String, String]): BaseRelation = {
val spark = SparkSession.active
createRelation(spark.sqlContext, indexedTable)

createRelation(spark.sqlContext, indexedTable, withOptions)
}

}