forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-7322][SQL] Window functions in DataFrame
This closes apache#6104. Author: Cheng Hao <hao.cheng@intel.com> Author: Reynold Xin <rxin@databricks.com> Closes apache#6343 from rxin/window-df and squashes the following commits: 026d587 [Reynold Xin] Address code review feedback. dc448fe [Reynold Xin] Fixed Hive tests. 9794d9d [Reynold Xin] Moved Java test package. 9331605 [Reynold Xin] Refactored API. 3313e2a [Reynold Xin] Merge pull request apache#6104 from chenghao-intel/df_window d625a64 [Cheng Hao] Update the dataframe window API as suggsted c141fb1 [Cheng Hao] hide all of properties of the WindowFunctionDefinition 3b1865f [Cheng Hao] scaladoc typos f3fd2d0 [Cheng Hao] polish the unit test 6847825 [Cheng Hao] Add additional analystcs functions 57e3bc0 [Cheng Hao] typos 24a08ec [Cheng Hao] scaladoc 28222ed [Cheng Hao] fix bug of range/row Frame 1d91865 [Cheng Hao] style issue 53f89f2 [Cheng Hao] remove the over from the functions.scala 964c013 [Cheng Hao] add more unit tests and window functions 64e18a7 [Cheng Hao] Add Window Function support for DataFrame
- Loading branch information
1 parent
47bed06
commit 4b5a00c
Showing
13 changed files
with
807 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
81 changes: 81 additions & 0 deletions
81
sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.expressions | ||
|
||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.sql.Column | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
|
||
/**
 * :: Experimental ::
 * Utility functions for defining window in DataFrames.
 *
 * {{{
 *   // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 *   Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 0)
 *
 *   // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
 *   Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
 * }}}
 *
 * @since 1.4.0
 */
@Experimental
object Window {

  /**
   * Creates a [[WindowSpec]] with the partitioning defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(colName: String, colNames: String*): WindowSpec =
    emptySpec.partitionBy(colName, colNames: _*)

  /**
   * Creates a [[WindowSpec]] with the partitioning defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(cols: Column*): WindowSpec =
    emptySpec.partitionBy(cols: _*)

  /**
   * Creates a [[WindowSpec]] with the ordering defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(colName: String, colNames: String*): WindowSpec =
    emptySpec.orderBy(colName, colNames: _*)

  /**
   * Creates a [[WindowSpec]] with the ordering defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(cols: Column*): WindowSpec =
    emptySpec.orderBy(cols: _*)

  /** A blank specification: no partitioning, no ordering, and an unspecified frame. */
  private def emptySpec: WindowSpec =
    new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)

}
175 changes: 175 additions & 0 deletions
175
sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.expressions | ||
|
||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.sql.{Column, catalyst} | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
|
||
|
||
/**
 * :: Experimental ::
 * A window specification that defines the partitioning, ordering, and frame boundaries.
 *
 * Use the static methods in [[Window]] to create a [[WindowSpec]].
 *
 * @since 1.4.0
 */
