Issue #398: Qbeast Spark Refactor -- Delta abstraction #446

Merged (10 commits, Oct 28, 2024)
2 changes: 1 addition & 1 deletion .github/workflows/test-artifact.yml
@@ -31,7 +31,7 @@ jobs:
run: sbt "scalafixAll --check"
- name: Test
run: |
sbt coverage 'test' coverageReport
sbt coverage 'qbeastCore/test' 'qbeastDelta/test' 'qbeastSpark/test' coverageReport
- name: Upload to Codecov
run: |
curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import # One-time step
18 changes: 9 additions & 9 deletions CONTRIBUTING.md
@@ -108,7 +108,7 @@ The following log levels are used to track code behaviour:
def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
// [...] Code to write the transaction [...]
if (txn.appId == appId && version <= txn.version) {
val message = s"Transaction ${version} from application ${appId} is already completed," +
val message = s"Transaction $version from application $appId is already completed," +
" the requested write is ignored"
logWarn(message)
return
@@ -131,7 +131,7 @@ The following log levels are used to track code behaviour:
if (isNewRevision(options)) {
// Merging revisions code
logDebug(
s"Merging transformations for table ${tableID} with cubeSize=${newRevisionCubeSize}")
s"Merging transformations for table $tableID with cubeSize=$newRevisionCubeSize")
// Code to merge revisions
}
```
@@ -144,9 +144,9 @@ The following log levels are used to track code behaviour:
indexStatus: IndexStatus,
options: QbeastOptions,
append: Boolean): Unit = {
logTrace(s"Begin: Writing data to table ${tableID}")
logTrace(s"Begin: Writing data to table $tableID")
// [...] Code to write the data [...]
logTrace(s"End: Writing data to table ${tableID}")
logTrace(s"End: Writing data to table $tableID")
}
```

@@ -191,10 +191,10 @@ For example:
sbt assembly

$SPARK_HOME/bin/spark-shell \
--jars ./target/scala-2.12/qbeast-spark-assembly-0.6.0.jar \
--jars ./target/scala-2.12/qbeast-spark-assembly-0.8.0-SNAPSHOT.jar \
--packages io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
```
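
Once the shell is up, a quick way to check that the session extension and catalog are wired correctly is to write a small dataset in the qbeast format and read it back. This is a minimal sketch: the output path and the indexed columns are placeholders, not part of the project's test suite.

```scala
// Run inside the spark-shell started above.
// Write a tiny DataFrame as a Qbeast table, indexing two example columns.
val df = spark.range(1000).selectExpr("id", "id % 10 AS category")

df.write
  .format("qbeast")
  .option("columnsToIndex", "id,category") // columns used to build the index
  .save("/tmp/qbeast-smoke-test")          // placeholder output path

// Read the table back to confirm the round trip works.
spark.read.format("qbeast").load("/tmp/qbeast-smoke-test").count()
```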

### 4. Publishing artefacts in the local repository
@@ -280,8 +280,8 @@ To publish a new version of the qbeast-spark project, follow these steps:
export QBEAST_SPARK_VERSION=0.6.0-SNAPSHOT
$SPARK_350/bin/spark-shell --repositories https://s01.oss.sonatype.org/content/repositories/snapshots \
--packages io.delta:delta-spark_2.12:3.1.0,io.qbeast:qbeast-spark_2.12:$QBEAST_SPARK_VERSION \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog

```
6. If everything is ok, change the `build.sbt` with the corresponding version and publish the RC.
44 changes: 38 additions & 6 deletions build.sbt
@@ -3,26 +3,49 @@ import Dependencies._

val mainVersion = "0.8.0-SNAPSHOT"

// Projects
lazy val qbeastCore = (project in file("./core"))
.settings(
name := "qbeast-core",
libraryDependencies ++= Seq(sparkCore % Provided, sparkSql % Provided, sparkml % Provided),
Test / parallelExecution := false,
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
.settings(noWarningInConsole)

lazy val qbeastDelta = (project in file("./delta"))
.dependsOn(qbeastCore)
.settings(
name := "qbeast-delta",
libraryDependencies ++= Seq(sparkCore % Provided, deltaSpark % Provided, sparkSql % Provided),
Test / parallelExecution := false,
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
.settings(noWarningInConsole)

lazy val qbeastSpark = (project in file("."))
.dependsOn(qbeastCore, qbeastDelta)
.enablePlugins(ScalaUnidocPlugin)
.settings(
name := "qbeast-spark",
libraryDependencies ++= Seq(
sparkCore % Provided,
sparkSql % Provided,
hadoopClient % Provided,
deltaSpark % Provided,
sparkml % Provided,
apacheCommons % Test,
amazonAws % Test,
hadoopCommons % Test,
sparkml % Test,
hadoopAws % Test),
Test / parallelExecution := false,
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false))
.settings(noWarningInConsole)

qbeastCore / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-core",
"-doc-version",
mainVersion,
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-core")

qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-spark",
@@ -31,6 +54,14 @@ qbeastSpark / Compile / doc / scalacOptions ++= Seq(
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-spark")

qbeastDelta / Compile / doc / scalacOptions ++= Seq(
"-doc-title",
"qbeast-delta",
"-doc-version",
mainVersion,
"-doc-footer",
"Copyright 2022 Qbeast - Docs for version " + mainVersion + " of qbeast-delta")

// Common metadata
ThisBuild / version := mainVersion
ThisBuild / organization := "io.qbeast"
@@ -45,6 +76,7 @@ ThisBuild / libraryDependencies ++= Seq(
mockito % Test)

Test / javaOptions ++= Seq("-Xmx10G", "-XX:+UseG1GC")
Test / testOptions += Tests.Argument("-oD")
Test / fork := true

// Scala compiler settings
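
With the project split into `qbeast-core`, `qbeast-delta` and `qbeast-spark`, a downstream build can depend on only the modules it needs. A hypothetical consumer `build.sbt` is sketched below; the artifact coordinates follow from the organization and module names defined above, and the Spark and Delta versions are assumptions taken from the shell examples in CONTRIBUTING.md.

```scala
// Hypothetical downstream build.sbt (illustrative coordinates only).
libraryDependencies ++= Seq(
  "io.qbeast" %% "qbeast-core" % "0.8.0-SNAPSHOT",
  "io.qbeast" %% "qbeast-delta" % "0.8.0-SNAPSHOT",
  "io.qbeast" %% "qbeast-spark" % "0.8.0-SNAPSHOT",
  "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided,
  "io.delta" %% "delta-spark" % "3.1.0" % Provided
)
```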
@@ -15,11 +15,11 @@
*/
package io.qbeast.core.model

import io.qbeast.spark.model.CubeState
import io.qbeast.spark.model.CubeState.CubeStateValue
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

import CubeState.CubeStateValue

/**
* Container for the table changes
*/
@@ -15,12 +15,12 @@
*/
package io.qbeast.core.model

import org.apache.spark.sql.DataFrame

/**
* ColumnsToIndexSelector interface to automatically select which columns to index.
* @tparam DATA
* the data to index
*/
trait ColumnsToIndexSelector[DATA] {
trait ColumnsToIndexSelector {

/**
* The maximum number of columns to index.
@@ -34,7 +34,7 @@ trait ColumnsToIndexSelector[DATA] {
* the data to index
* @return
*/
def selectColumnsToIndex(data: DATA): Seq[String] =
def selectColumnsToIndex(data: DataFrame): Seq[String] =
selectColumnsToIndex(data, MAX_COLUMNS_TO_INDEX)

/**
@@ -46,6 +46,6 @@ trait ColumnsToIndexSelector[DATA] {
* @return
* A sequence with the names of the columns to index
*/
def selectColumnsToIndex(data: DATA, numColumnsToIndex: Int): Seq[String]
def selectColumnsToIndex(data: DataFrame, numColumnsToIndex: Int): Seq[String]

}
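
With the `DATA` type parameter removed, callers pass a Spark `DataFrame` directly. A minimal usage sketch, relying only on the signature shown above; the selector instance and the column count are placeholders.

```scala
import io.qbeast.core.model.ColumnsToIndexSelector
import org.apache.spark.sql.DataFrame

// Given any ColumnsToIndexSelector implementation, pick the columns to index
// for a DataFrame before writing it as a Qbeast table.
def pickIndexColumns(selector: ColumnsToIndexSelector, data: DataFrame): Seq[String] =
  selector.selectColumnsToIndex(data, numColumnsToIndex = 2)
```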
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.model
package io.qbeast.core.model

/**
* Names of possible states of the cube
@@ -16,17 +16,13 @@
package io.qbeast.core.model

import io.qbeast.IISeq
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame

/**
* Data Writer template
* @tparam DATA
* type of data
* @tparam DataSchema
* type of the data schema
* @tparam FileDescriptor
* type of file descriptor
*/
trait DataWriter[DATA, DataSchema, FileDescriptor] {
trait DataWriter {

/**
* Write the index data to the files
@@ -43,8 +39,8 @@
*/
def write(
tableID: QTableID,
schema: DataSchema,
data: DATA,
tableChanges: TableChanges): IISeq[FileDescriptor]
schema: StructType,
data: DataFrame,
tableChanges: TableChanges): IISeq[IndexFile]

}
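
The writer loses its type parameters in the same way: it now takes a `StructType` schema and a `DataFrame` and returns the written `IndexFile`s. A sketch of a caller against this signature, with every argument a placeholder supplied by the surrounding write path:

```scala
import io.qbeast.IISeq
import io.qbeast.core.model.{DataWriter, IndexFile, QTableID, TableChanges}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Delegate a write to whichever DataWriter implementation is in use and
// collect the IndexFile descriptors it produces.
def persistIndexedData(
    writer: DataWriter,
    tableID: QTableID,
    schema: StructType,
    data: DataFrame,
    changes: TableChanges): IISeq[IndexFile] =
  writer.write(tableID, schema, data, changes)
```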
@@ -23,9 +23,11 @@ import scala.collection.immutable
final class IndexFileBuilder {
private var path: Option[String] = None
private var size: Long = 0L
private var dataChange: Boolean = true
private var modificationTime: Long = 0L
private var revisionId: RevisionID = 0L
private val blocks = immutable.Seq.newBuilder[VolatileBlock]
private var stats: Option[QbeastStats] = None

/**
* Sets the path.
@@ -53,6 +55,32 @@
this
}

/**
* Sets the stats.
*
* @param stats
* the stats
* @return
* this instance
*/
def setStats(stats: Option[QbeastStats]): IndexFileBuilder = {
this.stats = stats
this
}

/**
* Sets the data change.
*
* @param dataChange
* whether this index file represents a data change
* @return
* this instance
*/
def setDataChange(dataChange: Boolean): IndexFileBuilder = {
this.dataChange = dataChange
this
}

/**
* Sets the modification time
*
Expand Down Expand Up @@ -103,9 +131,11 @@ final class IndexFileBuilder {
IndexFile(
filePath,
size,
dataChange,
modificationTime,
revisionId,
blocks.result().map(_.toBlock(filePath)))
blocks.result().map(_.toBlock(filePath)),
stats)
}

}
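
The builder gains two fields in this PR: `dataChange` and optional per-file `stats`. A small sketch using only the two new setters shown above; treating an optimization rewrite as `dataChange = false` follows the usual Delta Lake convention and is an assumption here, not something this diff states.

```scala
import io.qbeast.core.model.IndexFileBuilder

// Mark a file produced by a rewrite (e.g. optimization): the bytes are
// rewritten but no new data is added, and no per-file stats are attached.
def markAsRewrite(builder: IndexFileBuilder): IndexFileBuilder =
  builder
    .setDataChange(false)
    .setStats(None)
```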
@@ -16,13 +16,12 @@
package io.qbeast.core.model

import io.qbeast.IISeq
import org.apache.spark.sql.DataFrame

/**
* Index Manager template
* @tparam DATA
* type of data to index
*/
trait IndexManager[DATA] {
trait IndexManager {

/**
* Indexes the data
@@ -33,7 +32,7 @@ trait IndexManager[DATA] {
* @return
* the changes of the index and reorganization of data
*/
def index(data: DATA, indexStatus: IndexStatus): (DATA, TableChanges)
def index(data: DataFrame, indexStatus: IndexStatus): (DataFrame, TableChanges)

/**
* Optimizes the index
@@ -44,7 +43,7 @@ trait IndexManager[DATA] {
* @return
* the changes on the index and reorganization of data
*/
def optimize(data: DATA, indexStatus: IndexStatus): (DATA, TableChanges)
def optimize(data: DataFrame, indexStatus: IndexStatus): (DataFrame, TableChanges)

/**
* Analyzes the current index status
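
`IndexManager` follows the same pattern and now works directly on `DataFrame`s. A minimal sketch of indexing one batch through it; the manager and index status are placeholders provided by the caller.

```scala
import io.qbeast.core.model.{IndexManager, IndexStatus, TableChanges}
import org.apache.spark.sql.DataFrame

// Index a batch of data against the current index status, returning the
// reorganized data together with the changes to apply to the index.
def indexBatch(
    manager: IndexManager,
    batch: DataFrame,
    status: IndexStatus): (DataFrame, TableChanges) =
  manager.index(batch, status)
```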
@@ -15,18 +15,14 @@
*/
package io.qbeast.core.model

import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.IISeq
import org.apache.spark.sql.types.StructType

/**
* Metadata Manager template
* @tparam DataSchema
* type of data schema
* @tparam FileDescriptor
* type of file descriptor
* @tparam QbeastOptions
* type of the Qbeast options
*/
trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
trait MetadataManager {
type Configuration = Map[String, String]

/**
@@ -45,7 +41,7 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
* @return
* the current schema
*/
def loadCurrentSchema(tableID: QTableID): DataSchema
def loadCurrentSchema(tableID: QTableID): StructType

/**
* Writes and updates the metadata by using transaction control
@@ -60,9 +56,9 @@
*/
def updateWithTransaction(
tableID: QTableID,
schema: DataSchema,
schema: StructType,
options: QbeastOptions,
append: Boolean)(writer: => (TableChanges, IISeq[FileDescriptor])): Unit
append: Boolean)(writer: => (TableChanges, IISeq[IndexFile], IISeq[DeleteFile])): Unit

/**
* Updates the table metadata by overwriting the metadata configurations with the provided
@@ -74,7 +70,7 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
* @param update
* configurations used to overwrite the existing metadata
*/
def updateMetadataWithTransaction(tableID: QTableID, schema: DataSchema)(
def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
update: => Configuration): Unit

/**
@@ -127,6 +123,7 @@ trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
/**
* Creates an initial log directory
* @param tableID
* table ID
*/
def createLog(tableID: QTableID): Unit

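
The transactional write path changes shape as well: the writer block now returns the delete files alongside the index files. A sketch of a commit through the new signature; all values are placeholders, and importing `QbeastOptions` from `io.qbeast.spark.internal` is an assumption based on the import shown in this diff.

```scala
import io.qbeast.IISeq
import io.qbeast.core.model.{DeleteFile, IndexFile, MetadataManager, QTableID, TableChanges}
import io.qbeast.spark.internal.QbeastOptions
import org.apache.spark.sql.types.StructType

// Commit a set of added and removed files in a single metadata transaction.
def commitWrite(
    metadataManager: MetadataManager,
    tableID: QTableID,
    schema: StructType,
    options: QbeastOptions,
    changes: TableChanges,
    addedFiles: IISeq[IndexFile],
    removedFiles: IISeq[DeleteFile]): Unit =
  metadataManager.updateWithTransaction(tableID, schema, options, append = true) {
    (changes, addedFiles, removedFiles)
  }
```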