Rebasing
JosepSampe committed Oct 24, 2024
1 parent 5be787a commit b5c9d4a
Showing 216 changed files with 325 additions and 265 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-artifact.yml
@@ -31,7 +31,7 @@ jobs:
run: sbt "scalafixAll --check"
- name: Test
run: |
sbt coverage 'test' coverageReport
sbt coverage 'qbeastCore/test' 'qbeastDelta/test' 'qbeastSpark/test' coverageReport
- name: Upload to Codecov
run: |
curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import # One-time step
8 changes: 4 additions & 4 deletions CONTRIBUTING.md
@@ -193,8 +193,8 @@ sbt assembly
$SPARK_HOME/bin/spark-shell \
--jars ./target/scala-2.12/qbeast-spark-assembly-0.6.0.jar \
--packages io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.delta.QbeastDeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
```

### 4. Publishing artefacts in the local repository
@@ -280,8 +280,8 @@ To publish a new version of the qbeast-spark project, follow these steps:
export QBEAST_SPARK_VERSION=0.6.0-SNAPSHOT
$SPARK_350/bin/spark-shell --repositories https://s01.oss.sonatype.org/content/repositories/snapshots \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.sql.extensions=io.qbeast.spark.delta.QbeastDeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog

```
6. If everything is ok, change the `build.sbt` with the corresponding version and publish the RC.
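The two `spark-shell` invocations above only swap the fully qualified class names passed to `--conf`. A minimal sketch (not part of the commit) for confirming that a running session picked up the renamed classes — it simply reads the same keys back:

```scala
import org.apache.spark.sql.SparkSession

// Assumes a session launched with the flags from the CONTRIBUTING.md diff above.
val spark = SparkSession.active

// Both keys should echo the class names introduced by this commit.
println(spark.conf.get("spark.sql.extensions")) // io.qbeast.sql.QbeastSparkSessionExtension
println(spark.conf.get("spark.sql.catalog.spark_catalog")) // io.qbeast.catalog.QbeastCatalog
```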
6 changes: 3 additions & 3 deletions README.md
@@ -89,8 +89,8 @@ export SPARK_HOME=$PWD/spark-3.5.0-bin-hadoop3
```bash
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.delta.QbeastDeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
```

### 2. Indexing a dataset
@@ -173,7 +173,7 @@ Go to the [Quickstart](./docs/Quickstart.md) or [notebook](docs/sample_pushdown_
Get **insights** to the data using the `QbeastTable` interface!

```scala
import io.qbeast.spark.QbeastTable
import io.qbeast.table.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, tmpDir)

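To put the renamed import in context, here is a hedged end-to-end sketch: it indexes a toy DataFrame with the `qbeast` data source (the `columnsToIndex` and `cubeSize` option keys appear in the `QbeastOptions` diff further down) and loads the result through the relocated `io.qbeast.table.QbeastTable`. Only `forPath` is used, since that is the call visible in the README diff; the path and data are illustrative.

```scala
import io.qbeast.table.QbeastTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
val tmpDir = "/tmp/qbeast-readme-example" // hypothetical location

// Index a toy dataset on two columns.
spark
  .range(10000)
  .selectExpr("id as a", "id % 97 as b")
  .write
  .format("qbeast")
  .option("columnsToIndex", "a,b")
  .option("cubeSize", "5000")
  .save(tmpDir)

// Load the indexed table through the relocated entry point.
val qbeastTable = QbeastTable.forPath(spark, tmpDir)
```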
43 changes: 37 additions & 6 deletions build.sbt
@@ -3,26 +3,49 @@ import Dependencies._

val mainVersion = "0.8.0-SNAPSHOT"

// Projects
lazy val qbeastCore = (project in file("./core"))
.settings(
name := "qbeast-core",
libraryDependencies ++= Seq(sparkCore % Provided, sparkSql % Provided, sparkml % Provided),
Test / parallelExecution := false,
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
.settings(noWarningInConsole)

lazy val qbeastDelta = (project in file("./delta"))
.dependsOn(qbeastCore)
.settings(
name := "qbeast-delta",
libraryDependencies ++= Seq(sparkCore % Provided, deltaSpark % Provided, sparkSql % Provided),
Test / parallelExecution := false,
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
.settings(noWarningInConsole)

lazy val qbeastSpark = (project in file("."))
.dependsOn(qbeastCore, qbeastDelta)
.enablePlugins(ScalaUnidocPlugin)
.settings(
name := "qbeast-spark",
libraryDependencies ++= Seq(
sparkCore % Provided,
sparkSql % Provided,
hadoopClient % Provided,
deltaSpark % Provided,
sparkml % Provided,
apacheCommons % Test,
amazonAws % Test,
hadoopCommons % Test,
sparkml % Test,
hadoopAws % Test),
Test / parallelExecution := false,
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
.settings(noWarningInConsole)

qbeastCore / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-core",
"-doc-version",
mainVersion,
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-core")

qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-spark",
@@ -31,6 +54,14 @@ qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-spark")

qbeastDelta / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-delta",
"-doc-version",
mainVersion,
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-delta")

// Common metadata
ThisBuild / version := mainVersion
ThisBuild / organization := "io.qbeast"
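Since the build is now split into `qbeast-core`, `qbeast-delta`, and `qbeast-spark`, a downstream sbt project would presumably depend on the published modules rather than on a single assembly. A hypothetical sketch, reusing the `io.qbeast` organization and the Spark/Delta versions mentioned elsewhere in this commit; coordinates and versions are illustrative, not confirmed release artifacts:

```scala
// build.sbt of a hypothetical downstream project.
val qbeastVersion = "0.8.0-SNAPSHOT"

libraryDependencies ++= Seq(
  // qbeast-spark is expected to pull qbeast-core and qbeast-delta transitively,
  // given the dependsOn relationships declared above.
  "io.qbeast" %% "qbeast-spark" % qbeastVersion,
  "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided,
  "io.delta" %% "delta-spark" % "3.1.0" % Provided
)
```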
@@ -15,11 +15,11 @@
*/
package io.qbeast.core.model

import io.qbeast.spark.model.CubeState
import io.qbeast.spark.model.CubeState.CubeStateValue
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

import CubeState.CubeStateValue

/**
* Container for the table changes
*/
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.model
package io.qbeast.core.model

/**
* Names of possible states of the cube
File renamed without changes.
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

/**
* Implementation of FileIndex to be used for empty tables.
* Implementation of FileIndex to be used for empty table.
*/
object EmptyFileIndex extends FileIndex with Serializable {

@@ -24,6 +24,7 @@ import io.qbeast.spark.index.ColumnsToIndex
import io.qbeast.spark.internal.QbeastOptions.COLUMNS_TO_INDEX
import io.qbeast.spark.internal.QbeastOptions.CUBE_SIZE
import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
import org.apache.spark.qbeast.config.DEFAULT_TABLE_FORMAT
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.AnalysisExceptionFactory
import org.apache.spark.sql.DataFrame
@@ -56,6 +57,7 @@ import scala.util.matching.Regex
case class QbeastOptions(
columnsToIndex: Seq[String],
cubeSize: Int,
tableFormat: String,
stats: Option[DataFrame],
txnAppId: Option[String],
txnVersion: Option[String],
@@ -104,6 +106,7 @@ case class QbeastOptions(
object QbeastOptions {
val COLUMNS_TO_INDEX: String = "columnsToIndex"
val CUBE_SIZE: String = "cubeSize"
val TABLE_FORMAT: String = "tableFormat"
val PATH: String = "path"
val STATS: String = "columnStats"
val TXN_APP_ID: String = "txnAppId"
@@ -142,6 +145,12 @@ object QbeastOptions {
}
}

private def getTableFormat(options: Map[String, String]): String =
options.get(TABLE_FORMAT) match {
case Some(value) => value
case None => DEFAULT_TABLE_FORMAT
}

/**
* Get the column stats from the options This stats should be in a JSON formatted string with
* the following schema {columnName_min:value, columnName_max:value, ...}
@@ -219,6 +228,7 @@ object QbeastOptions {
def apply(options: CaseInsensitiveMap[String]): QbeastOptions = {
val columnsToIndex = getColumnsToIndex(options)
val desiredCubeSize = getDesiredCubeSize(options)
val tableFormat = getTableFormat(options)
val stats = getStats(options)
val txnAppId = getTxnAppId(options)
val txnVersion = getTxnVersion(options)
@@ -230,6 +240,7 @@ object QbeastOptions {
QbeastOptions(
columnsToIndex,
desiredCubeSize,
tableFormat,
stats,
txnAppId,
txnVersion,
@@ -258,14 +269,33 @@ object QbeastOptions {
val caseInsensitiveMap = CaseInsensitiveMap(options)
val userMetadata = getUserMetadata(caseInsensitiveMap)
val hookInfo = getHookInfo(caseInsensitiveMap)
QbeastOptions(Seq.empty, 0, None, None, None, userMetadata, None, None, hookInfo)
QbeastOptions(
Seq.empty,
0,
DEFAULT_TABLE_FORMAT,
None,
None,
None,
userMetadata,
None,
None,
hookInfo)
}

/**
* The empty options to be used as a placeholder.
*/
lazy val empty: QbeastOptions =
QbeastOptions(Seq.empty, DEFAULT_CUBE_SIZE, None, None, None, None, None, None)
QbeastOptions(
Seq.empty,
DEFAULT_CUBE_SIZE,
DEFAULT_TABLE_FORMAT,
None,
None,
None,
None,
None,
None)

def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
new QTableID(
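The new `tableFormat` option falls back to `DEFAULT_TABLE_FORMAT` when it is not supplied, so existing writes are unaffected. A hedged sketch of passing it explicitly; the `delta` value mirrors the default declared in the config change further down, and the path is illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// "tableFormat" is the new QbeastOptions key added in this commit.
spark
  .range(1000)
  .selectExpr("id as user_id", "id % 10 as bucket")
  .write
  .format("qbeast")
  .option("columnsToIndex", "user_id,bucket")
  .option("tableFormat", "delta")
  .save("/tmp/qbeast-table-format-example")
```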
@@ -16,13 +16,13 @@
package io.qbeast.spark.writer

import io.qbeast.core.model.CubeId
import io.qbeast.core.model.CubeState
import io.qbeast.core.model.IndexFile
import io.qbeast.core.model.IndexFileBuilder
import io.qbeast.core.model.IndexFileBuilder.BlockBuilder
import io.qbeast.core.model.TableChanges
import io.qbeast.core.model.Weight
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.model.CubeState
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.TaskAttemptContextImpl
@@ -15,10 +15,10 @@
*/
package org.apache.spark.qbeast

import io.qbeast.context.QbeastContext
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.internal.config.OptionalConfigEntry
import org.apache.spark.sql.SparkSession

package object config {

@@ -58,19 +58,31 @@ package object config {
.intConf
.createWithDefault(3)

def DEFAULT_NUMBER_OF_RETRIES: Int = QbeastContext.config
private[config] val tableFormat: ConfigEntry[String] =
ConfigBuilder("spark.qbeast.tableFormat")
.version("0.2.0")
.stringConf
.createWithDefault("delta")

def DEFAULT_NUMBER_OF_RETRIES: Int = SparkSession.active.sparkContext.conf
.get(defaultNumberOfRetries)

def DEFAULT_CUBE_SIZE: Int = QbeastContext.config
def DEFAULT_CUBE_SIZE: Int = SparkSession.active.sparkContext.conf
.get(defaultCubeSize)

def CUBE_WEIGHTS_BUFFER_CAPACITY: Long = QbeastContext.config
def DEFAULT_TABLE_FORMAT: String = SparkSession.active.sparkContext.conf
.get(tableFormat)

def CUBE_WEIGHTS_BUFFER_CAPACITY: Long = SparkSession.active.sparkContext.conf
.get(cubeWeightsBufferCapacity)

def STAGING_SIZE_IN_BYTES: Option[Long] = QbeastContext.config.get(stagingSizeInBytes)
def STAGING_SIZE_IN_BYTES: Option[Long] =
SparkSession.active.sparkContext.conf.get(stagingSizeInBytes)

def COLUMN_SELECTOR_ENABLED: Boolean = QbeastContext.config.get(columnsToIndexSelectorEnabled)
def COLUMN_SELECTOR_ENABLED: Boolean =
SparkSession.active.sparkContext.conf.get(columnsToIndexSelectorEnabled)

def MAX_NUM_COLUMNS_TO_INDEX: Int = QbeastContext.config.get(maxNumColumnsToIndex)
def MAX_NUM_COLUMNS_TO_INDEX: Int =
SparkSession.active.sparkContext.conf.get(maxNumColumnsToIndex)

}
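Two things change in this file: the defaults are resolved through the active `SparkSession` instead of `QbeastContext.config`, and a new `spark.qbeast.tableFormat` entry (default `delta`) backs `DEFAULT_TABLE_FORMAT`. A minimal sketch of setting the property when the session is built; the value shown is just the default:

```scala
import org.apache.spark.sql.SparkSession

// Because defaults are now read via SparkSession.active, the property must be set
// on the session (or passed with --conf) before Qbeast resolves it.
val spark = SparkSession
  .builder()
  .appName("qbeast-table-format-config") // hypothetical app name
  .config("spark.qbeast.tableFormat", "delta")
  .getOrCreate()
```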
@@ -20,7 +20,6 @@ import io.qbeast.core.model.QTableID
import io.qbeast.core.model.StagingDataManager
import io.qbeast.core.model.StagingDataManagerFactory
import io.qbeast.core.model.StagingResolution
import io.qbeast.spark.internal.commands.ConvertToQbeastCommand
import io.qbeast.spark.internal.QbeastOptions
import org.apache.hadoop.fs.Path
import org.apache.spark.qbeast.config.STAGING_SIZE_IN_BYTES
@@ -36,7 +35,7 @@
/**
* Access point for staged data
*/
private[spark] class DeltaStagingDataManager(tableID: QTableID)
class DeltaStagingDataManager(tableID: QTableID)
extends DeltaStagingUtils
with StagingDataManager {
private val spark = SparkSession.active
@@ -133,13 +132,6 @@ private[spark] class DeltaStagingDataManager(tableID: QTableID)
.option(DeltaOptions.USER_METADATA_OPTION, options.userMetadata.get)
}
writer.save(tableID.id)

// Convert if the table is not yet qbeast
if (isInitial) {
val colsToIndex = indexStatus.revision.columnTransformers.map(_.columnName)
val dcs = indexStatus.revision.desiredCubeSize
ConvertToQbeastCommand(s"delta.`${tableID.id}`", colsToIndex, dcs).run(spark)
}
}

}
@@ -15,7 +15,6 @@
*/
package org.apache.spark.sql.delta

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.QTableID
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -85,7 +84,7 @@ trait DeltaStatsCollectionUtils {
sparkSession: SparkSession,
tableID: QTableID): Option[DeltaJobStatisticsTracker] = {

if (QbeastContext.config.get(DeltaSQLConf.DELTA_COLLECT_STATS)) {
if (sparkSession.conf.get(DeltaSQLConf.DELTA_COLLECT_STATS)) {
val outputStatsAtrributes = data.queryExecution.analyzed.output
val outputSchema = data.schema

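The stats tracker now consults the session configuration for `DeltaSQLConf.DELTA_COLLECT_STATS` instead of `QbeastContext.config`. Assuming that entry maps to Delta Lake's `spark.databricks.delta.stats.collect` key (an assumption, not shown in this diff), toggling it per session would look roughly like this:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// Assumed key behind DeltaSQLConf.DELTA_COLLECT_STATS: enables or disables
// Delta file-statistics collection for this session only.
spark.conf.set("spark.databricks.delta.stats.collect", "false")
```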