[SPARK-10621][SQL] Consistent naming for functions in SQL, Python, Scala
Author: Reynold Xin <rxin@databricks.com>

Closes #9948 from rxin/SPARK-10621.

(cherry picked from commit 151d7c2)
Signed-off-by: Reynold Xin <rxin@databricks.com>
rxin committed Nov 25, 2015
1 parent b181126 commit 486db87
Showing 2 changed files with 196 additions and 39 deletions.
111 changes: 94 additions & 17 deletions python/pyspark/sql/functions.py
@@ -150,18 +150,18 @@ def _():

 _window_functions = {
     'rowNumber':
-        """returns a sequential number starting at 1 within a window partition.
-        This is equivalent to the ROW_NUMBER function in SQL.""",
+        """.. note:: Deprecated in 1.6, use row_number instead.""",
+    'row_number':
+        """returns a sequential number starting at 1 within a window partition.""",
     'denseRank':
+        """.. note:: Deprecated in 1.6, use dense_rank instead.""",
+    'dense_rank':
         """returns the rank of rows within a window partition, without any gaps.
         The difference between rank and denseRank is that denseRank leaves no gaps in ranking
         sequence when there are ties. That is, if you were ranking a competition using denseRank
         and had three people tie for second place, you would say that all three were in second
-        place and that the next person came in third.
-        This is equivalent to the DENSE_RANK function in SQL.""",
+        place and that the next person came in third.""",
     'rank':
         """returns the rank of rows within a window partition.
@@ -172,14 +172,14 @@ def _():
         This is equivalent to the RANK function in SQL.""",
     'cumeDist':
+        """.. note:: Deprecated in 1.6, use cume_dist instead.""",
+    'cume_dist':
         """returns the cumulative distribution of values within a window partition,
-        i.e. the fraction of rows that are below the current row.
-        This is equivalent to the CUME_DIST function in SQL.""",
+        i.e. the fraction of rows that are below the current row.""",
     'percentRank':
-        """returns the relative rank (i.e. percentile) of rows within a window partition.
-        This is equivalent to the PERCENT_RANK function in SQL.""",
+        """.. note:: Deprecated in 1.6, use percent_rank instead.""",
+    'percent_rank':
+        """returns the relative rank (i.e. percentile) of rows within a window partition.""",
 }
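Taken together, the renames above give the Python API the same snake_case spelling as SQL's ROW_NUMBER, DENSE_RANK, CUME_DIST and PERCENT_RANK. A minimal local-mode sketch of the new names in use; the data, column names and app name are made up for illustration, and note that on the 1.6 line window functions still require a HiveContext:

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext, Window
from pyspark.sql.functions import row_number, dense_rank, percent_rank

sc = SparkContext("local", "window-demo")
sqlContext = HiveContext(sc)

# Hypothetical (department, salary) rows, with a tie inside 'eng'.
df = sqlContext.createDataFrame(
    [("eng", 100), ("eng", 90), ("eng", 90), ("ops", 80)], ["dept", "salary"])

# dense_rank leaves no gap after the tie; row_number always increments.
w = Window.partitionBy("dept").orderBy(df["salary"].desc())
df.select("dept", "salary",
          row_number().over(w).alias("rn"),
          dense_rank().over(w).alias("dr"),
          percent_rank().over(w).alias("pr")).show()
```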

 for _name, _doc in _functions.items():
@@ -189,7 +189,7 @@ def _():
 for _name, _doc in _binary_mathfunctions.items():
     globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
 for _name, _doc in _window_functions.items():
-    globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
+    globals()[_name] = since(1.6)(_create_window_function(_name, _doc))
 for _name, _doc in _functions_1_6.items():
     globals()[_name] = since(1.6)(_create_function(_name, _doc))
 del _name, _doc
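These loops are how the docstring dictionaries become real module-level functions: each name is bound in `globals()` to a generated wrapper and stamped with the release it appeared in via `since()`, which is why the window functions now carry 1.6. A stripped-down model of the same pattern; this is a simplified stand-in, not the actual `_create_window_function` body:

```python
# Simplified model of the registration loop in functions.py: every dict entry
# becomes a module-level function that forwards to the JVM function of the
# same name (the py4j call is replaced by a print here).
_demo_functions = {
    'row_number': "returns a sequential number starting at 1 within a window partition.",
}

def _create_demo_function(name, doc):
    def _():
        print("would call sc._jvm.functions.%s()" % name)
    _.__name__ = name
    _.__doc__ = doc
    return _

for _name, _doc in _demo_functions.items():
    globals()[_name] = _create_demo_function(_name, _doc)
del _name, _doc

row_number()  # prints: would call sc._jvm.functions.row_number()
```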
@@ -288,6 +288,38 @@ def countDistinct(col, *cols):

 @since(1.4)
 def monotonicallyIncreasingId():
+    """
+    .. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
+    """
+    return monotonically_increasing_id()


+@since(1.6)
+def input_file_name():
+    """Creates a string column for the file name of the current Spark task.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.input_file_name())
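A quick way to see `input_file_name` in action is to read a file-backed DataFrame and select the new column alongside the data. This sketch writes a throwaway JSON file first so it runs standalone; the paths and names are illustrative:

```python
import os
import tempfile

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import input_file_name

sc = SparkContext("local", "input-file-demo")
sqlContext = SQLContext(sc)

# Write one small JSON file so the read below has a real file behind it.
d = tempfile.mkdtemp()
with open(os.path.join(d, "part-0.json"), "w") as f:
    f.write('{"x": 1}\n')

df = sqlContext.read.json(d)
df.select("x", input_file_name().alias("source_file")).show(truncate=False)
```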


+@since(1.6)
+def isnan(col):
+    """An expression that returns true iff the column is NaN.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.isnan(_to_java_column(col)))


+@since(1.6)
+def isnull(col):
+    """An expression that returns true iff the column is null.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.isnull(_to_java_column(col)))
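`isnan` and `isnull` answer different questions: NaN is a legal floating-point value, so a NaN cell is not null, and a null cell is not NaN. A small sketch contrasting the two, with made-up data:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, isnull

sc = SparkContext("local", "nan-null-demo")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1.0,), (float('nan'),), (None,)], ["a"])

# Row 2 is NaN but not null; row 3 is null but not NaN.
df.select("a", isnan("a").alias("is_nan"), isnull("a").alias("is_null")).show()
```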


+@since(1.6)
+def monotonically_increasing_id():
     """A column that generates monotonically increasing 64-bit integers.
     The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
@@ -300,11 +332,21 @@ def monotonicallyIncreasingId():
     0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
     >>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
-    >>> df0.select(monotonicallyIncreasingId().alias('id')).collect()
+    >>> df0.select(monotonically_increasing_id().alias('id')).collect()
     [Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
     """
     sc = SparkContext._active_spark_context
-    return Column(sc._jvm.functions.monotonicallyIncreasingId())
+    return Column(sc._jvm.functions.monotonically_increasing_id())
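The doctest's 8589934592 follows from the documented ID layout: the partition number occupies the upper 31 bits and a per-partition record counter the lower 33 bits, so the IDs can be packed and unpacked by hand:

```python
# Partition 1, record 0 packs to 1 << 33, the first ID of the second partition.
pid, record = 1, 0
assert (pid << 33) | record == 8589934592  # matches the doctest output above

# Unpacking an ID back into (partition, record):
some_id = 8589934594
print(some_id >> 33, some_id & ((1 << 33) - 1))  # -> 1 2
```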


+@since(1.6)
+def nanvl(col1, col2):
+    """Returns col1 if it is not NaN, or col2 if col1 is NaN.
+    Both inputs should be floating point columns (DoubleType or FloatType).
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))
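`nanvl` is the NaN analogue of coalesce: keep the first column where it is a real number, fall back to the second where it is NaN. A short sketch with made-up data:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import nanvl

sc = SparkContext("local", "nanvl-demo")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1.0, 2.0), (float('nan'), 3.0)], ["a", "b"])

# First row keeps a (1.0); second row falls back to b (3.0) because a is NaN.
df.select(nanvl(df.a, df.b).alias("a_or_b")).show()
```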


 @since(1.4)
@@ -382,15 +424,23 @@ def shiftRightUnsigned(col, numBits):

 @since(1.4)
 def sparkPartitionId():
+    """
+    .. note:: Deprecated in 1.6, use spark_partition_id instead.
+    """
+    return spark_partition_id()


+@since(1.6)
+def spark_partition_id():
     """A column for partition ID of the Spark task.
     Note that this is indeterministic because it depends on data partitioning and task scheduling.
-    >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect()
+    >>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
     [Row(pid=0), Row(pid=0)]
     """
     sc = SparkContext._active_spark_context
-    return Column(sc._jvm.functions.sparkPartitionId())
+    return Column(sc._jvm.functions.spark_partition_id())
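One practical use of `spark_partition_id` is checking how evenly rows land across partitions after a repartition. A sketch, with sizes and names chosen for illustration:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import spark_partition_id

sc = SparkContext("local[4]", "partition-id-demo")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(i,) for i in range(100)], ["v"])

# Row counts per partition; badly skewed data shows up as lopsided counts.
df.repartition(4).groupBy(spark_partition_id().alias("pid")).count().show()
```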


 @since(1.5)
@@ -1410,6 +1460,33 @@ def explode(col):
     return Column(jc)


+@since(1.6)
+def get_json_object(col, path):
+    """
+    Extracts a json object from a json string based on the specified json path, and returns
+    a json string of the extracted object. It will return null if the input json string is invalid.
+    :param col: string column in json format
+    :param path: path to the json object to extract
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
+    return Column(jc)
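`get_json_object` takes one JsonPath expression per call and returns the matching fragment as a string, or null when the path is missing or the document is malformed. A sketch with made-up data:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import get_json_object

sc = SparkContext("local", "json-path-demo")
sqlContext = SQLContext(sc)

data = [("1", '{"f1": "value1", "f2": "value2"}'), ("2", '{"f1": "value12"}')]
df = sqlContext.createDataFrame(data, ["key", "jstring"])

# '$.f2' is absent in row 2, so that cell comes back as null.
df.select(df.key,
          get_json_object(df.jstring, '$.f1').alias("f1"),
          get_json_object(df.jstring, '$.f2').alias("f2")).show()
```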


+@since(1.6)
+def json_tuple(col, *fields):
+    """Creates a new row for a json column according to the given field names.
+    :param col: string column in json format
+    :param fields: fields to extract
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.json_tuple(_to_java_column(col), _to_seq(sc, fields))
+    return Column(jc)
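Where `get_json_object` needs one call per path, `json_tuple` extracts several top-level fields at once, producing one output column per field name. A sketch assuming the varargs signature shown above, with the same made-up data:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import json_tuple

sc = SparkContext("local", "json-tuple-demo")
sqlContext = SQLContext(sc)

data = [("1", '{"f1": "value1", "f2": "value2"}'), ("2", '{"f1": "value12"}')]
df = sqlContext.createDataFrame(data, ["key", "jstring"])

# One call, two generated columns (c0, c1) holding f1 and f2.
df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).show()
```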


 @since(1.5)
 def size(col):
     """