From c141fb1e2ec99aa616770d120abac1b0fbb1fa58 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 18 May 2015 19:58:46 -0700 Subject: [PATCH] hide all of properties of the WindowFunctionDefinition --- .../spark/sql/WindowFunctionDefinition.scala | 69 ++++++++++++++----- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/WindowFunctionDefinition.scala b/sql/core/src/main/scala/org/apache/spark/sql/WindowFunctionDefinition.scala index 96a1ad7c65802..b786841a44702 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/WindowFunctionDefinition.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/WindowFunctionDefinition.scala @@ -53,21 +53,32 @@ import org.apache.spark.sql.catalyst.expressions._ * * }}} * - * @param column The bounded the aggregate/window function - * @param partitionSpec The partition of the window - * @param orderSpec The ordering of the window - * @param frame The Window Frame type - * @param bindLower A hint of when call the methods `.preceding(n)` `.currentRow()` `.following()` - * if bindLower == true, then we will set the lower bound, otherwise, we should - * set the upper bound for the Row/Range Frame. */ @Experimental -class WindowFunctionDefinition protected[sql]( - column: Column = null, - partitionSpec: Seq[Expression] = Nil, - orderSpec: Seq[SortOrder] = Nil, - frame: WindowFrame = UnspecifiedFrame, - bindLower: Boolean = true) { +class WindowFunctionDefinition { + private var column: Column = _ + private var partitionSpec: Seq[Expression] = Nil + private var orderSpec: Seq[SortOrder] = Nil + private var frame: WindowFrame = UnspecifiedFrame + + // Hint of when call the methods `.preceding(n)` `.currentRow()` `.following()` + // if bindLower == true, then we will set the lower bound, otherwise, we should + // set the upper bound for the Row/Range Frame. + private var bindLower: Boolean = true + + private def this( + column: Column = null, + partitionSpec: Seq[Expression] = Nil, + orderSpec: Seq[SortOrder] = Nil, + frame: WindowFrame = UnspecifiedFrame, + bindLower: Boolean = true) { + this() + this.column = column + this.partitionSpec = partitionSpec + this.orderSpec = orderSpec + this.frame = frame + this.bindLower = bindLower + } private[sql] def newColumn(c: Column): WindowFunctionDefinition = { new WindowFunctionDefinition(c, partitionSpec, orderSpec, frame, bindLower) @@ -192,6 +203,8 @@ class WindowFunctionDefinition protected[sql]( * It can be either Lower or Upper Bound position depends on the semantic context. * For example: * {{{ + * // [CURRENT_ROW, ~) + * df.over(partitionBy("k1").orderBy("k2").rows.preceding(0)) * // [CURRENT_ROW - 1, ~) * df.over(partitionBy("k1").orderBy("k2").rows.preceding(1)) * // [CURRENT_ROW - 3, CURRENT_ROW - 1] @@ -202,12 +215,20 @@ class WindowFunctionDefinition protected[sql]( * @group window_funcs */ def preceding(n: Int): WindowFunctionDefinition = { - assert(n > 0) + require(n >= 0, s"preceding(n) requires n greater than or equals 0, but got $n") val newFrame = frame match { case f: SpecifiedWindowFrame if bindLower => - f.copy(frameStart = ValuePreceding(n)) + if (n > 0) { + f.copy(frameStart = ValuePreceding(n)) + } else { + f.copy(frameStart = CurrentRow) + } case f: SpecifiedWindowFrame => - f.copy(frameEnd = ValuePreceding(n)) + if (n > 0) { + f.copy(frameEnd = ValuePreceding(n)) + } else { + f.copy(frameEnd = CurrentRow) + } case f => throw new UnsupportedOperationException(s"preceding on $f") } new WindowFunctionDefinition(column, partitionSpec, orderSpec, newFrame, false) @@ -277,6 +298,8 @@ class WindowFunctionDefinition protected[sql]( * It can be either Lower or Upper Bound position, depends on the semantic context. * For example: * {{{ + * // [CURRENT_ROW, ~) + * df.over(partitionBy("k1").orderBy("k2").rows.following(0)) * // [CURRENT_ROW + 1, ~) * df.over(partitionBy("k1").orderBy("k2").rows.following(1)) * // [CURRENT_ROW + 1, CURRENT_ROW + 3] @@ -287,12 +310,20 @@ class WindowFunctionDefinition protected[sql]( * @group window_funcs */ def following(n: Int): WindowFunctionDefinition = { - assert(n > 0) + require(n >= 0, s"following(n) requires n greater than or equals 0, but got $n") val newFrame = frame match { case f: SpecifiedWindowFrame if bindLower => - f.copy(frameStart = ValueFollowing(n)) + if (n > 0) { + f.copy(frameStart = ValueFollowing(n)) + } else { + f.copy(frameStart = CurrentRow) + } case f: SpecifiedWindowFrame => - f.copy(frameEnd = ValueFollowing(n)) + if (n > 0) { + f.copy(frameEnd = ValueFollowing(n)) + } else { + f.copy(frameEnd = CurrentRow) + } case f => throw new UnsupportedOperationException(s"following on $f") } new WindowFunctionDefinition(column, partitionSpec, orderSpec, newFrame, false)