Skip to content

Commit

Permalink
[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due t…
Browse files Browse the repository at this point in the history
…o 64KB bytecode size limit

## What changes were proposed in this pull request?

When an expression for `df.filter()` has many nodes (e.g. 400), the Java bytecode compiled from the generated Java code exceeds the 64KB size limit. This produces a Java exception and, as a result, the execution fails.
With this PR, if such an exception is caught, execution continues with code generation disabled by calling `Expression.eval()` instead.

## How was this patch tested?

Added a test case to `DataFrameSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#17087 from kiszk/SPARK-19372.
  • Loading branch information
kiszk authored and Robert Kruszewski committed May 19, 2017
1 parent 32c62a2 commit f1a7bf7
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -908,14 +908,8 @@ object CodeGenerator extends Logging {
// Cache.get() may wrap the original exception. See the following URL
// http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
// Cache.html#get(K,%20java.util.concurrent.Callable)
case e : UncheckedExecutionException =>
val excChains = ExceptionUtils.getThrowables(e)
val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
throw exc
case e : ExecutionError =>
val excChains = ExceptionUtils.getThrowables(e)
val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
throw exc
case e @ (_: UncheckedExecutionException | _: ExecutionError) =>
throw e.getCause
}

/**
Expand Down Expand Up @@ -973,7 +967,7 @@ object CodeGenerator extends Logging {
case e: CompileException =>
val msg = s"failed to compile: $e\n$formatted"
logError(msg, e)
throw new CompileException(msg, e.asInstanceOf[CompileException].getLocation)
throw new CompileException(msg, e.getLocation)
}
evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ object InterpretedPredicate {
def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression)
}

class InterpretedPredicate(expression: Expression) extends BasePredicate {
def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
case class InterpretedPredicate(expression: Expression) extends BasePredicate {
override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: JaninoRuntimeException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
genInterpretedPredicate(expression, inputSchema)
case e: CompileException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
case e @ (_: JaninoRuntimeException | _: CompileException)
if sqlContext == null || sqlContext.conf.wholeStageFallback =>
genInterpretedPredicate(expression, inputSchema)
}
}
Expand Down
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1856,4 +1856,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.filter($"x1".isNotNull || !$"y".isin("a!"))
.count
}

// Regression test for SPARK-19372: a filter expression with many nodes (here N = 400 OR-ed
// comparisons) must still be executable instead of failing because of the JVM code size limit.
test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
  val N = 400
  // A single row whose N string columns all hold the same value "string".
  val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
  val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
  val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)

  // Build `false OR _c0 =!= "string" OR _c1 =!= "string" OR ...` across all N columns.
  val filter = (0 until N)
    .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))

  // Every column equals "string", so the predicate is false for the only row. Asserting the
  // count checks that the filter produces the right answer, not merely that it does not throw.
  assert(df.filter(filter).count() === 0L)
}
}

0 comments on commit f1a7bf7

Please sign in to comment.