Skip to content

Commit

Permalink
hide all of properties of the WindowFunctionDefinition
Browse files Browse the repository at this point in the history
  • Loading branch information
chenghao-intel committed May 21, 2015
1 parent 3b1865f commit c141fb1
Showing 1 changed file with 50 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,32 @@ import org.apache.spark.sql.catalyst.expressions._
*
* }}}
*
* @param column The bounded the aggregate/window function
* @param partitionSpec The partition of the window
* @param orderSpec The ordering of the window
* @param frame The Window Frame type
* @param bindLower A hint of when call the methods `.preceding(n)` `.currentRow()` `.following()`
* if bindLower == true, then we will set the lower bound, otherwise, we should
* set the upper bound for the Row/Range Frame.
*/
@Experimental
class WindowFunctionDefinition protected[sql](
column: Column = null,
partitionSpec: Seq[Expression] = Nil,
orderSpec: Seq[SortOrder] = Nil,
frame: WindowFrame = UnspecifiedFrame,
bindLower: Boolean = true) {
class WindowFunctionDefinition {
private var column: Column = _
private var partitionSpec: Seq[Expression] = Nil
private var orderSpec: Seq[SortOrder] = Nil
private var frame: WindowFrame = UnspecifiedFrame

// Hint of when call the methods `.preceding(n)` `.currentRow()` `.following()`
// if bindLower == true, then we will set the lower bound, otherwise, we should
// set the upper bound for the Row/Range Frame.
private var bindLower: Boolean = true

private def this(
column: Column = null,
partitionSpec: Seq[Expression] = Nil,
orderSpec: Seq[SortOrder] = Nil,
frame: WindowFrame = UnspecifiedFrame,
bindLower: Boolean = true) {
this()
this.column = column
this.partitionSpec = partitionSpec
this.orderSpec = orderSpec
this.frame = frame
this.bindLower = bindLower
}

private[sql] def newColumn(c: Column): WindowFunctionDefinition = {
new WindowFunctionDefinition(c, partitionSpec, orderSpec, frame, bindLower)
Expand Down Expand Up @@ -192,6 +203,8 @@ class WindowFunctionDefinition protected[sql](
* It can be either Lower or Upper Bound position depends on the semantic context.
* For example:
* {{{
* // [CURRENT_ROW, ~)
* df.over(partitionBy("k1").orderBy("k2").rows.preceding(0))
* // [CURRENT_ROW - 1, ~)
* df.over(partitionBy("k1").orderBy("k2").rows.preceding(1))
* // [CURRENT_ROW - 3, CURRENT_ROW - 1]
Expand All @@ -202,12 +215,20 @@ class WindowFunctionDefinition protected[sql](
* @group window_funcs
*/
def preceding(n: Int): WindowFunctionDefinition = {
assert(n > 0)
require(n >= 0, s"preceding(n) requires n greater than or equals 0, but got $n")
val newFrame = frame match {
case f: SpecifiedWindowFrame if bindLower =>
f.copy(frameStart = ValuePreceding(n))
if (n > 0) {
f.copy(frameStart = ValuePreceding(n))
} else {
f.copy(frameStart = CurrentRow)
}
case f: SpecifiedWindowFrame =>
f.copy(frameEnd = ValuePreceding(n))
if (n > 0) {
f.copy(frameEnd = ValuePreceding(n))
} else {
f.copy(frameEnd = CurrentRow)
}
case f => throw new UnsupportedOperationException(s"preceding on $f")
}
new WindowFunctionDefinition(column, partitionSpec, orderSpec, newFrame, false)
Expand Down Expand Up @@ -277,6 +298,8 @@ class WindowFunctionDefinition protected[sql](
* It can be either Lower or Upper Bound position, depends on the semantic context.
* For example:
* {{{
* // [CURRENT_ROW, ~)
* df.over(partitionBy("k1").orderBy("k2").rows.following(0))
* // [CURRENT_ROW + 1, ~)
* df.over(partitionBy("k1").orderBy("k2").rows.following(1))
* // [CURRENT_ROW + 1, CURRENT_ROW + 3]
Expand All @@ -287,12 +310,20 @@ class WindowFunctionDefinition protected[sql](
* @group window_funcs
*/
def following(n: Int): WindowFunctionDefinition = {
assert(n > 0)
require(n >= 0, s"following(n) requires n greater than or equals 0, but got $n")
val newFrame = frame match {
case f: SpecifiedWindowFrame if bindLower =>
f.copy(frameStart = ValueFollowing(n))
if (n > 0) {
f.copy(frameStart = ValueFollowing(n))
} else {
f.copy(frameStart = CurrentRow)
}
case f: SpecifiedWindowFrame =>
f.copy(frameEnd = ValueFollowing(n))
if (n > 0) {
f.copy(frameEnd = ValueFollowing(n))
} else {
f.copy(frameEnd = CurrentRow)
}
case f => throw new UnsupportedOperationException(s"following on $f")
}
new WindowFunctionDefinition(column, partitionSpec, orderSpec, newFrame, false)
Expand Down

0 comments on commit c141fb1

Please sign in to comment.