Skip to content

Commit

Permalink
fix bug of range/row Frame
Browse files Browse the repository at this point in the history
  • Loading branch information
chenghao-intel committed May 21, 2015
1 parent 1d91865 commit 28222ed
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 126 deletions.
21 changes: 6 additions & 15 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -890,28 +890,19 @@ class Column(protected[sql] val expr: Expression) extends Logging {
def bitwiseXOR(other: Any): Column = {
  // Turn the right-hand operand into a column via `lit` and take its expression,
  // then pair it with this column's expression in a BitwiseXor node.
  val rhs = lit(other).expr
  BitwiseXor(expr, rhs)
}

/**
* Create a new [[WindowFunctionDefinition]] bundled with this column.
* Define a [[WindowFunctionDefinition]] column.
* {{{
* df.select(avg($"value").over...)
* }}}
*
* @group expr_ops
*/
def over: WindowFunctionDefinition = {
  // Start a fresh definition with this column bound to it; every other
  // constructor argument keeps its default.
  val definition = new WindowFunctionDefinition(this)
  definition
}

/**
* Reuse an existing [[WindowFunctionDefinition]], bundling it with this column.
* {{{
* val w = over.partitionBy("name").orderBy("id")
* val w = partitionBy("name").orderBy("id")
* df.select(
* sum("price").over(w).between.preceding(2),
* avg("price").over(w).between.preceding(4)
* sum("price").over(w).range.preceding(2),
* avg("price").over(w).range.preceding(4),
* avg("price").over(partitionBy("name").orderBy("id").range.preceding(1))
* )
* }}}
*
* @group expr_ops
*/
def over(w: WindowFunctionDefinition): WindowFunctionDefinition = w.newColumn(this)
// NOTE(review): the definition above and the one below take the same parameter list and
// differ only in return type (WindowFunctionDefinition vs. Column via `toColumn`). They
// look like the old and new sides of a diff rendered without +/- markers -- confirm which
// one is live; both cannot coexist in one class.
def over(w: WindowFunctionDefinition): Column = w.newColumn(this).toColumn

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ class WindowFunctionDefinition protected[sql](
column: Column = null,
partitionSpec: Seq[Expression] = Nil,
orderSpec: Seq[SortOrder] = Nil,
frame: WindowFrame = UnspecifiedFrame) {
frame: WindowFrame = UnspecifiedFrame,
bindLower: Boolean = true) {

private[sql] def newColumn(c: Column): WindowFunctionDefinition = {
  // Rebind this definition to column `c`, carrying over the partitioning, ordering,
  // frame and the bindLower flag unchanged. (A preceding four-argument constructor
  // call whose result was discarded has been removed; it had no effect on the result.)
  new WindowFunctionDefinition(c, partitionSpec, orderSpec, frame, bindLower)
}

/**
Expand Down Expand Up @@ -120,20 +121,48 @@ class WindowFunctionDefinition protected[sql](
}

/**
* Returns a new ranged [[WindowFunctionDefinition]]. For example:
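* Returns a [[WindowFunctionDefinition]] with the same frame, set up so that the next
* bound call (e.g. `preceding`) specifies the lower bound. It exists mainly to make the
* usage read like SQL; a frame (e.g. via `range`) must already have been specified.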
* For example:
* {{{
* df.over.partitionBy("k1").orderBy($"k2", $"k3").between
* df.over.partitionBy("k1").orderBy($"k2", $"k3").range.between
* }}}
* @group window_funcs
*/
def between: WindowFunctionDefinition = {
  // A frame must already have been chosen (e.g. via `range`). UnspecifiedFrame is itself
  // a WindowFrame, so the message names the type that is actually required.
  assert(
    this.frame.isInstanceOf[SpecifiedWindowFrame],
    s"between requires a SpecifiedWindowFrame, but the frame is $frame")
  // bindLower = true: the next bound call (preceding/following/currentRow) sets frameStart.
  new WindowFunctionDefinition(column, partitionSpec, orderSpec, frame, bindLower = true)
}

/**
* Returns a new [[WindowFunctionDefinition]] indicating that the next bound to be
* specified is the upper bound.
* For example:
* {{{
* df.over.partitionBy("k1").orderBy($"k2", $"k3").range.between.preceding(3).and
* }}}
* @group window_funcs
*/
def and: WindowFunctionDefinition = {
  // Same column, partitioning, ordering and frame; only the flag changes.
  // bindLower = false makes the next preceding/following/currentRow call write frameEnd.
  new WindowFunctionDefinition(column, partitionSpec, orderSpec, frame, bindLower = false)
}

/**
* Returns a new Ranged [[WindowFunctionDefinition]].
* For example:
* {{{
* df.over.partitionBy("k1").orderBy($"k2", $"k3").range.between
* }}}
* @group window_funcs
*/
def range: WindowFunctionDefinition = {
  // Begin with a range frame that is unbounded on both ends.
  val unboundedRange =
    SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, UnboundedFollowing)
  // bindLower is left at its constructor default.
  new WindowFunctionDefinition(column, partitionSpec, orderSpec, unboundedRange)
}

/**
* Returns a new [[WindowFunctionDefinition]], with fixed number of records
* from/to CURRENT ROW. For example:
* Returns a new [[WindowFunctionDefinition]], with fixed number of records.
* For example:
* {{{
* df.over.partitionBy("k1").orderBy($"k2", $"k3").row
* }}}
Expand All @@ -145,57 +174,114 @@ class WindowFunctionDefinition protected[sql](
}

/**
* Returns a new [[WindowFunctionDefinition]], with range of preceding position specified.
* For a Ranged [[WindowFunctionDefinition]], the range is [CURRENT_ROW - n, unspecified]
* For a Fixed Row [[WindowFunctionDefinition]], the range as [CURRENT_ROW - n, CURRENT_ROW].
* Returns a new [[WindowFunctionDefinition]] with a bound positioned before CURRENT_ROW.
* The bound is either the lower or the upper one, depending on whether `and` has been called.
* For example:
* {{{
* // The range is [CURRENT_ROW - 1, CURRENT_ROW]
* df.over.partitionBy("k1").orderBy($"k2", $"k3").row.preceding(1)
* // The range [CURRENT_ROW - 1, previous upper bound]
* df.over.partitionBy("k1").orderBy($"k2", $"k3").between.preceding(1)
* // [CURRENT_ROW - 1, ~)
* df.over(partitionBy("k1").orderBy("k2").row.preceding(1))
* // [CURRENT_ROW - 3, CURRENT_ROW - 1]
* df.over(partitionBy("k1").orderBy("k2").row.between.preceding(3).and.preceding(1))
* // (~, CURRENT_ROW - 1]
* df.over(partitionBy("k1").orderBy("k2").row.between.unboundedPreceding.and.preceding(1))
* }}}
* `n` must be greater than 0 (this is asserted); use `currentRow` for CURRENT_ROW itself.
* @group window_funcs
*/
def preceding(n: Int): WindowFunctionDefinition = {
  // Zero and negative offsets are rejected; `currentRow` expresses a bound at the
  // current row. Because of this assertion the former `n == 0` guards could never match.
  assert(n > 0)
  val newFrame = frame match {
    // Lower bound is being specified (the state after `between` or a fresh frame).
    case f: SpecifiedWindowFrame if bindLower =>
      f.copy(frameStart = ValuePreceding(n))
    // Upper bound is being specified (the state after `and` or a previous bound call).
    case f: SpecifiedWindowFrame =>
      f.copy(frameEnd = ValuePreceding(n))
    // No frame type has been chosen yet, so there is nothing to bound.
    case f => throw new UnsupportedOperationException(s"preceding on $f")
  }
  // Any bound call flips bindLower to false, so a subsequent bound call sets frameEnd.
  new WindowFunctionDefinition(column, partitionSpec, orderSpec, newFrame, false)
}

/**
* Returns a new [[WindowFunctionDefinition]], with lower position as unbounded.
* For example:
* {{{
* // (~, CURRENT_ROW]
* df.over(partitionBy("k1").orderBy("k2").row.between.unboundedPreceding.and.currentRow)
* }}}
* @group window_funcs
*/
def unboundedPreceding(): WindowFunctionDefinition = frame match {
  case specified: SpecifiedWindowFrame =>
    // Always writes the lower bound, regardless of bindLower.
    val widened = specified.copy(frameStart = UnboundedPreceding)
    // The result has bindLower = false, so the next bound call sets frameEnd.
    new WindowFunctionDefinition(column, partitionSpec, orderSpec, widened, false)
  case other =>
    throw new UnsupportedOperationException(s"unboundedPreceding on $other")
}

/**
* Returns a new [[WindowFunctionDefinition]], with upper position as unbounded.
* For example:
* {{{
* // [CURRENT_ROW, ~)
* df.over(partitionBy("k1").orderBy("k2").row.between.currentRow.and.unboundedFollowing)
* }}}
* @group window_funcs
*/
def unboundedFollowing(): WindowFunctionDefinition = frame match {
  case specified: SpecifiedWindowFrame =>
    // Always writes the upper bound, regardless of bindLower.
    val widened = specified.copy(frameEnd = UnboundedFollowing)
    new WindowFunctionDefinition(column, partitionSpec, orderSpec, widened, false)
  case other =>
    throw new UnsupportedOperationException(s"unboundedFollowing on $other")
}

/**
* Returns a new [[WindowFunctionDefinition]] with a bound positioned at CURRENT_ROW.
* The bound is either the lower or the upper one, depending on whether `and` has been called.
* For example:
* {{{
* // [CURRENT_ROW, ~)
* df.over(partitionBy("k1").orderBy("k2").row.between.currentRow.and.unboundedFollowing)
* // [CURRENT_ROW - 3, CURRENT_ROW]
* df.over(partitionBy("k1").orderBy("k2").row.between.preceding(3).and.currentRow)
* }}}
* @group window_funcs
*/
def currentRow(): WindowFunctionDefinition = frame match {
  case specified: SpecifiedWindowFrame =>
    // bindLower selects which end of the frame is pinned to the current row.
    val pinned =
      if (bindLower) specified.copy(frameStart = CurrentRow)
      else specified.copy(frameEnd = CurrentRow)
    // The result has bindLower = false, so a following bound call sets frameEnd.
    new WindowFunctionDefinition(column, partitionSpec, orderSpec, pinned, false)
  case other =>
    throw new UnsupportedOperationException(s"currentRow on $other")
}

/**
* Returns a new [[WindowFunctionDefinition]], with range of following position specified.
* For a Ranged [[WindowFunctionDefinition]], the range is [unspecified, CURRENT_ROW + n]
* For a Fixed Row [[WindowFunctionDefinition]], the range as [CURRENT_ROW, CURRENT_ROW + n].
* Returns a new [[WindowFunctionDefinition]] with a bound positioned after CURRENT_ROW.
* The bound is either the lower or the upper one, depending on whether `and` has been called.
* For example:
* {{{
* // The range is [CURRENT_ROW, CURRENT_ROW + 1]
* df.over.partitionBy("k1").orderBy($"k2", $"k3").row.following(1)
* // The range [previous lower bound, CURRENT_ROW + 1]
* df.over.partitionBy("k1").orderBy($"k2", $"k3").between.following(1)
* // [CURRENT_ROW + 1, ~)
* df.over(partitionBy("k1").orderBy("k2").row.following(1))
* // [CURRENT_ROW + 1, CURRENT_ROW + 3]
* df.over(partitionBy("k1").orderBy("k2").row.between.following(1).and.following(3))
* // [CURRENT_ROW + 1, ~)
* df.over(partitionBy("k1").orderBy("k2").row.between.following(1).and.unboundedFollowing)
* }}}
* `n` must be greater than 0 (this is asserted); use `currentRow` for CURRENT_ROW itself.
* @group window_funcs
*/
def following(n: Int): WindowFunctionDefinition = {
  // Zero and negative offsets are rejected; `currentRow` expresses a bound at the
  // current row. Because of this assertion the former `n == 0` guards could never match.
  assert(n > 0)
  val newFrame = frame match {
    // Lower bound is being specified (the state after `between` or a fresh frame).
    case f: SpecifiedWindowFrame if bindLower =>
      f.copy(frameStart = ValueFollowing(n))
    // Upper bound is being specified (the state after `and` or a previous bound call).
    // This must be ValueFollowing: a range-frame branch here used ValuePreceding(n),
    // which placed the bound on the wrong side of the current row.
    case f: SpecifiedWindowFrame =>
      f.copy(frameEnd = ValueFollowing(n))
    // No frame type has been chosen yet, so there is nothing to bound.
    case f => throw new UnsupportedOperationException(s"following on $f")
  }
  // Any bound call flips bindLower to false, so a subsequent bound call sets frameEnd.
  new WindowFunctionDefinition(column, partitionSpec, orderSpec, newFrame, false)
}

/**
Expand All @@ -221,7 +307,7 @@ class WindowFunctionDefinition protected[sql](
* }}}
* @group window_funcs
*/
def toColumn: Column = {
private[sql] def toColumn: Column = {
if (column == null) {
throw new AnalysisException("Window didn't bind with expression")
}
Expand Down
Loading

0 comments on commit 28222ed

Please sign in to comment.