Skip to content

Commit

Permalink
[SPARK-49849][CONNECT][PYTHON] API compatibility check for Structured…
Browse files Browse the repository at this point in the history
… Streaming Query Management

### What changes were proposed in this pull request?

This PR proposes to add API compatibility check for Spark SQL Structured Streaming Query Management functions

### Why are the changes needed?

To guarantee of the same behavior between Spark Classic and Spark Connect

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added UTs

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48581 from itholic/compat_streaming_query.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
itholic authored and HyukjinKwon committed Oct 22, 2024
1 parent d5419ab commit 2c904e4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/streaming/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def active(self) -> List[StreamingQuery]:

active.__doc__ = PySparkStreamingQueryManager.active.__doc__

def get(self, id: str) -> Optional[StreamingQuery]:
def get(self, id: str) -> Optional["StreamingQuery"]:
cmd = pb2.StreamingQueryManagerCommand()
cmd.get_query = id
response = self._execute_streaming_query_manager_cmd(cmd)
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/streaming/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def active(self) -> List[StreamingQuery]:
"""
return [StreamingQuery(jsq) for jsq in self._jsqm.active()]

def get(self, id: str) -> Optional[StreamingQuery]:
def get(self, id: str) -> Optional["StreamingQuery"]:
"""
Returns an active query from this :class:`SparkSession`.
Expand Down
38 changes: 38 additions & 0 deletions python/pyspark/sql/tests/test_connect_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from pyspark.sql.group import GroupedData as ClassicGroupedData
import pyspark.sql.avro.functions as ClassicAvro
import pyspark.sql.protobuf.functions as ClassicProtobuf
from pyspark.sql.streaming.query import StreamingQuery as ClassicStreamingQuery
from pyspark.sql.streaming.query import StreamingQueryManager as ClassicStreamingQueryManager

if should_test_connect:
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
Expand All @@ -49,6 +51,10 @@
from pyspark.sql.connect.group import GroupedData as ConnectGroupedData
import pyspark.sql.connect.avro.functions as ConnectAvro
import pyspark.sql.connect.protobuf.functions as ConnectProtobuf
from pyspark.sql.connect.streaming.query import StreamingQuery as ConnectStreamingQuery
from pyspark.sql.connect.streaming.query import (
StreamingQueryManager as ConnectStreamingQueryManager,
)


class ConnectCompatibilityTestsMixin:
Expand Down Expand Up @@ -401,6 +407,22 @@ def test_avro_compatibility(self):
expected_missing_classic_methods,
)

def test_streaming_query_compatibility(self):
"""Test Streaming Query compatibility between classic and connect."""
expected_missing_connect_properties = set()
expected_missing_classic_properties = set()
expected_missing_connect_methods = set()
expected_missing_classic_methods = set()
self.check_compatibility(
ClassicStreamingQuery,
ConnectStreamingQuery,
"StreamingQuery",
expected_missing_connect_properties,
expected_missing_classic_properties,
expected_missing_connect_methods,
expected_missing_classic_methods,
)

def test_protobuf_compatibility(self):
"""Test Protobuf compatibility between classic and connect."""
expected_missing_connect_properties = set()
Expand All @@ -423,6 +445,22 @@ def test_protobuf_compatibility(self):
expected_missing_classic_methods,
)

def test_streaming_query_manager_compatibility(self):
"""Test Streaming Query Manager compatibility between classic and connect."""
expected_missing_connect_properties = set()
expected_missing_classic_properties = set()
expected_missing_connect_methods = set()
expected_missing_classic_methods = {"close"}
self.check_compatibility(
ClassicStreamingQueryManager,
ConnectStreamingQueryManager,
"StreamingQueryManager",
expected_missing_connect_properties,
expected_missing_classic_properties,
expected_missing_connect_methods,
expected_missing_classic_methods,
)


@unittest.skipIf(not should_test_connect, connect_requirement_message)
class ConnectCompatibilityTests(ConnectCompatibilityTestsMixin, ReusedSQLTestCase):
Expand Down

0 comments on commit 2c904e4

Please sign in to comment.