Skip to content

Commit

Permalink
[SPARK-33935][SQL][2.4] Fix CBO cost function
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Changed the cost function in CBO to match documentation.

### Why are the changes needed?

The parameter `spark.sql.cbo.joinReorder.card.weight` is documented as:
```
The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight).
```
The implementation in `JoinReorderDP.betterThan` does not match this documentaiton:
```
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
      if (other.planCost.card == 0 || other.planCost.size == 0) {
        false
      } else {
        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
        relativeRows * conf.joinReorderCardWeight +
          relativeSize * (1 - conf.joinReorderCardWeight) < 1
      }
    }
```

This different implementation has an unfortunate consequence:
given two plans A and B, both A betterThan B and B betterThan A might give the same results. This happes when one has many rows with small sizes and other has few rows with large sizes.

A example values, that have this fenomen with the default weight value (0.7):
A.card = 500, B.card = 300
A.size = 30, B.size = 80
Both A betterThan B and B betterThan A would have score above 1 and would return false.

This happens with several of the TPCDS queries.

The new implementation does not have this behavior.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New and existing UTs

Closes #31043 from tanelk/SPARK-33935_cbo_cost_function_2.4.

Authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
  • Loading branch information
Tanel Kiis authored and maropu committed Jan 6, 2021
1 parent 45e19bb commit 3e6a6b7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,11 @@ object JoinReorderDP extends PredicateHelper with Logging {
}

def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
if (other.planCost.card == 0 || other.planCost.size == 0) {
false
} else {
val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
relativeRows * conf.joinReorderCardWeight +
relativeSize * (1 - conf.joinReorderCardWeight) < 1
}
val thisCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
val otherCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
thisCost < otherCost
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.optimizer.JoinReorderDP.JoinPlan
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -315,4 +316,18 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
case (a1, a2) => a1.semanticEquals(a2)
}
}

test("SPARK-33935: betterThan should be consistent") {
val plan1 = JoinPlan(null, null, null, Cost(300, 80))
val plan2 = JoinPlan(null, null, null, Cost(500, 30))

// cost1 = 300*0.7 + 80*0.3 = 234
// cost2 = 500*0.7 + 30*0.3 = 359

assert(!plan1.betterThan(plan1, conf))
assert(!plan2.betterThan(plan2, conf))

assert(plan1.betterThan(plan2, conf))
assert(!plan2.betterThan(plan1, conf))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,12 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))

val expected =
f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
.join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
.join(f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
.select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)

assertEqualPlans(query, expected)
Expand Down

0 comments on commit 3e6a6b7

Please sign in to comment.