Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-11654][SQL] add reduce to GroupedDataset #9673

Closed
wants to merge 5 commits into from

Conversation

marmbrus
Copy link
Contributor

This PR adds a new method, reduce, to GroupedDataset, which allows similar operations to reduceByKey on a traditional PairRDD.

val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect()  // not actually commutative :P

res0: Array(3 -> "abcxyz", 5 -> "hello")

While implementing this method and its test cases several more deficiencies were found in our encoder handling. Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new Datasets with the same object type but different output attributes. We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:

  • Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only UnresolvedAttributes for named fields and BoundReferences for fields accessed by ordinal.
  • Resolved Encoders: internal to a [Grouped]Dataset the encoder is resolved, meaning all input has been resolved to a specific AttributeReference. Any encoders that are placed into a logical plan for use in object construction should be resolved.
  • BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.

It is left to future work to add explicit checks for resolution and provide good error messages when it fails. We might also consider enforcing the above constraints in the type system (i.e. fromRow only exists on a ResolvedEncoder), but we should probably wait before spending too much time on this.

@marmbrus
Copy link
Contributor Author

/cc @cloud-fan

@SparkQA
Copy link

SparkQA commented Nov 12, 2015

Test build #45762 has finished for PR 9673 at commit ab2dbb9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * class TypedColumn[-T, U](\n

@SparkQA
Copy link

SparkQA commented Nov 12, 2015

Test build #45775 has finished for PR 9673 at commit f247497.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * class TypedColumn[-T, U](\n

@marmbrus
Copy link
Contributor Author

I'm going to merge this to master and 1.6. Will address any comments in a follow-up.

asfgit pushed a commit that referenced this pull request Nov 13, 2015
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.

```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect()  // not actually commutative :P

res0: Array(3 -> "abcxyz", 5 -> "hello")
```

While implementing this method and its test cases several more deficiencies were found in our encoder handling.  Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes.  We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:

 - Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
 - Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`.  Any encoders that are placed into a logical plan for use in object construction should be resolved.
 - BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.

It is left to future work to add explicit checks for resolution and provide good error messages when it fails.  We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.

Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #9673 from marmbrus/pr/9628.

(cherry picked from commit 41bbd23)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@asfgit asfgit closed this in 41bbd23 Nov 13, 2015
@@ -360,7 +364,15 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
new Dataset[U1](sqlContext, Project(Alias(withEncoder(c1).expr, "_1")() :: Nil, logicalPlan))
// We use an unbound encoder since the expression will make up its own schema.
// TODO: This probably doesn't work if we are relying on reordering of the input class fields.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the input class reference to the T of this Dataset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is. Actually, this comment is stale and there is a test now to show that it works.

dskrvk pushed a commit to dskrvk/spark that referenced this pull request Nov 13, 2015
This PR adds a new method, `reduce`, to `GroupedDataset`, which allows similar operations to `reduceByKey` on a traditional `PairRDD`.

```scala
val ds = Seq("abc", "xyz", "hello").toDS()
ds.groupBy(_.length).reduce(_ + _).collect()  // not actually commutative :P

res0: Array(3 -> "abcxyz", 5 -> "hello")
```

While implementing this method and its test cases several more deficiencies were found in our encoder handling.  Specifically, in order to support positional resolution, named resolution and tuple composition, it is important to keep the unresolved encoder around and to use it when constructing new `Datasets` with the same object type but different output attributes.  We now divide the encoder lifecycle into three phases (that mirror the lifecycle of standard expressions) and have checks at various boundaries:

 - Unresoved Encoders: all users facing encoders (those constructed by implicits, static methods, or tuple composition) are unresolved, meaning they have only `UnresolvedAttributes` for named fields and `BoundReferences` for fields accessed by ordinal.
 - Resolved Encoders: internal to a `[Grouped]Dataset` the encoder is resolved, meaning all input has been resolved to a specific `AttributeReference`.  Any encoders that are placed into a logical plan for use in object construction should be resolved.
 - BoundEncoder: Are constructed by physical plans, right before actual conversion from row -> object is performed.

It is left to future work to add explicit checks for resolution and provide good error messages when it fails.  We might also consider enforcing the above constraints in the type system (i.e. `fromRow` only exists on a `ResolvedEncoder`), but we should probably wait before spending too much time on this.

Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#9673 from marmbrus/pr/9628.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants