Skip to content

Commit

Permalink
[SPARK-20567] Lazily bind in GenerateExec
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed May 3, 2017
1 parent ef3df91 commit 7c86b0e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ case class GenerateExec(

// Reports the child's output partitioning unchanged.
override def outputPartitioning: Partitioning = child.outputPartitioning

// Diff context: the eager `val` directly below is the removed side of this hunk and the
// `lazy val` after it is the added side; only the `lazy val` exists after the commit.
val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
// As a `lazy val`, the generator is bound against `child.output` on first access to
// `boundGenerator` instead of when the GenerateExec instance is constructed.
lazy val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)

protected override def doExecute(): RDD[InternalRow] = {
// boundGenerator.terminate() should be triggered after all of the rows in the partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
)
}

// Streaming aggregation over exploded array elements: every element of the `_2` sequence
// becomes its own `value` row, rows are grouped by `_1`, and the distinct values per key
// are counted as size(collect_set(value)). Added by the commit titled
// "[SPARK-20567] Lazily bind in GenerateExec".
test("count distinct") {
val source = MemoryStream[(Int, Seq[Int])]

// One output row per element of `_2`, keeping all original columns.
val exploded = source.toDF().select($"*", explode($"_2") as 'value)
// Number of distinct `value`s collected within a group.
val distinctCount = size(collect_set($"value"))
val aggregated = exploded.groupBy($"_1").agg(distinctCount).as[(Int, Int)]

// Key 1 carries the two distinct values 1 and 2, so the expected result is (1, 2).
testStream(aggregated, Update)(
AddData(source, (1, Seq(1, 2))),
CheckLastBatch((1, 2))
)
}

test("simple count, complete mode") {
val inputData = MemoryStream[Int]

Expand Down

0 comments on commit 7c86b0e

Please sign in to comment.