Skip to content

Commit

Permalink
[SPARK-16664][SQL] Fix persist call on Data frames with more than 200…
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

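Commit f12f11e introduced this bug by calling `map` where `foreach` was intended: `map` on an iterator is lazy and its result was discarded, so the generated accessor/extractor functions were never registered. This patch replaces both calls with `foreach`.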

## How was this patch tested?

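Added a regression test to `DataFrameSuite` that persists a DataFrame with 201 columns and reads a value back.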

Author: Wesley Tang <tangmingjun@mininglamp.com>

Closes #14324 from breakdawn/master.

(cherry picked from commit d1d5069)
Signed-off-by: Sean Owen <sowen@cloudera.com>
  • Loading branch information
Wesley Tang authored and srowen committed Jul 29, 2016
1 parent 5cd79c3 commit ed03d0a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
var groupedAccessorsLength = 0
groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
groupedAccessorsItr.zipWithIndex.foreach { case (body, i) =>
groupedAccessorsLength += 1
val funcName = s"accessors$i"
val funcCode = s"""
Expand All @@ -137,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
""".stripMargin
ctx.addNewFunction(funcName, funcCode)
}
groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
groupedExtractorsItr.zipWithIndex.foreach { case (body, i) =>
val funcName = s"extractors$i"
val funcCode = s"""
|private void $funcName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1550,4 +1550,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
checkAnswer(joined, Row("x", null, null))
checkAnswer(joined.filter($"new".isNull), Row("x", null, null))
}

test("SPARK-16664: persist with more than 200 columns") {
  // Build a single-row DataFrame with 201 LongType columns, where column i holds the value i.
  val columnCount = 201L
  val fields = List.range(0, columnCount).map(id => StructField("name" + id, LongType, true))
  val singleRow = Row.fromSeq(Seq.range(0, columnCount))
  val rowRdd = sparkContext.makeRDD(Seq(singleRow))
  val wideDf = spark.createDataFrame(rowRdd, StructType(fields), false)

  // Fetch the row after persisting and check that a column in the middle kept its value.
  val fetchedRow = wideDf.persist.take(1).apply(0)
  assert(fetchedRow.toSeq(100).asInstanceOf[Long] == 100)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
val columnTypes1 = List.fill(length1)(IntegerType)
val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)

val length2 = 10000
// SPARK-16664: the limit of janino is 8117
val length2 = 8117
val columnTypes2 = List.fill(length2)(IntegerType)
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
}
Expand Down

0 comments on commit ed03d0a

Please sign in to comment.