Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove deprecated BigQuery facets from OpenLineage utils #44838

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

14 changes: 3 additions & 11 deletions providers/src/airflow/providers/google/cloud/openlineage/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ def get_openlineage_facets_on_complete(self, _):

def get_facets(self, job_id: str):
from airflow.providers.common.compat.openlineage.facet import ErrorMessageRunFacet
from airflow.providers.google.cloud.openlineage.utils import (
BigQueryErrorRunFacet,
get_from_nullable_chain,
)
from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain

inputs = []
outputs = []
Expand All @@ -125,8 +122,7 @@ def get_facets(self, job_id: str):
if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`")

# TODO: remove bigQuery_job in next release
run_facets["bigQuery_job"] = run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)
run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)

if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
if hasattr(self, "log"):
Expand All @@ -145,16 +141,12 @@ def get_facets(self, job_id: str):
if hasattr(self, "log"):
self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True)
exception_msg = traceback.format_exc()
# TODO: remove BigQueryErrorRunFacet in next release
run_facets.update(
{
"errorMessage": ErrorMessageRunFacet(
message=f"{e}: {exception_msg}",
programmingLanguage="python",
),
"bigQuery_error": BigQueryErrorRunFacet(
clientError=f"{e}: {exception_msg}",
),
)
}
)
deduplicated_outputs = self._deduplicate_outputs(outputs)
Expand Down
22 changes: 0 additions & 22 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,6 @@ def _get_schema() -> str:
)


# TODO: remove BigQueryErrorRunFacet in next release
@define
class BigQueryErrorRunFacet(RunFacet):
"""
Represents errors that can happen during execution of BigqueryExtractor.

:param clientError: represents errors originating in bigquery client
:param parserError: represents errors that happened during parsing SQL provided to bigquery
"""

clientError: str | None = field(default=None)
parserError: str | None = field(default=None)

@staticmethod
def _get_schema() -> str:
return (
"https://raw.githubusercontent.com/apache/airflow/"
f"providers-google/{provider_version}/airflow/providers/google/"
"openlineage/BigQueryErrorRunFacet.json"
)


def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
"""
Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist.
Expand Down
6 changes: 0 additions & 6 deletions providers/tests/google/cloud/openlineage/test_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ def test_bq_job_information(self):

self.job_details["configuration"]["query"].pop("query")
assert lineage.run_facets == {
"bigQuery_job": BigQueryJobRunFacet(
cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
),
"bigQueryJob": BigQueryJobRunFacet(
cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
),
Expand Down Expand Up @@ -136,9 +133,6 @@ def test_bq_script_job_information(self):
"bigQueryJob": BigQueryJobRunFacet(
cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details)
),
"bigQuery_job": BigQueryJobRunFacet(
cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details)
),
"externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"),
}
assert lineage.inputs == [
Expand Down
1 change: 0 additions & 1 deletion providers/tests/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,6 @@ def test_execute_openlineage_events(self, mock_hook):
]

assert lineage.run_facets == {
"bigQuery_job": mock.ANY,
"bigQueryJob": mock.ANY,
"externalQuery": ExternalQueryRunFacet(externalQueryId=mock.ANY, source="bigquery"),
}
Expand Down