Merge pull request #109 from osopardo1/update-spark-and-delta-versions
Change Delta Lake, Hadoop and Spark versions
osopardo1 authored Sep 6, 2022
2 parents 90b1d3b + be0f5b6 · commit 966b007
Showing 11 changed files with 42 additions and 42 deletions.
16 changes: 8 additions & 8 deletions CONTRIBUTING.md
@@ -93,22 +93,22 @@ Identify the committers and contributors who have worked on the code being chang
### 1. Install [**sbt**(>=1.4.7)](https://www.scala-sbt.org/download.html).

### 2. Install **Spark**
- Download **Spark 3.1.1 with Hadoop 3.2***, unzip it, and create the `SPARK_HOME` environment variable:<br />
- *: You can use Hadoop 2.7 if desired, but you could have some troubles with different cloud providers' storage, read more about it [here](docs/CloudStorages.md).
+ Download **Spark 3.2.2 with Hadoop 3.3***, unzip it, and create the `SPARK_HOME` environment variable:<br />
+ *: You can use Hadoop 2.7 or 3.2 if desired, but you could have some troubles with different cloud providers' storage, read more about it [here](docs/CloudStorages.md).

```bash
- wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
+ wget https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz

- tar xzvf spark-3.1.1-bin-hadoop3.2.tgz
+ tar xzvf spark-3.2.2-bin-hadoop3.2.tgz

- export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
+ export SPARK_HOME=$PWD/spark-3.2.2-bin-hadoop3.2
```

### 3. Project packaging:
Navigate to the repository folder and package the project using **sbt**. [JDK 8](https://www.azul.com/downloads/?version=java-8-lts&package=jdk) is recommended.

> ℹ️ **Note**: You can specify **custom** Spark or Hadoop **versions** when packaging by using
- >`-Dspark.version=3.2.0` or `-Dhadoop.version=2.7.4` when running `sbt assembly`.
+ >`-Dspark.version=3.2.2` or `-Dhadoop.version=2.7.4` when running `sbt assembly`.
If you have troubles with the versions you use, don't hesitate to **ask the community** in [GitHub discussions](https://github.com/Qbeast-io/qbeast-spark/discussions).
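As a quick sketch of that override mechanism (the version numbers here are only illustrative), packaging against a pinned Spark/Hadoop pair could look like:

```bash
# Hedged sketch: pass the desired versions to sbt as JVM system properties.
# 3.2.2 and 3.3.1 mirror the defaults in project/Dependencies.scala; substitute your own.
sbt -Dspark.version=3.2.2 -Dhadoop.version=3.3.1 assembly
```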

``` bash
@@ -125,9 +125,9 @@ For example:
sbt assembly

$SPARK_HOME/bin/spark-shell \
- --jars ./target/scala-2.12/qbeast-spark-assembly-0.2.0.jar \
+ --jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0-alpha.jar \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
- --packages io.delta:delta-core_2.12:1.0.0
+ --packages io.delta:delta-core_2.12:1.2.0
```

<br/>
15 changes: 8 additions & 7 deletions README.md
@@ -18,9 +18,9 @@

**Qbeast Spark** is an extension for [**Data Lakehouses**](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf) that enables **multi-dimensional filtering** and **sampling** directly on the storage

- [![apache-spark](https://img.shields.io/badge/apache--spark-3.1.x-blue)](https://spark.apache.org/releases/spark-release-3-1-1.html)
- [![apache-hadoop](https://img.shields.io/badge/apache--hadoop-3.2.0-blue)](https://hadoop.apache.org/release/3.2.0.html)
- [![delta-core](https://img.shields.io/badge/delta--core-1.0.0-blue)](https://github.com/delta-io/delta/releases/tag/v1.0.0)
+ [![apache-spark](https://img.shields.io/badge/apache--spark-3.2.x-blue)](https://spark.apache.org/releases/spark-release-3-2-2.html)
+ [![apache-hadoop](https://img.shields.io/badge/apache--hadoop-3.3.x-blue)](https://hadoop.apache.org/release/3.3.1.html)
+ [![delta-core](https://img.shields.io/badge/delta--core-1.2.0-blue)](https://github.com/delta-io/delta/releases/tag/v1.2.0)
[![codecov](https://codecov.io/gh/Qbeast-io/qbeast-spark/branch/main/graph/badge.svg?token=8WO7HGZ4MW)](https://codecov.io/gh/Qbeast-io/qbeast-spark)

</div>
@@ -169,10 +169,11 @@ spark.sql("insert into table myTable (value) values (4)")
Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information.

# Dependencies and Version Compatibility
- | Version | Spark | Hadoop | Delta Lake |
- |---------|:-----:|:------:|:----------:|
- | 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
- | 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
+ | Version | Spark | Hadoop | Delta Lake |
+ |-------------|:-----:|:------:|:----------:|
+ | 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
+ | 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
+ | 0.3.0-alpha | 3.2.x | 3.3.x | 1.2.x |

Check [here](https://docs.delta.io/latest/releases.html) for **Delta Lake** and **Apache Spark** version compatibility.
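As a rough sketch of the new 0.3.0-alpha row (assuming a locally assembled jar, as in the CONTRIBUTING.md example above), a matching launch could look like:

```bash
# Hedged sketch: Spark 3.2.x shell with a locally built qbeast-spark 0.3.0-alpha assembly
# and Delta Lake 1.2.0, i.e. the versions paired in the last row of the table above.
$SPARK_HOME/bin/spark-shell \
  --jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0-alpha.jar \
  --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
  --packages io.delta:delta-core_2.12:1.2.0
```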

12 changes: 6 additions & 6 deletions build.sbt
@@ -1,14 +1,14 @@
import Dependencies._
import xerial.sbt.Sonatype._

+ val mainVersion = "0.3.0-alpha"
+
lazy val qbeastCore = (project in file("core"))
.settings(
name := "qbeast-core",
version := "0.1.0",
version := mainVersion,
libraryDependencies ++= Seq(apacheCommons % Test))

- val qbeast_spark_version = "0.2.0"
-
// Projects
lazy val qbeastSpark = (project in file("."))
.enablePlugins(ScalaUnidocPlugin)
@@ -33,12 +33,12 @@ qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-spark",
"-doc-version",
- qbeast_spark_version,
+ mainVersion,
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + qbeast_spark_version + " of qbeast-spark")
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-spark")

// Common metadata
- ThisBuild / version := qbeast_spark_version
+ ThisBuild / version := mainVersion
ThisBuild / organization := "io.qbeast"
ThisBuild / organizationName := "Qbeast Analytics, S.L."
ThisBuild / organizationHomepage := Some(url("https://qbeast.io/"))
7 changes: 4 additions & 3 deletions project/Dependencies.scala
@@ -4,13 +4,14 @@ import sbt._
* External libraries used in the project with versions.
*/
object Dependencies {
- lazy val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.1.2")
- lazy val hadoopVersion: String = sys.props.get("hadoop.version").getOrElse("3.2.0")
+ lazy val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.2.2")
+ lazy val hadoopVersion: String = sys.props.get("hadoop.version").getOrElse("3.3.1")
+ lazy val deltaVersion: String = "1.2.0"

val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion
val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion
val hadoopClient = "org.apache.hadoop" % "hadoop-client" % hadoopVersion
- val deltaCore = "io.delta" %% "delta-core" % "1.0.0"
+ val deltaCore = "io.delta" %% "delta-core" % deltaVersion
val sparkFastTests = "com.github.mrpowers" %% "spark-fast-tests" % "1.0.0"
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.9"
val mockito = "org.scalatestplus" %% "mockito-3-4" % "3.2.9.0"
@@ -6,7 +6,7 @@ package io.qbeast.spark.delta
import io.qbeast.core.model.{ReplicatedSet, Revision, TableChanges, mapper}
import io.qbeast.spark.utils.MetadataConfig
import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
+ import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaMergingUtils}
import org.apache.spark.sql.delta.{
DeltaErrors,
MetadataMismatchErrorBuilder,
@@ -106,7 +106,7 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation {
val mergedSchema = if (isOverwriteMode && canOverwriteSchema) {
dataSchema
} else {
- SchemaUtils.mergeSchemas(txn.metadata.schema, dataSchema)
+ SchemaMergingUtils.mergeSchemas(txn.metadata.schema, dataSchema)
}
val normalizedPartitionCols =
Seq.empty
@@ -5,7 +5,7 @@ package io.qbeast.spark.internal.commands

import io.qbeast.core.model.RevisionID
import io.qbeast.spark.table.IndexedTable
- import org.apache.spark.sql.execution.command.RunnableCommand
+ import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.{Row, SparkSession}

/**
@@ -15,7 +15,7 @@ import org.apache.spark.sql.{Row, SparkSession}
* @param indexedTable indexed table to analyze
*/
case class AnalyzeTableCommand(revisionID: RevisionID, indexedTable: IndexedTable)
- extends RunnableCommand {
+ extends LeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
indexedTable.analyze(revisionID).map(r => Row.fromSeq(Seq(r)))
@@ -5,11 +5,11 @@ package io.qbeast.spark.internal.commands

import io.qbeast.core.model.RevisionID
import io.qbeast.spark.table.IndexedTable
- import org.apache.spark.sql.execution.command.RunnableCommand
+ import org.apache.spark.sql.execution.command.{LeafRunnableCommand}
import org.apache.spark.sql.{Row, SparkSession}

case class CompactTableCommand(revisionID: RevisionID, indexedTable: IndexedTable)
- extends RunnableCommand {
+ extends LeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
indexedTable.compact(revisionID)
@@ -5,7 +5,7 @@ package io.qbeast.spark.internal.commands

import io.qbeast.core.model.RevisionID
import io.qbeast.spark.table.IndexedTable
- import org.apache.spark.sql.execution.command.RunnableCommand
+ import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.{Row, SparkSession}

/**
@@ -15,7 +15,7 @@ import org.apache.spark.sql.{Row, SparkSession}
* @param indexedTable indexed table to optimize
*/
case class OptimizeTableCommand(revisionID: RevisionID, indexedTable: IndexedTable)
- extends RunnableCommand {
+ extends LeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
indexedTable.optimize(revisionID)
@@ -26,4 +26,7 @@ case class QbeastMurmur3Hash(children: Seq[Expression], seed: Int) extends HashE
Murmur3HashFunction.hash(value, dataType, seed).toInt
}

+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = copy(children = newChildren)
+
}
@@ -161,11 +161,6 @@ class QueryExecutorTest extends QbeastIntegrationTestSpec {
val matchFiles = queryExecutor.execute().map(_.path)
val diff = allFiles.toSet -- matchFiles.toSet

- // scalastyle:off println
- println(
- s"Number of files: ${allFiles.length}, Matching files: ${matchFiles.length}, " +
- s"Skipped files: ${diff.size}")
-
val allQbeastFiles = allDeltaFiles.map(addFile =>
QbeastBlock(addFile.path, addFile.tags, addFile.size, addFile.modificationTime))

@@ -9,10 +9,10 @@ import org.apache.spark.sql.functions.{avg, col, rand, when}

class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {

- private val filter_user_greaterThanOrEq = "(`user_id` >= 536764969)"
- private val filter_user_lessThan = "(`user_id` < 546280860)"
- private val filter_product_lessThan = "(`product_id` >= 11522682)"
- private val filter_product_greaterThanOrEq = "(`product_id` < 50500010)"
+ private val filter_user_greaterThanOrEq = "(user_id >= 536764969)"
+ private val filter_user_lessThan = "(user_id < 546280860)"
+ private val filter_product_lessThan = "(product_id >= 11522682)"
+ private val filter_product_greaterThanOrEq = "(product_id < 50500010)"

private def checkFileFiltering(query: DataFrame): Unit = {
val leaves = query.queryExecution.executedPlan.collectLeaves()
@@ -37,7 +37,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
}

private def checkLogicalFilterPushdown(sqlFilters: Seq[String], query: DataFrame): Unit = {
- val leaves = query.queryExecution.executedPlan.collectLeaves()
+ val leaves = query.queryExecution.sparkPlan.collectLeaves()

val dataFilters = leaves
.collectFirst {