Skip to content

Commit

Permalink
Add groupByRelationKey.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Oct 11, 2019
1 parent 7c5db45 commit b5fcbda
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
24 changes: 24 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql](
withGroupingKey.newColumns)
}

/**
* Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row of given
* columns.
*
* @group typedrel
* @since 3.0.0
*/
@scala.annotation.varargs
def groupByRelationKey(
col1: String,
cols: String*): KeyValueGroupedDataset[Row, Row] = {
val colNames: Seq[String] = col1 +: cols
val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
val keySchema = StructType.fromAttributes(keyAttrs)
val keyEncoder = RowEncoder(keySchema)
val valEncoder = RowEncoder(schema)
new KeyValueGroupedDataset(
keyEncoder,
valEncoder,
queryExecution,
logicalPlan.output,
keyAttrs)
}

/**
* (Java-specific)
* Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given key `func`.
Expand Down
16 changes: 16 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2203,4 +2203,20 @@ class DataFrameSuite extends QueryTest with SharedSparkSession {
|*(1) Range (0, 10, step=1, splits=2)""".stripMargin))
}
}

test("groupByRelationKey") {
val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
.repartition($"a", $"b").sortWithinPartitions("a", "b")
val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
.repartition($"a", $"b").sortWithinPartitions("a", "b")

val df3 = df1.groupByRelationKey("a", "b")
.cogroup(df2.groupByRelationKey("a", "b")) { case (key, data1, data2) =>
data1.zip(data2).map { p =>
p._1.getInt(2) + p._2.getInt(2)
}
}.toDF

checkAnswer(df3, Row(7) :: Row(9) :: Nil)
}
}

0 comments on commit b5fcbda

Please sign in to comment.