Raise estimatedCubeWeights group size #104

Closed · osopardo1 (Member) opened this issue May 25, 2022 · 0 comments · Fixed by #107
Labels: type: bug (Something isn't working)

What went wrong?

Since we build the index in a distributed way while targeting an overall desiredCubeSize, we need to calculate the desiredGroupCubeSize each partition should use in order to reach that overall size.

Right now we do this with the method estimateGroupCubeSize, which has the following implementation:

def estimateGroupCubeSize(
      desiredCubeSize: Int,
      numPartitions: Int,
      numElements: Long,
      bufferCapacity: Long): Int = {
    val numGroups = Math.max(numPartitions, numElements / bufferCapacity)
    val groupCubeSize = desiredCubeSize / numGroups
    Math.max(1, groupCubeSize.toInt)
  }

But we have seen that, with a large dataset, the estimated group size can end up being 1. This causes a poor estimation of the CubeWeights, with many cubes that are never used in the final result.
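
For illustration, here is how the estimate collapses. The numbers below are hypothetical (not taken from this report), but the variable names mirror the method's parameters:

// Hypothetical sizes for a large write, chosen to show the collapse
val desiredCubeSize = 100000   // overall target elements per cube
val numPartitions = 10
val numElements = 10000000000L // 10 billion rows in the dataset
val bufferCapacity = 100000L   // rows buffered per group

val numGroups = Math.max(numPartitions, numElements / bufferCapacity) // = 100000 groups
val groupCubeSize = desiredCubeSize / numGroups // = 100000 / 100000 = 1

With groupCubeSize = 1, each group estimates weights as if every cube held a single element, which matches the poor CubeWeights estimation described above.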

One solution is to raise the minimum group size to 1000, so that each group holds a more representative sample of the elements.
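
A minimal sketch of that change (the actual fix shipped in #107 and may differ; minGroupCubeSize is a name chosen here for illustration):

// Clamp the estimate from below so each group keeps a representative sample
val minGroupCubeSize = 1000
Math.max(minGroupCubeSize, groupCubeSize.toInt)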

How to reproduce?

You can reproduce it with different tests in the codebase, or with the snippet below.

1. Code that triggered the bug, or steps to reproduce:


case class Client3(id: Long, name: String, age: Int, val2: Long, val3: Double)

val spark = SparkSession.active

// Build a ~100k-row synthetic dataset; i.toLong avoids Int overflow in i * i
val rdd =
  spark.sparkContext.parallelize(
    0.to(100000)
      .map(i => Client3(i.toLong * i, s"student-$i", i, i * 1000 + 123, i * 2567.3432143)))

val df = spark.createDataFrame(rdd)
// `options` are the Qbeast write options used in the tests
val rev = SparkRevisionFactory.createNewRevision(QTableID("test"), df.schema, options)

// `oTreeAlgorithm` is the OTree indexing implementation used in the tests
val (indexed, tc: BroadcastedTableChanges) = oTreeAlgorithm.index(df, IndexStatus(rev))

tc.cubeWeights.size // how many cubes are estimated
indexed.select("_qbeastCube").distinct().count() // how many cubes are actually written to the data

2. Branch and commit id:

main at 55442d9

3. Spark version:

In the Spark shell, run spark.version.

3.1.2

4. Hadoop version:

In the Spark shell, run org.apache.hadoop.util.VersionInfo.getVersion().

3.2.0

5. How are you running Spark?

Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests on a local computer?

Local spark

6. Stack trace:

Trace of the log/error messages.

None
