# [SPARK-18690][PYTHON][SQL] Backward compatibility of unbounded frames
## What changes were proposed in this pull request?

Makes `Window.unboundedPreceding` and `Window.unboundedFollowing` backward compatible.

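In practice, the restored behavior means that pre-2.1 code passing `-sys.maxsize` and `sys.maxsize` as frame boundaries keeps producing unbounded frames. A minimal, standalone sketch of that usage (not part of this commit; it assumes a local PySpark installation and borrows the column-name check from the test below):

```python
import sys

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.range(3)

# Legacy-style bounds: the patched rowsBetween() clamps anything at or
# beyond the thresholds to unboundedPreceding / unboundedFollowing.
w = Window.rowsBetween(-sys.maxsize, sys.maxsize)

# The generated column name embeds the frame specification, e.g.
# "... ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING".
print(df.select(F.count("*").over(w)).columns[0])
```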
## How was this patch tested?

PySpark SQL unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: zero323 <zero323@users.noreply.github.com>

Closes apache#16123 from zero323/SPARK-17845-follow-up.
zero323 authored and rxin committed Dec 3, 2016
1 parent 2dc0d7e commit a9cbfc4
Showing 2 changed files with 51 additions and 14 deletions.
### python/pyspark/sql/tests.py (35 additions, 0 deletions)
```diff
@@ -1980,6 +1980,41 @@ def assert_runs_only_one_job_stage_and_task(job_group_name, f):
         # Regression test for SPARK-17514: limit(n).collect() should the perform same as take(n)
         assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect())

+    @unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't support mocking")
+    def test_unbounded_frames(self):
+        from unittest.mock import patch
+        from pyspark.sql import functions as F
+        from pyspark.sql import window
+        import importlib
+
+        df = self.spark.range(0, 3)
+
+        def rows_frame_match():
+            return "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select(
+                F.count("*").over(window.Window.rowsBetween(-sys.maxsize, sys.maxsize))
+            ).columns[0]
+
+        def range_frame_match():
+            return "RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select(
+                F.count("*").over(window.Window.rangeBetween(-sys.maxsize, sys.maxsize))
+            ).columns[0]
+
+        with patch("sys.maxsize", 2 ** 31 - 1):
+            importlib.reload(window)
+            self.assertTrue(rows_frame_match())
+            self.assertTrue(range_frame_match())
+
+        with patch("sys.maxsize", 2 ** 63 - 1):
+            importlib.reload(window)
+            self.assertTrue(rows_frame_match())
+            self.assertTrue(range_frame_match())
+
+        with patch("sys.maxsize", 2 ** 127 - 1):
+            importlib.reload(window)
+            self.assertTrue(rows_frame_match())
+            self.assertTrue(range_frame_match())
+
+        importlib.reload(window)
+
 if __name__ == "__main__":
     from pyspark.sql.tests import *
```
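A note on the mocking pattern above: the thresholds added in `window.py` are computed once, when the module body executes, so patching `sys.maxsize` has no effect until `pyspark.sql.window` is reloaded; the final `importlib.reload(window)` restores the real thresholds for subsequent tests. A condensed, standalone sketch of the same idea (the `_FOLLOWING_THRESHOLD` assertion is our illustration, not taken from the commit):

```python
import importlib
from unittest.mock import patch

from pyspark.sql import window

# Patching sys.maxsize alone does nothing: the thresholds were fixed at
# import time. Reloading the module re-evaluates them under the patch.
with patch("sys.maxsize", 2 ** 31 - 1):
    importlib.reload(window)
    assert window.Window._FOLLOWING_THRESHOLD == 2 ** 31 - 1

# Reload once more so later code sees thresholds for the real sys.maxsize.
importlib.reload(window)
```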
### python/pyspark/sql/window.py (16 additions, 14 deletions)
```diff
@@ -49,6 +49,8 @@ class Window(object):

     _JAVA_MIN_LONG = -(1 << 63)  # -9223372036854775808
     _JAVA_MAX_LONG = (1 << 63) - 1  # 9223372036854775807
+    _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
+    _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)

     unboundedPreceding = _JAVA_MIN_LONG

@@ -98,9 +100,9 @@ def rowsBetween(start, end):
                     The frame is unbounded if this is ``Window.unboundedFollowing``, or
                     any value greater than or equal to 9223372036854775807.
         """
-        if start <= Window._JAVA_MIN_LONG:
+        if start <= Window._PRECEDING_THRESHOLD:
             start = Window.unboundedPreceding
-        if end >= Window._JAVA_MAX_LONG:
+        if end >= Window._FOLLOWING_THRESHOLD:
             end = Window.unboundedFollowing
         sc = SparkContext._active_spark_context
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
@@ -123,14 +125,14 @@ def rangeBetween(start, end):

         :param start: boundary start, inclusive.
                       The frame is unbounded if this is ``Window.unboundedPreceding``, or
-                      any value less than or equal to -9223372036854775808.
+                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
                     The frame is unbounded if this is ``Window.unboundedFollowing``, or
-                    any value greater than or equal to 9223372036854775807.
+                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._JAVA_MIN_LONG:
+        if start <= Window._PRECEDING_THRESHOLD:
             start = Window.unboundedPreceding
-        if end >= Window._JAVA_MAX_LONG:
+        if end >= Window._FOLLOWING_THRESHOLD:
             end = Window.unboundedFollowing
         sc = SparkContext._active_spark_context
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
@@ -185,14 +187,14 @@ def rowsBetween(self, start, end):

         :param start: boundary start, inclusive.
                       The frame is unbounded if this is ``Window.unboundedPreceding``, or
-                      any value less than or equal to -9223372036854775808.
+                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
                     The frame is unbounded if this is ``Window.unboundedFollowing``, or
-                    any value greater than or equal to 9223372036854775807.
+                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._JAVA_MIN_LONG:
+        if start <= Window._PRECEDING_THRESHOLD:
             start = Window.unboundedPreceding
-        if end >= Window._JAVA_MAX_LONG:
+        if end >= Window._FOLLOWING_THRESHOLD:
             end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rowsBetween(start, end))

@@ -211,14 +213,14 @@ def rangeBetween(self, start, end):

         :param start: boundary start, inclusive.
                       The frame is unbounded if this is ``Window.unboundedPreceding``, or
-                      any value less than or equal to -9223372036854775808.
+                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
                     The frame is unbounded if this is ``Window.unboundedFollowing``, or
-                    any value greater than or equal to 9223372036854775807.
+                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._JAVA_MIN_LONG:
+        if start <= Window._PRECEDING_THRESHOLD:
             start = Window.unboundedPreceding
-        if end >= Window._JAVA_MAX_LONG:
+        if end >= Window._FOLLOWING_THRESHOLD:
             end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rangeBetween(start, end))

```
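For reference, the clamping arithmetic in isolation. This snippet is an illustration rather than part of the commit; the values in the comments assume a 64-bit CPython (`sys.maxsize == 2**63 - 1`) and a 32-bit build (`2**31 - 1`) respectively:

```python
import sys

_JAVA_MIN_LONG = -(1 << 63)     # -9223372036854775808
_JAVA_MAX_LONG = (1 << 63) - 1  #  9223372036854775807

# 64-bit CPython: thresholds are +/-(2**63 - 1); 32-bit: +/-(2**31 - 1).
preceding_threshold = max(-sys.maxsize, _JAVA_MIN_LONG)
following_threshold = min(sys.maxsize, _JAVA_MAX_LONG)

# Whatever the platform, the legacy sentinels +/-sys.maxsize always hit
# the clamp conditions (start <= threshold, end >= threshold), so they
# still translate into an unbounded frame.
assert -sys.maxsize <= preceding_threshold
assert sys.maxsize >= following_threshold
```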
