
Commit

Refactored API.
rxin committed May 22, 2015
1 parent 3313e2a commit 9331605
Showing 8 changed files with 344 additions and 401 deletions.
15 changes: 8 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -18,13 +18,13 @@
package org.apache.spark.sql

import scala.language.implicitConversions
import scala.collection.JavaConversions._

import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.analysis.{MultiAlias, UnresolvedAttribute, UnresolvedStar, UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.types._


@@ -890,19 +890,20 @@ class Column(protected[sql] val expr: Expression) extends Logging {
def bitwiseXOR(other: Any): Column = BitwiseXor(expr, lit(other).expr)

  /**
-  * Define a [[Window]] column.
+  * Define a windowing column.
   *
   * {{{
   *   val w = Window.partitionBy("name").orderBy("id")
   *   df.select(
-  *     sum("price").over(w.range.preceding(2)),
-  *     avg("price").over(w.range.preceding(4)),
-  *     avg("price").over(partitionBy("name").orderBy("id").range.preceding(1))
+  *     sum("price").over(w.rangeBetween(Long.MinValue, 2)),
+  *     avg("price").over(w.rowsBetween(0, 4))
   *   )
   * }}}
   *
   * @group expr_ops
   * @since 1.4.0
   */
-  def over(w: Window): Column = w.newColumn(this).toColumn
+  def over(window: expressions.WindowSpec): Column = window.withAggregate(this)

}

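For context, a short usage sketch of the refactored API (not part of the commit): it mirrors the scaladoc example above and assumes a Spark 1.4 setup with a hypothetical DataFrame df holding "name", "id", and "price" columns.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, sum}

// Build a reusable window specification, then frame it per aggregate.
val w = Window.partitionBy("name").orderBy("id")
df.select(
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),  // unbounded preceding to 2 following
  avg("price").over(w.rowsBetween(0, 4))                // current row to 4 rows following
)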
9 changes: 5 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, Unresol
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.sources.CreateTableUsingAsSelect
@@ -411,7 +411,7 @@ class DataFrame private[sql](
        joined.left,
        joined.right,
        joinType = Inner,
-       Some(expressions.EqualTo(
+       Some(catalyst.expressions.EqualTo(
          joined.left.resolve(usingColumn),
          joined.right.resolve(usingColumn))))
    )
@@ -480,8 +480,9 @@
    // By the time we get here, since we have already run analysis, all attributes should've been
    // resolved and become AttributeReference.
    val cond = plan.condition.map { _.transform {
-     case expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
-       expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
+     case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
+         if a.sameRef(b) =>
+       catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
    }}
    plan.copy(condition = cond)
  }
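To illustrate the code path above, a minimal sketch of a using-column join, with hypothetical data and assuming a SQLContext named sqlContext:

import sqlContext.implicits._

val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
val right = Seq((1, "x"), (2, "y")).toDF("id", "r")

// Resolves "id" against both sides, equi-joins on it, and emits a single
// "id" column in the output; this is why the condition is built from
// resolved attributes rather than ambiguous column names.
val joined = left.join(right, "id")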
222 changes: 0 additions & 222 deletions sql/core/src/main/scala/org/apache/spark/sql/Window.scala

This file was deleted.

81 changes: 81 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.expressions

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions._

/**
* :: Experimental ::
* Utility functions for defining windows in DataFrames.
*
* {{{
* // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
* Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 0)
*
* // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
* Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
* }}}
*
* @since 1.4.0
*/
@Experimental
object Window {

  /**
   * Creates a [[WindowSpec]] with the partitioning defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(colName: String, colNames: String*): WindowSpec = {
    spec.partitionBy(colName, colNames : _*)
  }

  /**
   * Creates a [[WindowSpec]] with the partitioning defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(cols: Column*): WindowSpec = {
    spec.partitionBy(cols : _*)
  }

  /**
   * Creates a [[WindowSpec]] with the ordering defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(colName: String, colNames: String*): WindowSpec = {
    spec.orderBy(colName, colNames : _*)
  }

  /**
   * Creates a [[WindowSpec]] with the ordering defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(cols: Column*): WindowSpec = {
    spec.orderBy(cols : _*)
  }

  private def spec: WindowSpec = {
    new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
  }

}
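A brief usage sketch of the new builder (not part of the commit), assuming a hypothetical DataFrame sales with "country", "date", and "amount" columns:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running total per country, i.e.
// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
val runningTotal = Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 0)

sales.select(sales("country"), sales("date"), sum("amount").over(runningTotal))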
