Merge pull request #124 from osopardo1/42-table-writes-catalog
Add Table properties to Qbeast
osopardo1 authored Nov 25, 2022
2 parents 014aa0e + 2b5c00b commit d503833
Showing 39 changed files with 2,721 additions and 243 deletions.
34 changes: 19 additions & 15 deletions README.md
@@ -92,6 +92,7 @@ export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
```bash
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
```
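
The same configuration can be set programmatically when building the `SparkSession` (a minimal sketch, assuming the `qbeast-spark` and `delta-core` artifacts are already on the classpath):

```scala
import org.apache.spark.sql.SparkSession

// Enable the Qbeast session extension and replace the default catalog
// with QbeastCatalog, mirroring the spark-shell flags above.
val spark = SparkSession
  .builder()
  .appName("qbeast-quickstart") // hypothetical application name
  .config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()
```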

@@ -118,6 +119,24 @@ csv_df.write
.save(tmp_dir)
```

#### SQL Syntax
You can also create a Qbeast table through SQL, using the `QbeastCatalog`.

```scala
spark.sql(
  "CREATE TABLE student (id INT, name STRING, age INT) " +
    "USING qbeast OPTIONS ('columnsToIndex'='id')")
```

Use **`INSERT INTO`** to add records to the new table; the index is updated **dynamically** on every insert.

```scala
spark.sql("INSERT INTO table student SELECT * FROM visitor_students")
```
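
Once rows are inserted, the table can be queried with plain Spark SQL (a minimal sketch; `student` is the example table created above):

```scala
// Read back the Qbeast-indexed table through the catalog
val students = spark.sql("SELECT * FROM student")
students.show()
```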

### 3. Load the dataset
Load the newly indexed dataset.

@@ -151,21 +170,6 @@ qbeastTable.getIndexMetrics()
qbeastTable.analyze()
```

The format supports **Spark SQL** syntax.
It also updates the index in a **dynamic** fashion when new data is inserted.

```scala
val newData = Seq(1, 2, 3, 4).toDF("value")

newData.createOrReplaceTempView("newTable")

spark.sql("insert into table myTable select * from newTable")

spark.sql("insert into table myTable (value) values (4)")


```

Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information.

# Dependencies and Version Compatibility
5 changes: 5 additions & 0 deletions build.sbt
@@ -146,6 +146,11 @@ ThisBuild / pomExtra :=
<name>Pol Santamaria</name>
<url>https://github.com/polsm91</url>
</developer>
<developer>
<id>Adricu8</id>
<name>Adria Correas</name>
<url>https://github.com/Adricu8</url>
</developer>
</developers>

// Scalafmt settings
13 changes: 13 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -63,4 +63,17 @@ trait MetadataManager[DataSchema, FileDescriptor] {
knownAnnounced: Set[CubeId],
oldReplicatedSet: ReplicatedSet): Boolean

  /**
   * Checks if there is an existing log directory for the table
   * @param tableID the table ID
   * @return true if the log directory exists, false otherwise
   */
  def existsLog(tableID: QTableID): Boolean

  /**
   * Creates an initial log directory for the table
   * @param tableID the table ID
   */
  def createLog(tableID: QTableID): Unit

}
@@ -62,4 +62,23 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction]
diff.nonEmpty
}

  /**
   * Checks if there is an existing log directory for the table
   *
   * @param tableID the table ID
   * @return true if the delta log exists, false otherwise
   */
  override def existsLog(tableID: QTableID): Boolean = {
    loadDeltaQbeastLog(tableID).deltaLog.tableExists
  }

  /**
   * Creates an initial log directory for the table
   *
   * @param tableID the table ID
   */
  override def createLog(tableID: QTableID): Unit = {
    loadDeltaQbeastLog(tableID).deltaLog.createLogDirectory()
  }

}
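
A minimal usage sketch of the new API (assuming `SparkDeltaMetadataManager` is in scope; the table path below is hypothetical):

```scala
import io.qbeast.core.model.QTableID

// Create the initial delta log only if the table has no log directory yet
val tableID = new QTableID("/tmp/qbeast_table") // hypothetical path
if (!SparkDeltaMetadataManager.existsLog(tableID)) {
  SparkDeltaMetadataManager.createLog(tableID)
}
```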
16 changes: 16 additions & 0 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -3,6 +3,7 @@
*/
package io.qbeast.spark.internal

import io.qbeast.core.model.QTableID
import io.qbeast.spark.index.ColumnsToIndex
import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
import org.apache.spark.sql.AnalysisExceptionFactory
@@ -20,6 +21,7 @@ case class QbeastOptions(columnsToIndex: Seq[String], cubeSize: Int)
object QbeastOptions {
val COLUMNS_TO_INDEX = "columnsToIndex"
val CUBE_SIZE = "cubeSize"
val PATH = "path"

private def getColumnsToIndex(options: Map[String, String]): Seq[String] = {
val encodedColumnsToIndex = options.getOrElse(
@@ -49,4 +51,18 @@ object QbeastOptions {
QbeastOptions(columnsToIndex, desiredCubeSize)
}

  def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
    new QTableID(
      parameters.getOrElse(
        PATH, {
          throw AnalysisExceptionFactory.create("'path' is not specified")
        }))
  }

  def checkQbeastProperties(parameters: Map[String, String]): Unit = {
    require(
      parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
      throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
  }

}
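
A hypothetical caller sketch for the two new helpers (parameter values are illustrative, not part of this commit):

```scala
// Validate that columnsToIndex is present and resolve the table identifier
val parameters = Map(
  "path" -> "/tmp/qbeast_table", // hypothetical path
  "columnsToIndex" -> "id")

QbeastOptions.checkQbeastProperties(parameters) // throws if columnsToIndex is missing
val tableID = QbeastOptions.loadTableIDFromParameters(parameters) // throws if path is missing
```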
@@ -4,7 +4,7 @@
package io.qbeast.spark.internal

import io.delta.sql.DeltaSparkSessionExtension
import io.qbeast.spark.internal.rules.{SampleRule}
import io.qbeast.spark.internal.rules.{QbeastAnalysis, SampleRule, SaveAsTableRule}
import org.apache.spark.sql.SparkSessionExtensions

/**
@@ -16,9 +16,17 @@ class QbeastSparkSessionExtension extends DeltaSparkSessionExtension {

super.apply(extensions)

extensions.injectResolutionRule { session =>
new QbeastAnalysis(session)
}

extensions.injectOptimizerRule { session =>
new SampleRule(session)
}

extensions.injectOptimizerRule { session =>
new SaveAsTableRule(session)
}
}

}
41 changes: 41 additions & 0 deletions src/main/scala/io/qbeast/spark/internal/rules/QbeastAnalysis.scala
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.internal.rules

import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
* Analyzes and resolves the Spark Plan before Optimization
* @param spark the SparkSession
*/
class QbeastAnalysis(spark: SparkSession) extends Rule[LogicalPlan] {

/**
* Returns the V1Relation from a V2Relation
* @param dataSourceV2Relation the V2Relation
* @param table the underlying table
* @return the LogicalRelation
*/
private def toV1Relation(
dataSourceV2Relation: DataSourceV2Relation,
table: QbeastTableImpl): LogicalRelation = {

val underlyingRelation = table.toBaseRelation
LogicalRelation(underlyingRelation, dataSourceV2Relation.output, None, isStreaming = false)

}

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// This rule is a hack to return a V1 relation for reading
// because we haven't implemented SupportsRead on QbeastTableImpl yet
case v2Relation @ DataSourceV2Relation(t: QbeastTableImpl, _, _, _, _) =>
toV1Relation(v2Relation, t)
}

}
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.internal.rules

import io.qbeast.spark.internal.sources.catalog.QbeastCatalogUtils.isQbeastProvider
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{
CreateTableAsSelect,
LogicalPlan,
ReplaceTableAsSelect
}
import org.apache.spark.sql.catalyst.rules.Rule

/**
 * Rule that ensures all write options are passed through to the table implementation
 * @param spark the SparkSession
 */
class SaveAsTableRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {

override def apply(plan: LogicalPlan): LogicalPlan = {
// When a CreateTableAsSelect statement targets a qbeast table,
// pass the writeOptions as table properties at creation time
// so that columnsToIndex is always present
plan transformDown {
case saveAsSelect: CreateTableAsSelect if isQbeastProvider(saveAsSelect.properties) =>
val options = saveAsSelect.writeOptions
val finalProperties = saveAsSelect.properties ++ options
saveAsSelect.copy(properties = finalProperties)
case replaceAsSelect: ReplaceTableAsSelect
if isQbeastProvider(replaceAsSelect.properties) =>
val options = replaceAsSelect.writeOptions
val finalProperties = replaceAsSelect.properties ++ options
replaceAsSelect.copy(properties = finalProperties)
}
}

}
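
For context, a write that this rule targets might look like the following (a sketch, assuming a `saveAsTable` call with the `qbeast` provider is planned as a `CreateTableAsSelect`; names are illustrative):

```scala
import spark.implicits._ // assumes an active SparkSession named `spark`

val df = Seq((1, "Alice", 20), (2, "Bob", 22)).toDF("id", "name", "age")

// The rule copies "columnsToIndex" from the write options into the table
// properties, so the catalog sees it when the table is created.
df.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("students_qbeast") // hypothetical table name
```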
@@ -3,33 +3,25 @@
*/
package io.qbeast.spark.internal.sources

import io.qbeast.core.model.{QTableID}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.InsertableRelation

import org.apache.spark.sql.{SQLContext}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import io.qbeast.spark.delta.OTreeIndex
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import io.qbeast.spark.table.IndexedTable
import io.qbeast.context.QbeastContext
import org.apache.hadoop.fs.{Path}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

/**
* Companion object for QbeastBaseRelation
*/
object QbeastBaseRelation {

/**
* Creates a QbeastBaseRelation instance
* @param tableID the identifier of the table
* @return the QbeastBaseRelation
*/

/**
* Returns a HadoopFsRelation that contains all of the data present
* in the table. This relation will be continually updated
@@ -39,48 +31,72 @@ object QbeastBaseRelation {
* @param sqlContext the SQLContext
* @return the HadoopFsRelation
*/
def createRelation(sqlContext: SQLContext, table: IndexedTable): BaseRelation = {
def createRelation(
sqlContext: SQLContext,
table: IndexedTable,
options: Map[String, String]): BaseRelation = {

val spark = SparkSession.active
val tableID = table.tableID
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())
if (snapshot.isInitial) {
// If the table is in its initial state, return an empty relation.
// This can happen after a CREATE/REPLACE TABLE without inserting data;
// in that case, we use the options passed in by the caller
new HadoopFsRelation(
OTreeIndex(spark, new Path(tableID.id)),
partitionSchema = StructType(Seq.empty[StructField]),
dataSchema = schema,
bucketSpec = None,
new ParquetFileFormat(),
options)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, options, append = !overwrite)
}
}
} else {
// If the table contains data, initialize it
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())

val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()
val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()

new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
dataSchema = schema,
bucketSpec = bucketSpec,
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
dataSchema = schema,
bucketSpec = bucketSpec,
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
}
}
}
}

  /**
   * Creates a BaseRelation for the given indexed table in Qbeast format.
   * @param indexedTable the indexed table
   * @return the BaseRelation for the table in Qbeast format
   */
def forQbeastTable(tableID: QTableID, indexedTable: IndexedTable): BaseRelation = {
def forQbeastTable(indexedTable: IndexedTable): BaseRelation = {
forQbeastTableWithOptions(indexedTable, Map.empty)
}

def forQbeastTableWithOptions(
indexedTable: IndexedTable,
withOptions: Map[String, String]): BaseRelation = {
val spark = SparkSession.active
createRelation(spark.sqlContext, indexedTable)

createRelation(spark.sqlContext, indexedTable, withOptions)
}

}