ADAP-1166: Add table format telemetry reporting to Spark adapter (#517) #1174

Merged 2 commits on Feb 5, 2025
.github/workflows/main.yml: 8 changes (4 additions, 4 deletions)

@@ -37,7 +37,7 @@ jobs:
   code-quality:
     name: code-quality

-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     timeout-minutes: 10

     steps:
@@ -69,7 +69,7 @@ jobs:
   unit:
     name: unit test / python ${{ matrix.python-version }}

-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     timeout-minutes: 10

     strategy:
@@ -114,7 +114,7 @@ jobs:
   build:
     name: build packages

-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04

     outputs:
       is_alpha: ${{ steps.check-is-alpha.outputs.is_alpha }}
@@ -172,7 +172,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-latest, macos-14, windows-latest]
+        os: [ubuntu-22.04, macos-14, windows-latest]
         python-version: ["3.9", "3.10", "3.11", "3.12"]
         dist-type: ["whl", "gz"]
dbt/adapters/spark/impl.py: 20 changes (20 additions, 0 deletions)

@@ -516,6 +516,26 @@ def debug_query(self) -> None:
         """Override for DebugTask method"""
         self.execute("select 1 as id")

+    @classmethod
+    def _get_adapter_specific_run_info(cls, config: RelationConfig) -> Dict[str, Any]:
+        table_format: Optional[str] = None
+        # Full table_format support within this adapter is coming. Until then, for telemetry,
+        # we're relying on table_formats_within_file_formats - a subset of file_format values
+        table_formats_within_file_formats = ["delta", "iceberg", "hive", "hudi"]
+
+        if (
+            config
+            and hasattr(config, "_extra")
+            and (file_format := config._extra.get("file_format"))
+        ):
+            if file_format in table_formats_within_file_formats:
+                table_format = file_format
+
+        return {
+            "adapter_type": "spark",
+            "table_format": table_format,
+        }
+

 # spark does something interesting with joins when both tables have the same
 # static values for the join condition and complains that the join condition is
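For illustration only (not part of the PR diff), a minimal sketch of how the new hook resolves table_format from a model config. The SimpleNamespace stand-in for the RelationConfig argument is an assumption for this example; any object exposing an _extra dict behaves the same way here:

from types import SimpleNamespace

from dbt.adapters.spark.impl import SparkAdapter

# Hypothetical stand-in for a RelationConfig carrying model config extras.
config = SimpleNamespace(_extra={"file_format": "iceberg"})
print(SparkAdapter._get_adapter_specific_run_info(config))
# -> {'adapter_type': 'spark', 'table_format': 'iceberg'}

# file_format values outside the tracked subset report table_format=None.
config = SimpleNamespace(_extra={"file_format": "parquet"})
print(SparkAdapter._get_adapter_specific_run_info(config))
# -> {'adapter_type': 'spark', 'table_format': None}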
tests/unit/test_adapter_telemetry.py: 51 changes (51 additions, 0 deletions), new file

@@ -0,0 +1,51 @@
from unittest import mock

import dbt.adapters.__about__
import dbt.adapters.spark.__version__

from dbt.adapters.spark.impl import SparkAdapter
from dbt.adapters.base.relation import AdapterTrackingRelationInfo


def assert_telemetry_data(adapter_type: str, file_format: str):
    table_formats_within_file_formats = ["delta", "iceberg", "hive", "hudi"]
    expected_table_format = None
    if file_format in table_formats_within_file_formats:
        expected_table_format = file_format

    mock_model_config = mock.MagicMock()
    mock_model_config._extra = {
        "adapter_type": adapter_type,
        "file_format": file_format,
    }

    res = SparkAdapter.get_adapter_run_info(mock_model_config)

    assert res.adapter_name == adapter_type
    assert res.base_adapter_version == dbt.adapters.__about__.version
    assert res.adapter_version == dbt.adapters.spark.__version__.version

    assert res.model_adapter_details == {
        "adapter_type": adapter_type,
        "table_format": expected_table_format,
    }

    assert type(res) is AdapterTrackingRelationInfo


def test_telemetry_with_spark_details():
    spark_file_formats = [
        "text",
        "csv",
        "json",
        "jdbc",
        "parquet",
        "orc",
        "hive",
        "delta",
        "iceberg",
        "libsvm",
        "hudi",
    ]
    for file_format in spark_file_formats:
        assert_telemetry_data("spark", file_format)