
[SPARK-37290][SQL] - Exponential planning time in case of non-deterministic function #35233

Closed
wants to merge 4 commits

Conversation

Stelyus
Contributor

@Stelyus Stelyus commented Jan 17, 2022

What changes were proposed in this pull request?

When using a non-deterministic function, the method `getAllValidConstraints` can throw an OOM:

```scala
  protected def getAllValidConstraints(projectList: Seq[NamedExpression]): ExpressionSet = {
    var allConstraints = child.constraints
    projectList.foreach {
      case a @ Alias(l: Literal, _) =>
        allConstraints += EqualNullSafe(a.toAttribute, l)
      case a @ Alias(e, _) =>
        // For every alias in `projectList`, replace the reference in constraints by its attribute.
        allConstraints ++= allConstraints.map(_ transform {
          case expr: Expression if expr.semanticEquals(e) =>
            a.toAttribute
        })
        allConstraints += EqualNullSafe(e, a.toAttribute)
      case _ => // Don't change.
    }

    allConstraints
  }
```

In particular, the line `allConstraints ++= allConstraints.map(...)` can generate an exponential number of expressions. This is because non-deterministic expressions are considered unique within an `ExpressionSet`, so the number of non-deterministic expressions doubles every time this line runs.

We can filter out non-deterministic expressions and keep only deterministic ones because:
1. `semanticEquals` automatically discards non-deterministic expressions (two non-deterministic expressions are never semantically equal), and
2. this method is used in only one code path, which already keeps only deterministic expressions:

```scala
lazy val constraints: ExpressionSet = {
    if (conf.constraintPropagationEnabled) {
      validConstraints
        .union(inferAdditionalConstraints(validConstraints))
        .union(constructIsNotNullConstraints(validConstraints, output))
        .filter { c =>
          c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
        }
    } else {
      ExpressionSet()
    }
  }
```
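
To make the blow-up concrete, here is a small standalone sketch (a hypothetical simulation in plain Scala, not Spark's actual `ExpressionSet`): deterministic results would collapse in the set, but non-deterministic expressions never compare equal, so the collection doubles once per alias in `projectList`.

```scala
// Hypothetical simulation of the doubling (plain Scala, not Spark classes).
// Each alias in projectList runs `allConstraints ++= allConstraints.map(...)`;
// because non-deterministic expressions never compare equal, every pass
// re-adds all existing copies, doubling the collection.
def simulateDoubling(aliasCount: Int): Int = {
  var nonDeterministic: Vector[String] = Vector("rand()")
  (1 to aliasCount).foreach { _ =>
    nonDeterministic = nonDeterministic ++ nonDeterministic
  }
  nonDeterministic.size
}

println(simulateDoubling(10)) // 1024; with 28 aliases: 2^28 ≈ 268 million
```

With a few dozen aliased columns, as in the reproductions later in this thread, this easily exhausts driver memory.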

Why are the changes needed?

It can lead to an exponential number of expressions and/or an OOM.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Local test

@github-actions github-actions bot added the SQL label Jan 17, 2022
@Stelyus Stelyus changed the title Spark 37290 [SPARK-37290][SQL] - Exponential planning time in case of non-deterministic function Jan 17, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

HyukjinKwon commented Jan 18, 2022

How was this patch tested?

Can we either add a unit test or describe how you tested?

@Stelyus
Contributor Author

Stelyus commented Jan 18, 2022

Tested with:

```scala
val adselect_raw = spark.createDataFrame(Seq(("imp-1",1),("imp-2",2)))
    .cache()
val adselect = adselect_raw.select(
        expr("uuid()").alias("userUuid"),
        expr("_1").alias("impressionUuid"),
        expr("_1").alias("accessDateTime"),
        expr("_1").alias("publisher"),
        expr("_1").alias("site"),
        expr("_1").alias("placement"),
        expr("_1").alias("advertiser"),
        expr("_1").alias("campaign"),
        expr("_1").alias("lineItem"),
        expr("_1").alias("creative"),
        expr("_1").alias("browserLanguage"),
        expr("_1").alias("geoLocode"),
        expr("_1").alias("osFamily"),
        expr("_1").alias("osName"),
        expr("_1").alias("browserName"),
        expr("_1").alias("referrerDomain"),
        expr("_1").alias("placementIabCategory"),
        expr("_1").alias("placementDeviceGroup"),
        expr("_1").alias("placementDevice"),
        expr("_1").alias("placementVideoType"),
        expr("_1").alias("placementSection"),
        expr("_1").alias("placementPlayer"),
        expr("_1").alias("demandType"),
        expr("_1").alias("techCosts"),
        expr("_1").alias("mediaCosts"),
        expr("_1").alias("directSPrice"),
        expr("_1").alias("network"),
        expr("_1").alias("deviceSetting"),
        expr("_1").alias("placementGroup"),
        expr("_1").alias("postalCode"),
        expr("_1").alias("householdId")
    )

val adcount_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
val adcount = adcount_raw.select(
        expr("_1").alias("impressionUuid"),
        expr("_2").alias("accessDateTime")
    )

val result = adselect.join(adcount, Seq("impressionUuid"))
result.explain()
```

On my side, I get an OOM

@bersprockets
Contributor

@Stelyus

Maybe a more concise reproduction test would be:

```scala
val df = spark.range(10).select((1 to 28).map(i => $"id".alias(s"col$i")): _*)
df.selectExpr("rand() as col0", "*").createOrReplaceTempView("tbl1")
sql("select l.* from tbl1 l join tbl1 r on l.col1 = r.col1").explain
```

Or, if you use spark-sql:

```sql
create or replace temp view tbl as
select rand() c0, id c1, id c2, id c3, id c4, id c5, id c6, id c7, id c8, id c9, id c10, id c11,
  id c12, id c13, id c14, id c15, id c16, id c17, id c18, id c19, id c20, id c21, id c22, id c23,
  id c24, id c25, id c26, id c27, id c28
from range(10);

explain select l.* from tbl l join tbl r on l.c1 = r.c1;
```

Both will blow out a 5G driver.

This was introduced by #29598, which changed `allConstraints` in `getAllValidConstraints` from a `Set[Expression]` to an `ExpressionSet`. Before the change, the non-deterministic expression could be added only once. Now, as you already explained, the non-deterministic expression gets re-added (doubling in number for each item in `projectList`) because `ExpressionSet.originals` is an `ArrayBuffer`, not a `Set`.
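
The difference is just ordinary Scala collection behavior; a minimal illustration with plain collections (not Spark's `ExpressionSet`):

```scala
import scala.collection.mutable

// A Set silently ignores a re-added element it already contains...
val asSet = mutable.Set("rand()#1")
asSet += "rand()#1"
println(asSet.size) // 1

// ...while an ArrayBuffer appends it again, so repeated passes
// accumulate duplicates instead of being no-ops.
val asBuffer = mutable.ArrayBuffer("rand()#1")
asBuffer += "rand()#1"
println(asBuffer.size) // 2
```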

I wonder if ExpressionSet.originals could also be a Set?

cc @hvanhovell @dbaliafroozeh

@Stelyus
Contributor Author

Stelyus commented Feb 14, 2022

@gengliangwang @cloud-fan @HyukjinKwon could you please have a look?

@cloud-fan
Contributor

makes sense to me, cc @maryannxue @sigmod

```diff
@@ -185,7 +185,7 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
         allConstraints += EqualNullSafe(a.toAttribute, l)
       case a @ Alias(e, _) =>
         // For every alias in `projectList`, replace the reference in constraints by its attribute.
-        allConstraints ++= allConstraints.map(_ transform {
+        allConstraints ++= allConstraints.filter(_.deterministic).map(_ transform {
```
Contributor


Do you know how `allConstraints` includes non-deterministic expressions in the first place? It starts with `child.constraints`, and looking at the implementation, it should only produce deterministic expressions:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala#L38


```diff
@@ -185,7 +185,7 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
         allConstraints += EqualNullSafe(a.toAttribute, l)
       case a @ Alias(e, _) =>
```
Contributor


Will it be sufficient to add `if e.deterministic` here?

Contributor Author


Yes, updated the PR
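
Per the review thread, the fix amounts to guarding the `Alias` case on `e.deterministic`. A standalone sketch of the guarded match, with simplified stand-in classes (hypothetical, not the actual Spark types or the verbatim merged patch):

```scala
// Simplified stand-ins for Spark's expression classes (illustrative only).
case class Expr(name: String, deterministic: Boolean)
case class Alias(child: Expr, name: String)

// With the `if e.deterministic` guard, non-deterministic aliases fall
// through to the "don't change" branch instead of rewriting constraints.
def rewritesConstraints(a: Alias): Boolean = a match {
  case Alias(e, _) if e.deterministic => true
  case _ => false
}

println(rewritesConstraints(Alias(Expr("id", deterministic = true), "c1")))    // true
println(rewritesConstraints(Alias(Expr("uuid()", deterministic = false), "u"))) // false
```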

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.2/3.1!

@cloud-fan cloud-fan closed this in 881f562 Feb 22, 2022
cloud-fan pushed a commit that referenced this pull request Feb 22, 2022
Closes #35233 from Stelyus/SPARK-37290.

Authored-by: Franck Thang <stelyus@outlook.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 881f562)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Feb 22, 2022
Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, late LGTM. Thank you, @Stelyus and all.

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022