[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due to 64KB bytecode size limit #17087

Closed · 15 commits

Conversation

@kiszk
Member

kiszk commented Feb 27, 2017

What changes were proposed in this pull request?

When an expression for df.filter() has many nodes (e.g. 400), the Java bytecode generated for it exceeds the JVM's 64KB-per-method limit. Compilation throws a Java exception and the execution fails.
This PR catches that exception, disables code generation for the predicate, and continues execution by calling Expression.eval() instead.

How was this patch tested?

Added a test case to DataFrameSuite.
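For context, a predicate of that size can be produced by ordinary DataFrame code. A minimal sketch of a reproducer (the session setup, column names, and node count are illustrative, not taken from the actual test):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object FilterCodegenRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repro").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2), (3, 4)).toDF("a", "b")

    // Chain ~400 comparisons into a single expression tree. Before this patch,
    // the Java code generated for such a predicate could exceed the JVM's
    // 64KB-per-method bytecode limit and compilation would fail.
    val bigPredicate = (1 to 400).map(i => col("a") < i).reduce(_ && _)

    df.filter(bigPredicate).show()
    spark.stop()
  }
}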

@SparkQA

SparkQA commented Feb 27, 2017

Test build #73530 has finished for PR 17087 at commit 6f40a93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Mar 6, 2017

@davies, could you please review this?

@srowen
Member

srowen commented Mar 6, 2017

Is it right to do this in just one code path? Why not all similar cases where the 64KB limit is exceeded?
Can the underlying problem be fixed or improved?
You're also catching all Exceptions; why not something more specific? Is that possible?
For these reasons, this feels like an odd change to make.

@kiszk
Member Author

kiszk commented Mar 6, 2017

I have already identified where the 64KB limit is exceeded, and I don't think the underlying issue is easy to fix.
The earlier issues came from independent blocks (in Seq[ExprCode]) that could be split into multiple methods. In contrast, an Expression with many nodes is a single tree (in ExprCode) that cannot easily be split; it is very hard to split one ExprCode into multiple correct ExprCodes.
I think we would have to redesign how the generated Java code for an Expression tree is kept.

Regarding the exception handling, you are right: I should catch only exceptions related to compilation errors. I will address this by handling nested exceptions.

@SparkQA

SparkQA commented Mar 6, 2017

Test build #74019 has finished for PR 17087 at commit 98cd961.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

marmbrus commented Mar 7, 2017

I agree with the general approach of having a fallback from code generation to interpreted evaluation, but I also agree that this feels too narrowly targeted. In particular, why do this in one operator rather than in newPredicate (or maybe even in codegen itself).

Another thing that maybe @davies can comment on: I thought we already had this fallback implemented? So I'm curious why it's not already handling this test case. Maybe there is an existing mechanism we just need to make more general.

}
} catch {
// JaninoRuntimeException is nested inside another exception when a Java compilation error occurs
case e: Exception if ExceptionUtils.getRootCause(e).isInstanceOf[JaninoRuntimeException] =>
Contributor

Rather than walk the exception tree, should we just make our wrapping more specific?

Member Author

Good catch. I will make this wrapping more specific.

@kiszk
Member Author

kiszk commented Mar 8, 2017

@marmbrus thank you for your comments.

Regarding the fallback mechanism, I believe you are referring to this one. That fallback works when whole-stage codegen is enabled.
In this case, however, whole-stage codegen is disabled because the number of fields is large, so these two variables are false.

I agree that we should prepare a more generic approach that applies to more cases, but that is not easy to implement right now; it would require some refactoring.
As a simple first step, newPredicate could return null or None if an exception occurs, and each caller of newPredicate would then check the return value to handle code-generation failures. That would keep the caller code simple.
More complex refactoring could also apply, but another PR should address it.

What do you think?

FYI: I have just noticed that this problem may also occur in CartesianProductExec and InMemoryTableScanExec. I will have to do the same thing in those two places.

@marmbrus
Contributor

marmbrus commented Mar 8, 2017

I don't think we need a complex refactoring. Why can't newPredicate catch the exception, log a warning and return an interpreted Predicate?

@kiszk
Member Author

kiszk commented Mar 9, 2017

I am refactoring newPredicate so that it catches the exception and logs a warning. However, I don't think newPredicate can return an interpreted result: code generation returns a Predicate, while BindReferences.bindReference(condition, child.output) returns an Expression, and the two classes live in different class hierarchies.
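To illustrate the mismatch with stub types (stand-ins only, not Spark's real classes):

object HierarchySketch {
  // Stub stand-ins for Spark's classes, to show the shape of the problem.
  trait Expression { def eval(row: Seq[Any]): Any }
  trait Predicate { def eval(row: Seq[Any]): Boolean }

  // The codegen path (GeneratePredicate.generate) hands back a Predicate...
  def generate(e: Expression): Predicate = new Predicate {
    def eval(row: Seq[Any]): Boolean = e.eval(row).asInstanceOf[Boolean]
  }

  // ...while BindReferences.bindReference only yields another Expression,
  // which cannot be returned where a Predicate is expected.
  def bindReference(e: Expression): Expression = e
}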

@kiszk
Member Author

kiszk commented Mar 9, 2017

@marmbrus I have just committed an intermediate version of the refactoring. Could you take a look?
If the approach is fine (return null and check it at the caller), I will update the other call sites of newPredicate.

@SparkQA

SparkQA commented Mar 9, 2017

Test build #74262 has finished for PR 17087 at commit 0e2bbe7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

marmbrus commented Mar 9, 2017

There appears to have been some code drift (GeneratePredicate and InterpretedPredicate both used to return a class that inherited from a common interface), but I don't think it's hard to isolate the fault handling in newPredicate.

Just fix InterpretedPredicate to actually return a Predicate rather than a bare lambda function. The code there already handles binding and evaluation.

@kiszk
Member Author

kiszk commented Mar 10, 2017

Thank you for pointing out InterpretedPredicate. newPredicate now always returns a Predicate that can be executed by calling eval().
This looks simpler and better than the first implementation.

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74310 has finished for PR 17087 at commit 5fb413f.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • class InterpretedPredicate(expression: Expression) extends GenPredicate

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74311 has finished for PR 17087 at commit c02589c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74313 has finished for PR 17087 at commit a2f85cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InterpretedPredicate(expression: Expression) extends GenPredicate

@SparkQA

SparkQA commented Mar 11, 2017

Test build #74374 has finished for PR 17087 at commit c5fc5f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InterpretedPredicate(expression: Expression) extends GenPredicate

@@ -372,7 +374,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
try {
CodeGenerator.compile(cleanedSource)
} catch {
-case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+case e: JaninoRuntimeException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
@viirya (Member) Mar 13, 2017

CodeGenerator.doCompile catches Exception and re-throws it as a plain Exception.

@viirya (Member) Mar 13, 2017

Can other exceptions be thrown during compilation?

If another kind of exception causes compilation to fail, I think we still need to fall back to non-whole-stage execution?

Member Author

Sure, reverted

@@ -951,10 +965,10 @@ object CodeGenerator extends Logging {
evaluator.cook("generated.java", code.body)
recordCompilationStats(evaluator)
} catch {
-case e: Exception =>
+case e: JaninoRuntimeException =>
Member

Looks like CompileException can be thrown from janino?

Member Author

Thank you for pointing that out. CompileException is now caught as well.
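A sketch of the resulting catch block in CodeGenerator.doCompile (paraphrased from the diff hunks in this thread; `formatted` is the pretty-printed source from the enclosing method, and the exact re-throw shape is refined further down in the review):

try {
  evaluator.cook("generated.java", code.body)
  recordCompilationStats(evaluator)
} catch {
  case e: JaninoRuntimeException =>
    // Codegen produced a method too large (or otherwise invalid) for the JVM.
    val msg = s"failed to compile: $e\n$formatted"
    logError(msg, e)
    throw new JaninoRuntimeException(msg, e)
  case e: CompileException =>
    // Janino can also report failures through CompileException.
    val msg = s"failed to compile: $e\n$formatted"
    logError(msg, e)
    throw new CompileException(msg, e.getLocation)
}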

@@ -355,7 +357,21 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = {
-GeneratePredicate.generate(expression, inputSchema)
+try {
+  GeneratePredicate.generate(expression, inputSchema)
Member

Shall we only do this fallback if sqlContext.conf.wholeStageFallback is turned on?

Member Author

It would be good to control this with an option, although this is not part of whole-stage codegen.
Is it better to reuse sqlContext.conf.wholeStageFallback or to add a new sqlContext.conf.codegenFallback?
What do you think?

Member

I am wondering whether it makes sense for wholeStageFallback to be false while this new option is true, or vice versa.

Member Author

Sure; it now looks at sqlContext.conf.wholeStageFallback.

@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.LongType
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}


Member

nit: Extra blank line.

Member Author

Good catch, done

}
logWarning(s"Codegen disabled for this expression:\n $logMessage")
InterpretedPredicate.create(expression, inputSchema)
case e: Exception =>
Member

This case can be removed.

Member Author

Yeah, done

}

class InterpretedPredicate(expression: Expression) extends GenPredicate {
Member

Is this change necessary?

Btw, having InterpretedPredicate extend GenPredicate looks logically weird.

Member Author

First, this change is necessary so that InterpretedPredicate returns a Predicate.

As for GenPredicate, I followed the naming convention already used in SparkPlan.scala. If we use Predicate, it will conflict with other names. What would be a better name than GenPredicate?

Member

BasePredicate?

Member Author

Thanks, looks good. Done
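For reference, the renamed hierarchy ends up looking roughly like this (paraphrased from the snippets reviewed below; the real definitions live in Spark's catalyst expressions package):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression

// Common interface shared by generated and interpreted predicates.
abstract class BasePredicate {
  def eval(r: InternalRow): Boolean
}

// Interpreted fallback: evaluate the bound expression directly.
case class InterpretedPredicate(expression: Expression) extends BasePredicate {
  override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
}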

@SparkQA

SparkQA commented Mar 13, 2017

Test build #74428 has started for PR 17087 at commit 11f56d1.

@kiszk
Member Author

kiszk commented Mar 13, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Mar 13, 2017

Test build #74435 has finished for PR 17087 at commit 11f56d1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 13, 2017

Test build #74442 has finished for PR 17087 at commit 530f84f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InterpretedPredicate(expression: Expression) extends BasePredicate

@SparkQA

SparkQA commented Apr 20, 2017

Test build #75999 has finished for PR 17087 at commit 8b6ba75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Apr 21, 2017

@marmbrus could you please take a look?

@kiszk
Member Author

kiszk commented Apr 30, 2017

ping @marmbrus

}

class InterpretedPredicate(expression: Expression) extends BasePredicate {
def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
Member

nit: override def eval...

Member Author

Sure, done for both.

}

class InterpretedPredicate(expression: Expression) extends BasePredicate {
Member

nit: case class

// Cache.get() may wrap the original exception. See the following URL
// http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
// Cache.html#get(K,%20java.util.concurrent.Callable)
case e: UncheckedExecutionException =>
Member

You can use the following simpler code:

case e@(_: UncheckedExecutionException | _: ExecutionError) =>
    throw e.getCause

Member Author

I see, done. Thanks.

try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: JaninoRuntimeException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
Member

nit: It's better to add a debug log that prints e. You can merge the two cases like this:

      case e@(_: JaninoRuntimeException | _: CompileException)
        if sqlContext == null || sqlContext.conf.wholeStageFallback =>
        logDebug(e.getMessage, e)
        genInterpretedPredicate(expression, inputSchema)

Member Author

Yes, done. Thank you.
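Putting the pieces together, newPredicate after this round looks roughly like the following (assembled from the diff hunks in this thread, not a verbatim copy):

protected def newPredicate(
    expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = {
  try {
    GeneratePredicate.generate(expression, inputSchema)
  } catch {
    case e @ (_: JaninoRuntimeException | _: CompileException)
        if sqlContext == null || sqlContext.conf.wholeStageFallback =>
      // Codegen failed (e.g. the 64KB bytecode limit was hit): log the
      // failure and fall back to interpreted evaluation.
      logDebug(e.getMessage, e)
      genInterpretedPredicate(expression, inputSchema)
  }
}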

Member

Forgot this?

Member

@kiszk could you fix this as well?

Member Author

@zsxwing Sorry, I made a mistake. I have pushed it now.

@@ -353,9 +356,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
}

private def genInterpretedPredicate(
expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = {
val str = expression.toString
Member

I think Expression.toString will truncate an expression that is too big, right?

Member Author

Yeah, Expression.toString does truncate some output, but it does not cover this case, so I did not change this part.
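For completeness, the fallback helper with its manual truncation looks roughly like this (paraphrased; the 256-character cutoff is illustrative):

private def genInterpretedPredicate(
    expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = {
  val str = expression.toString
  // Expression.toString can still be huge here, so truncate manually before logging.
  val logMessage = if (str.length > 256) {
    str.substring(0, 256 - 3) + "..."
  } else {
    str
  }
  logWarning(s"Codegen disabled for this expression:\n $logMessage")
  InterpretedPredicate.create(expression, inputSchema)
}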

case e: CompileException =>
val msg = s"failed to compile: $e\n$formatted"
logError(msg, e)
throw new CompileException(msg, e.asInstanceOf[CompileException].getLocation)
@zsxwing (Member) May 10, 2017

Please use throw new CompileException(msg, e.getLocation, e)

Member Author

Good catch, done.

@zsxwing (Member) left a comment

I made one pass. This looks good. Most of my comments are style issues.

@SparkQA

SparkQA commented May 12, 2017

Test build #76862 has finished for PR 17087 at commit 1f19c80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class InterpretedPredicate(expression: Expression) extends BasePredicate

case e @ (_: UncheckedExecutionException | _: ExecutionError) =>
val excChains = ExceptionUtils.getThrowables(e)
val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
throw exc
Member

Why not use e.getCause?

Member Author

Good catch, done

@SparkQA

SparkQA commented May 16, 2017

Test build #76949 has finished for PR 17087 at commit 3868bf5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 16, 2017

Test build #76972 has finished for PR 17087 at commit a5fd465.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented May 16, 2017

LGTM. Thanks! Merging to master.

@asfgit asfgit closed this in 6f62e9d May 16, 2017
robert3005 pushed a commit to palantir/spark that referenced this pull request May 19, 2017
…o 64KB bytecode size limit

liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017
…o 64KB bytecode size limit

@dongjoon-hyun
Member

Hi, all. I'm wondering if it is too late to include this in Spark 2.2.0, or whether it is too risky for that.

asfgit pushed a commit that referenced this pull request May 26, 2017
…o 64KB bytecode size limit

poplav pushed a commit to poplav/spark that referenced this pull request Aug 14, 2017
…o 64KB bytecode size limit

@poplav

poplav commented Aug 15, 2017

Hi, all. I am trying to get this included in Spark 2.1.1. I have opened PR #18942.

poplav pushed a commit to poplav/spark that referenced this pull request Aug 17, 2017
…o 64KB bytecode size limit

GeneratePredicate.generate(expression, inputSchema)
} catch {
case e @ (_: JaninoRuntimeException | _: CompileException)
if sqlContext == null || sqlContext.conf.wholeStageFallback =>
Member

sqlContext.conf.wholeStageFallback is almost useless here, because in almost all cases this code runs on executors.

Member

Always falling back is pretty risky; it might hide bugs.

Member

Let me fix it in another PR.