@Experimental
class WindowSpec private[sql](
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frame: catalyst.expressions.WindowFrame) {

  /**
   * Defines the partitioning columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(colName: String, colNames: String*): WindowSpec = {
    partitionBy((colName +: colNames).map(Column(_)): _*)
  }

  /**
   * Defines the partitioning columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(cols: Column*): WindowSpec = {
    new WindowSpec(cols.map(_.expr), orderSpec, frame)
  }

  /**
   * Defines the ordering columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(colName: String, colNames: String*): WindowSpec = {
    orderBy((colName +: colNames).map(Column(_)): _*)
  }

  /**
   * Defines the ordering columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(cols: Column*): WindowSpec = {
    // A column may already carry an explicit sort direction (e.g. col.desc); otherwise
    // default to ascending, mirroring SQL's ORDER BY default.
    val sortOrder: Seq[SortOrder] = cols.map { col =>
      col.expr match {
        case expr: SortOrder =>
          expr
        case expr: Expression =>
          SortOrder(expr, Ascending)
      }
    }
    new WindowSpec(partitionSpec, sortOrder, frame)
  }

  /**
   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive),
   * counted in physical rows (ROWS frame).
   *
   * Both `start` and `end` are relative positions from the current row. For example, "0" means
   * "current row", while "-1" means the row before the current row, and "5" means the fifth row
   * after the current row.
   *
   * @param start boundary start, inclusive.
   *              The frame is unbounded if this is the minimum long value.
   * @param end boundary end, inclusive.
   *            The frame is unbounded if this is the maximum long value.
   * @since 1.4.0
   */
  def rowsBetween(start: Long, end: Long): WindowSpec = {
    between(RowFrame, start, end)
  }

  /**
   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive),
   * based on the value of the ordering expression (RANGE frame).
   *
   * Both `start` and `end` are offsets relative to the current row's value. For example,
   * "0" means "current row", "-1" means one before the current row's value, and "5" means
   * five past the current row's value.
   *
   * @param start boundary start, inclusive.
   *              The frame is unbounded if this is the minimum long value.
   * @param end boundary end, inclusive.
   *            The frame is unbounded if this is the maximum long value.
   * @since 1.4.0
   */
  def rangeBetween(start: Long, end: Long): WindowSpec = {
    between(RangeFrame, start, end)
  }

  /**
   * Builds a new spec with an explicit frame of type `typ` spanning `start` to `end`.
   *
   * `Long.MinValue` and `Long.MaxValue` are the documented "unbounded" sentinels for either
   * boundary. Any other offset must fit in an Int: the previous implementation silently
   * truncated with `.toInt`, so e.g. `start = Long.MaxValue` became `ValueFollowing(-1)` and
   * `end = Long.MinValue` became `ValuePreceding(0)` — corrupted frames. Out-of-range offsets
   * now fail fast with an [[IllegalArgumentException]].
   */
  private def between(typ: FrameType, start: Long, end: Long): WindowSpec = {
    def boundary(offset: Long) = offset match {
      case 0 => CurrentRow
      case Long.MinValue => UnboundedPreceding
      case Long.MaxValue => UnboundedFollowing
      case x if x < Int.MinValue || x > Int.MaxValue =>
        throw new IllegalArgumentException(
          s"Frame boundary offset must fit in an Int, or be Long.MinValue/Long.MaxValue " +
            s"for an unbounded frame, but got: $x")
      case x if x < 0 => ValuePreceding(-x.toInt)
      case x => ValueFollowing(x.toInt)
    }

    new WindowSpec(
      partitionSpec,
      orderSpec,
      SpecifiedWindowFrame(typ, boundary(start), boundary(end)))
  }

  /**
   * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
   *
   * Known aggregates are rewritten to their Hive window-function names; anything that is not
   * an aggregate or a [[WindowFunction]] is rejected.
   */
  private[sql] def withAggregate(aggregate: Column): Column = {
    // The spec definition is identical for every case; build it once.
    val windowSpecDef = WindowSpecDefinition(partitionSpec, orderSpec, frame)
    def named(name: String, child: Expression) =
      WindowExpression(UnresolvedWindowFunction(name, child :: Nil), windowSpecDef)

    val windowExpr = aggregate.expr match {
      case Average(child) => named("avg", child)
      case Sum(child) => named("sum", child)
      case Count(child) => named("count", child)
      // TODO first_value/last_value are a hack for the Hive UDAFs of the same names.
      case First(child) => named("first_value", child)
      case Last(child) => named("last_value", child)
      case Min(child) => named("min", child)
      case Max(child) => named("max", child)
      case wf: WindowFunction => WindowExpression(wf, windowSpecDef)
      case x =>
        throw new UnsupportedOperationException(s"$x is not supported in window operation.")
    }
    new Column(windowExpr)
  }

}
Oops, something went wrong.