Merge pull request apache#8 from hagerf/viirya_branch
Keep type information; add test for typed groupByRelationKey
viirya authored Oct 11, 2019
2 parents b5fcbda + c530a1f commit 078c200
Showing 2 changed files with 18 additions and 3 deletions.
5 changes: 2 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1735,15 +1735,14 @@ class Dataset[T] private[sql](
   @scala.annotation.varargs
   def groupByRelationKey(
       col1: String,
-      cols: String*): KeyValueGroupedDataset[Row, Row] = {
+      cols: String*): KeyValueGroupedDataset[Row, T] = {
     val colNames: Seq[String] = col1 +: cols
     val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
     val keySchema = StructType.fromAttributes(keyAttrs)
     val keyEncoder = RowEncoder(keySchema)
-    val valEncoder = RowEncoder(schema)
     new KeyValueGroupedDataset(
       keyEncoder,
-      valEncoder,
+      encoder,
       queryExecution,
       logicalPlan.output,
       keyAttrs)
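
For context, a minimal usage sketch of what the tightened return type buys the caller. The `Person` case class and the surrounding session setup are illustrative assumptions, not part of this change; only `groupByRelationKey` and `keyAs` come from the patched API.

import spark.implicits._

case class Person(id: Int, name: String)

val people = Seq(Person(1, "ann"), Person(2, "bob")).toDS()

// Before this commit the call yielded KeyValueGroupedDataset[Row, Row];
// with this change it yields KeyValueGroupedDataset[Row, Person], so the
// values keep their element type while keyAs refines only the key side.
val grouped = people.groupByRelationKey("id").keyAs[Int]

// mapGroups now receives Person values directly; no Row.getAs unpacking.
val names = grouped.mapGroups { (id, persons) =>
  (id, persons.map(_.name).mkString(","))
}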
16 changes: 16 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1860,6 +1860,22 @@ class DatasetSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-29427: groupByRelationKey") {
+    val df1 = Seq(DoubleData(1, "one"), DoubleData(2, "two"), DoubleData(3, "three")).toDS()
+      .repartition($"id")
+    val df2 = Seq(DoubleData(5, "one"), DoubleData(1, "two"), DoubleData(3, "three")).toDS()
+      .repartition($"id", $"val1")
+
+    val df3 = df1.groupByRelationKey("id").keyAs[Int]
+      .cogroup(df2.groupByRelationKey("id").keyAs[Int]) { case (key, data1, data2) =>
+        if (key == 1) {
+          Iterator(DoubleData(key, (data1 ++ data2).map(_.val1).reduceLeft(_ + _)))
+        } else Iterator.empty
+      }
+
+    checkDataset(df3, DoubleData(1, "onetwo"))
+  }
 }

 object AssertExecutionId {
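
For contrast, a hypothetical sketch of what the same cogroup body would have required under the old KeyValueGroupedDataset[Row, Row] signature, reusing df1, df2, and DoubleData from the test above. The field name passed to getAs is an assumption based on the test's use of val1; the point is only that every value access would have gone through an untyped Row.

// Hypothetical pre-change version: grouped values arrive as untyped Rows,
// so each field access needs getAs with an explicit type annotation.
val df3old = df1.groupByRelationKey("id").keyAs[Int]
  .cogroup(df2.groupByRelationKey("id").keyAs[Int]) { case (key, rows1, rows2) =>
    if (key == 1) {
      val concatenated = (rows1 ++ rows2).map(_.getAs[String]("val1")).reduceLeft(_ + _)
      Iterator(DoubleData(key, concatenated))
    } else Iterator.empty
  }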
