diff --git a/airflow/providers/google/cloud/openlineage/mixins.py b/airflow/providers/google/cloud/openlineage/mixins.py
new file mode 100644
index 0000000000000..691cec58845ad
--- /dev/null
+++ b/airflow/providers/google/cloud/openlineage/mixins.py
@@ -0,0 +1,271 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import copy
+import json
+import traceback
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from openlineage.client.facet import (
+        BaseFacet,
+        OutputStatisticsOutputDatasetFacet,
+        SchemaDatasetFacet,
+    )
+    from openlineage.client.run import Dataset
+
+    from airflow.providers.google.cloud.openlineage.utils import BigQueryJobRunFacet
+
+
+class _BigQueryOpenLineageMixin:
+    def get_openlineage_facets_on_complete(self, _):
+        """
+        Retrieve OpenLineage data for a COMPLETE BigQuery job.
+
+        This method retrieves statistics for the specified job_ids using the BigQueryDatasetsProvider.
+        It calls the BigQuery API, retrieving input and output dataset info from it, as well as run-level
+        usage statistics.
+
+        Run facets should contain:
+            - ExternalQueryRunFacet
+            - BigQueryJobRunFacet
+
+        Run facets may contain:
+            - ErrorMessageRunFacet
+
+        Job facets should contain:
+            - SqlJobFacet if operator has self.sql
+
+        Input datasets should contain facets:
+            - DataSourceDatasetFacet
+            - SchemaDatasetFacet
+
+        Output datasets should contain facets:
+            - DataSourceDatasetFacet
+            - SchemaDatasetFacet
+            - OutputStatisticsOutputDatasetFacet
+        """
+        from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        if not self.job_id:
+            return OperatorLineage()
+
+        run_facets: dict[str, BaseFacet] = {
+            "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
+        }
+
+        job_facets = {"sql": SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
+
+        self.client = self.hook.get_client(project_id=self.hook.project_id)
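+        # self.job_id may be a single job id or an iterable of job ids;
+        # normalize it to a list before extracting facets for each job.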
+        job_ids = self.job_id
+        if isinstance(self.job_id, str):
+            job_ids = [self.job_id]
+        inputs, outputs = [], []
+        for job_id in job_ids:
+            inner_inputs, inner_outputs, inner_run_facets = self.get_facets(job_id=job_id)
+            inputs.extend(inner_inputs)
+            outputs.extend(inner_outputs)
+            run_facets.update(inner_run_facets)
+
+        return OperatorLineage(
+            inputs=inputs,
+            outputs=outputs,
+            run_facets=run_facets,
+            job_facets=job_facets,
+        )
+
+    def get_facets(self, job_id: str):
+        from openlineage.client.facet import ErrorMessageRunFacet
+
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BigQueryErrorRunFacet,
+            get_from_nullable_chain,
+        )
+
+        inputs = []
+        outputs = []
+        run_facets: dict[str, BaseFacet] = {}
+        if hasattr(self, "log"):
+            self.log.debug("Extracting data from bigquery job: `%s`", job_id)
+        try:
+            job = self.client.get_job(job_id=job_id)  # type: ignore
+            props = job._properties
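+            # Only a finished job exposes complete, final lineage information.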
%s", e, exc_info=True) + exception_msg = traceback.format_exc() + # TODO: remove BigQueryErrorRunFacet in next release + run_facets.update( + { + "errorMessage": ErrorMessageRunFacet( + message=f"{e}: {exception_msg}", + programmingLanguage="python", + ), + "bigQuery_error": BigQueryErrorRunFacet( + clientError=f"{e}: {exception_msg}", + ), + } + ) + deduplicated_outputs = self._deduplicate_outputs(outputs) + return inputs, deduplicated_outputs, run_facets + + def _deduplicate_outputs(self, outputs: list[Dataset | None]) -> list[Dataset]: + # Sources are the same so we can compare only names + final_outputs = {} + for single_output in outputs: + if not single_output: + continue + key = single_output.name + if key not in final_outputs: + final_outputs[key] = single_output + continue + + # No OutputStatisticsOutputDatasetFacet is added to duplicated outputs as we can not determine + # if the rowCount or size can be summed together. + single_output.facets.pop("outputStatistics", None) + final_outputs[key] = single_output + + return list(final_outputs.values()) + + def _get_inputs_outputs_from_job(self, properties: dict) -> tuple[list[Dataset], Dataset | None]: + from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain + + input_tables = get_from_nullable_chain(properties, ["statistics", "query", "referencedTables"]) or [] + output_table = get_from_nullable_chain(properties, ["configuration", "query", "destinationTable"]) + inputs = [self._get_dataset(input_table) for input_table in input_tables] + if output_table: + output = self._get_dataset(output_table) + dataset_stat_facet = self._get_statistics_dataset_facet(properties) + if dataset_stat_facet: + output.facets.update({"outputStatistics": dataset_stat_facet}) + + return inputs, output + + @staticmethod + def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet: + from airflow.providers.google.cloud.openlineage.utils import ( + BigQueryJobRunFacet, + get_from_nullable_chain, + ) + + if get_from_nullable_chain(properties, ["configuration", "query", "query"]): + # Exclude the query to avoid event size issues and duplicating SqlJobFacet information. 
+        cache_hit = get_from_nullable_chain(properties, ["statistics", "query", "cacheHit"])
+        billed_bytes = get_from_nullable_chain(properties, ["statistics", "query", "totalBytesBilled"])
+        return BigQueryJobRunFacet(
+            cached=str(cache_hit).lower() == "true",
+            billedBytes=int(billed_bytes) if billed_bytes else None,
+            properties=json.dumps(properties),
+        )
+
+    @staticmethod
+    def _get_statistics_dataset_facet(properties) -> OutputStatisticsOutputDatasetFacet | None:
+        from openlineage.client.facet import OutputStatisticsOutputDatasetFacet
+
+        from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain
+
+        query_plan = get_from_nullable_chain(properties, chain=["statistics", "query", "queryPlan"])
+        if not query_plan:
+            return None
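+        # The last stage of the query plan is treated as the one that wrote the final output.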
+        out_stage = query_plan[-1]
+        out_rows = out_stage.get("recordsWritten", None)
+        out_bytes = out_stage.get("shuffleOutputBytes", None)
+        if out_bytes and out_rows:
+            return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows), size=int(out_bytes))
+        return None
+
+    def _get_dataset(self, table: dict) -> Dataset:
+        from openlineage.client.run import Dataset
+
+        BIGQUERY_NAMESPACE = "bigquery"
+
+        project = table.get("projectId")
+        dataset = table.get("datasetId")
+        table_name = table.get("tableId")
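+        # Dataset names use the fully qualified `project.dataset.table` form.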
+        dataset_name = f"{project}.{dataset}.{table_name}"
+
+        dataset_schema = self._get_table_schema_safely(dataset_name)
+        return Dataset(
+            namespace=BIGQUERY_NAMESPACE,
+            name=dataset_name,
+            facets={
+                "schema": dataset_schema,
+            }
+            if dataset_schema
+            else {},
+        )
+
+    def _get_table_schema_safely(self, table_name: str) -> SchemaDatasetFacet | None:
+        try:
+            return self._get_table_schema(table_name)
+        except Exception as e:
+            if hasattr(self, "log"):
+                self.log.warning("Could not extract output schema from bigquery. %s", e)
+        return None
+
+    def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None:
+        from openlineage.client.facet import SchemaDatasetFacet, SchemaField
+
+        from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain
+
+        bq_table = self.client.get_table(table)
+
+        if not bq_table._properties:
+            return None
+
+        fields = get_from_nullable_chain(bq_table._properties, ["schema", "fields"])
+        if not fields:
+            return None
+
+        return SchemaDatasetFacet(
+            fields=[
+                SchemaField(
+                    name=field.get("name"),
+                    type=field.get("type"),
+                    description=field.get("description"),
+                )
+                for field in fields
+            ]
+        )
diff --git a/airflow/providers/google/cloud/openlineage/utils.py b/airflow/providers/google/cloud/openlineage/utils.py
index fb0d4c663b179..4dc0e4030bd9d 100644
--- a/airflow/providers/google/cloud/openlineage/utils.py
+++ b/airflow/providers/google/cloud/openlineage/utils.py
@@ -17,9 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import copy
-import json
-import traceback
 from typing import TYPE_CHECKING, Any
 
 from attr import define, field
@@ -29,17 +26,15 @@
     ColumnLineageDatasetFacetFieldsAdditional,
     ColumnLineageDatasetFacetFieldsAdditionalInputFields,
     DocumentationDatasetFacet,
-    ErrorMessageRunFacet,
-    OutputStatisticsOutputDatasetFacet,
     SchemaDatasetFacet,
     SchemaField,
 )
-from openlineage.client.run import Dataset
 
 from airflow.providers.google import __version__ as provider_version
 
 if TYPE_CHECKING:
     from google.cloud.bigquery.table import Table
+    from openlineage.client.run import Dataset
 
 BIGQUERY_NAMESPACE = "bigquery"
 
@@ -174,215 +169,3 @@ def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
         return source
     except AttributeError:
         return None
-
-
-class _BigQueryOpenLineageMixin:
-    def get_openlineage_facets_on_complete(self, _):
-        """
-        Retrieve OpenLineage data for a COMPLETE BigQuery job.
-
-        This method retrieves statistics for the specified job_ids using the BigQueryDatasetsProvider.
-        It calls BigQuery API, retrieving input and output dataset info from it, as well as run-level
-        usage statistics.
-
-        Run facets should contain:
-            - ExternalQueryRunFacet
-            - BigQueryJobRunFacet
-
-        Run facets may contain:
-            - ErrorMessageRunFacet
-
-        Job facets should contain:
-            - SqlJobFacet if operator has self.sql
-
-        Input datasets should contain facets:
-            - DataSourceDatasetFacet
-            - SchemaDatasetFacet
-
-        Output datasets should contain facets:
-            - DataSourceDatasetFacet
-            - SchemaDatasetFacet
-            - OutputStatisticsOutputDatasetFacet
-        """
-        from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
-
-        from airflow.providers.openlineage.extractors import OperatorLineage
-        from airflow.providers.openlineage.sqlparser import SQLParser
-
-        if not self.job_id:
-            return OperatorLineage()
-
-        run_facets: dict[str, BaseFacet] = {
-            "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
-        }
-
-        job_facets = {"sql": SqlJobFacet(query=SQLParser.normalize_sql(self.sql))}
-
-        self.client = self.hook.get_client(project_id=self.hook.project_id)
-        job_ids = self.job_id
-        if isinstance(self.job_id, str):
-            job_ids = [self.job_id]
-        inputs, outputs = [], []
-        for job_id in job_ids:
-            inner_inputs, inner_outputs, inner_run_facets = self.get_facets(job_id=job_id)
-            inputs.extend(inner_inputs)
-            outputs.extend(inner_outputs)
-            run_facets.update(inner_run_facets)
-
-        return OperatorLineage(
-            inputs=inputs,
-            outputs=outputs,
-            run_facets=run_facets,
-            job_facets=job_facets,
-        )
-
-    def get_facets(self, job_id: str):
-        inputs = []
-        outputs = []
-        run_facets: dict[str, BaseFacet] = {}
-        if hasattr(self, "log"):
-            self.log.debug("Extracting data from bigquery job: `%s`", job_id)
-        try:
-            job = self.client.get_job(job_id=job_id)  # type: ignore
-            props = job._properties
-
-            if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
-                raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`")
-
-            # TODO: remove bigQuery_job in next release
-            run_facets["bigQuery_job"] = run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)
-
-            if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
-                if hasattr(self, "log"):
-                    self.log.debug("Found SCRIPT job. Extracting lineage from child jobs instead.")
-                # SCRIPT job type has no input / output information but spawns child jobs that have one
-                # https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job
-                for child_job_id in self.client.list_jobs(parent_job=job_id):
-                    child_job = self.client.get_job(job_id=child_job_id)  # type: ignore
-                    child_inputs, child_output = self._get_inputs_outputs_from_job(child_job._properties)
-                    inputs.extend(child_inputs)
-                    outputs.append(child_output)
-            else:
-                inputs, _output = self._get_inputs_outputs_from_job(props)
-                outputs.append(_output)
-        except Exception as e:
-            if hasattr(self, "log"):
-                self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True)
-            exception_msg = traceback.format_exc()
-            # TODO: remove BigQueryErrorRunFacet in next release
-            run_facets.update(
-                {
-                    "errorMessage": ErrorMessageRunFacet(
-                        message=f"{e}: {exception_msg}",
-                        programmingLanguage="python",
-                    ),
-                    "bigQuery_error": BigQueryErrorRunFacet(
-                        clientError=f"{e}: {exception_msg}",
-                    ),
-                }
-            )
-        deduplicated_outputs = self._deduplicate_outputs(outputs)
-        return inputs, deduplicated_outputs, run_facets
-
-    def _deduplicate_outputs(self, outputs: list[Dataset | None]) -> list[Dataset]:
-        # Sources are the same so we can compare only names
-        final_outputs = {}
-        for single_output in outputs:
-            if not single_output:
-                continue
-            key = single_output.name
-            if key not in final_outputs:
-                final_outputs[key] = single_output
-                continue
-
-            # No OutputStatisticsOutputDatasetFacet is added to duplicated outputs as we can not determine
-            # if the rowCount or size can be summed together.
-            single_output.facets.pop("outputStatistics", None)
-            final_outputs[key] = single_output
-
-        return list(final_outputs.values())
-
-    def _get_inputs_outputs_from_job(self, properties: dict) -> tuple[list[Dataset], Dataset | None]:
-        input_tables = get_from_nullable_chain(properties, ["statistics", "query", "referencedTables"]) or []
-        output_table = get_from_nullable_chain(properties, ["configuration", "query", "destinationTable"])
-        inputs = [self._get_dataset(input_table) for input_table in input_tables]
-        if output_table:
-            output = self._get_dataset(output_table)
-            dataset_stat_facet = self._get_statistics_dataset_facet(properties)
-            if dataset_stat_facet:
-                output.facets.update({"outputStatistics": dataset_stat_facet})
-
-        return inputs, output
-
-    @staticmethod
-    def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
-        if get_from_nullable_chain(properties, ["configuration", "query", "query"]):
-            # Exclude the query to avoid event size issues and duplicating SqlJobFacet information.
-            properties = copy.deepcopy(properties)
-            properties["configuration"]["query"].pop("query")
-        cache_hit = get_from_nullable_chain(properties, ["statistics", "query", "cacheHit"])
-        billed_bytes = get_from_nullable_chain(properties, ["statistics", "query", "totalBytesBilled"])
-        return BigQueryJobRunFacet(
-            cached=str(cache_hit).lower() == "true",
-            billedBytes=int(billed_bytes) if billed_bytes else None,
-            properties=json.dumps(properties),
-        )
-
-    @staticmethod
-    def _get_statistics_dataset_facet(properties) -> OutputStatisticsOutputDatasetFacet | None:
-        query_plan = get_from_nullable_chain(properties, chain=["statistics", "query", "queryPlan"])
-        if not query_plan:
-            return None
-
-        out_stage = query_plan[-1]
-        out_rows = out_stage.get("recordsWritten", None)
-        out_bytes = out_stage.get("shuffleOutputBytes", None)
-        if out_bytes and out_rows:
-            return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows), size=int(out_bytes))
-        return None
-
-    def _get_dataset(self, table: dict) -> Dataset:
-        project = table.get("projectId")
-        dataset = table.get("datasetId")
-        table_name = table.get("tableId")
-        dataset_name = f"{project}.{dataset}.{table_name}"
-
-        dataset_schema = self._get_table_schema_safely(dataset_name)
-        return Dataset(
-            namespace=BIGQUERY_NAMESPACE,
-            name=dataset_name,
-            facets={
-                "schema": dataset_schema,
-            }
-            if dataset_schema
-            else {},
-        )
-
-    def _get_table_schema_safely(self, table_name: str) -> SchemaDatasetFacet | None:
-        try:
-            return self._get_table_schema(table_name)
-        except Exception as e:
-            if hasattr(self, "log"):
-                self.log.warning("Could not extract output schema from bigquery. %s", e)
-        return None
-
-    def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None:
-        bq_table = self.client.get_table(table)
-
-        if not bq_table._properties:
-            return None
-
-        fields = get_from_nullable_chain(bq_table._properties, ["schema", "fields"])
-        if not fields:
-            return None
-
-        return SchemaDatasetFacet(
-            fields=[
-                SchemaField(
-                    name=field.get("name"),
-                    type=field.get("type"),
-                    description=field.get("description"),
-                )
-                for field in fields
-            ]
-        )
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 83786ae762d96..846f884e771e4 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -47,7 +47,7 @@
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
 from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
 from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink
-from airflow.providers.google.cloud.openlineage.utils import _BigQueryOpenLineageMixin
+from airflow.providers.google.cloud.openlineage.mixins import _BigQueryOpenLineageMixin
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
 from airflow.providers.google.cloud.triggers.bigquery import (
     BigQueryCheckTrigger,
@@ -67,6 +67,7 @@
     from airflow.models.taskinstancekey import TaskInstanceKey
     from airflow.utils.context import Context
 
+
 BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
 
 LABEL_REGEX = re.compile(r"^[\w-]{0,63}$")
diff --git a/tests/providers/google/cloud/openlineage/test_mixins.py b/tests/providers/google/cloud/openlineage/test_mixins.py
new file mode 100644
index 0000000000000..50e90d29a3c91
--- /dev/null
+++ b/tests/providers/google/cloud/openlineage/test_mixins.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest.mock import MagicMock
+
+import pytest
+from openlineage.client.facet import (
+    ExternalQueryRunFacet,
+    OutputStatisticsOutputDatasetFacet,
+    SchemaDatasetFacet,
+    SchemaField,
+)
+from openlineage.client.run import Dataset
+
+from airflow.providers.google.cloud.openlineage.mixins import _BigQueryOpenLineageMixin
+from airflow.providers.google.cloud.openlineage.utils import (
+    BigQueryJobRunFacet,
+)
+
+
+def read_file_json(file):
+    with open(file=file) as f:
+        return json.loads(f.read())
+
+
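+# Each access to `_properties` consumes one of the prepared table payloads,
+# so consecutive schema lookups receive different table definitions.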
+class TableMock(MagicMock):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.inputs = [
+            read_file_json("tests/providers/google/cloud/utils/table_details.json"),
+            read_file_json("tests/providers/google/cloud/utils/out_table_details.json"),
+        ]
+
+    @property
+    def _properties(self):
+        return self.inputs.pop()
+
+
+class TestBigQueryOpenLineageMixin:
+    def setup_method(self):
+        self.job_details = read_file_json("tests/providers/google/cloud/utils/job_details.json")
+        self.script_job_details = read_file_json("tests/providers/google/cloud/utils/script_job_details.json")
+        hook = MagicMock()
+        self.client = MagicMock()
+
+        class BQOperator(_BigQueryOpenLineageMixin):
+            sql = ""
+            job_id = "job_id"
+
+            @property
+            def hook(self):
+                return hook
+
+        hook.get_client.return_value = self.client
+
+        self.client.get_table.return_value = TableMock()
+
+        self.operator = BQOperator()
+
+    def test_bq_job_information(self):
+        self.client.get_job.return_value._properties = self.job_details
+
+        lineage = self.operator.get_openlineage_facets_on_complete(None)
+
+        self.job_details["configuration"]["query"].pop("query")
+        assert lineage.run_facets == {
+            "bigQuery_job": BigQueryJobRunFacet(
+                cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
+            ),
+            "bigQueryJob": BigQueryJobRunFacet(
+                cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
+            ),
+            "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"),
+        }
+        assert lineage.inputs == [
+            Dataset(
+                namespace="bigquery",
+                name="airflow-openlineage.new_dataset.test_table",
+                facets={
+                    "schema": SchemaDatasetFacet(
+                        fields=[
+                            SchemaField("state", "STRING", "2-digit state code"),
+                            SchemaField("gender", "STRING", "Sex (M=male or F=female)"),
+                            SchemaField("year", "INTEGER", "4-digit year of birth"),
+                            SchemaField("name", "STRING", "Given name of a person at birth"),
+                            SchemaField("number", "INTEGER", "Number of occurrences of the name"),
+                        ]
+                    )
+                },
+            )
+        ]
+        assert lineage.outputs == [
+            Dataset(
+                namespace="bigquery",
+                name="airflow-openlineage.new_dataset.output_table",
+                facets={
+                    "outputStatistics": OutputStatisticsOutputDatasetFacet(
+                        rowCount=20, size=321, fileCount=None
+                    )
+                },
+            ),
+        ]
+
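+    # SCRIPT jobs expose no lineage themselves; it is collected from their child jobs.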
"outputStatistics": OutputStatisticsOutputDatasetFacet( + rowCount=20, size=321, fileCount=None + ) + }, + ), + ] + + def test_bq_script_job_information(self): + self.client.get_job.side_effect = [ + MagicMock(_properties=self.script_job_details), + MagicMock(_properties=self.job_details), + ] + self.client.list_jobs.return_value = ["child_job_id"] + + lineage = self.operator.get_openlineage_facets_on_complete(None) + + self.script_job_details["configuration"]["query"].pop("query") + assert lineage.run_facets == { + "bigQueryJob": BigQueryJobRunFacet( + cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details) + ), + "bigQuery_job": BigQueryJobRunFacet( + cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details) + ), + "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), + } + assert lineage.inputs == [ + Dataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.test_table", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaField("state", "STRING", "2-digit state code"), + SchemaField("gender", "STRING", "Sex (M=male or F=female)"), + SchemaField("year", "INTEGER", "4-digit year of birth"), + SchemaField("name", "STRING", "Given name of a person at birth"), + SchemaField("number", "INTEGER", "Number of occurrences of the name"), + ] + ) + }, + ) + ] + assert lineage.outputs == [ + Dataset( + namespace="bigquery", + name="airflow-openlineage.new_dataset.output_table", + facets={ + "outputStatistics": OutputStatisticsOutputDatasetFacet( + rowCount=20, size=321, fileCount=None + ) + }, + ), + ] + + def test_deduplicate_outputs(self): + outputs = [ + None, + Dataset( + name="d1", namespace="", facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(3, 4)} + ), + Dataset( + name="d1", + namespace="", + facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(3, 4), "t1": "t1"}, + ), + Dataset( + name="d2", + namespace="", + facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(6, 7), "t2": "t2"}, + ), + Dataset( + name="d2", + namespace="", + facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(60, 70), "t20": "t20"}, + ), + ] + result = self.operator._deduplicate_outputs(outputs) + assert len(result) == 2 + first_result = result[0] + assert first_result.name == "d1" + assert first_result.facets == {"t1": "t1"} + second_result = result[1] + assert second_result.name == "d2" + assert second_result.facets == {"t20": "t20"} + + @pytest.mark.parametrize("cache", (None, "false", False, 0)) + def test_get_job_run_facet_no_cache_and_with_bytes(self, cache): + properties = { + "statistics": {"query": {"cacheHit": cache, "totalBytesBilled": 10}}, + "configuration": {"query": {"query": "SELECT ..."}}, + } + result = self.operator._get_bigquery_job_run_facet(properties) + assert result.cached is False + assert result.billedBytes == 10 + properties["configuration"]["query"].pop("query") + assert result.properties == json.dumps(properties) + + @pytest.mark.parametrize("cache", ("true", True)) + def test_get_job_run_facet_with_cache_and_no_bytes(self, cache): + properties = { + "statistics": { + "query": { + "cacheHit": cache, + } + }, + "configuration": {"query": {"query": "SELECT ..."}}, + } + result = self.operator._get_bigquery_job_run_facet(properties) + assert result.cached is True + assert result.billedBytes is None + properties["configuration"]["query"].pop("query") + assert result.properties == json.dumps(properties) + + def 
+    def test_get_statistics_dataset_facet_no_query_plan(self):
+        properties = {
+            "statistics": {"query": {"totalBytesBilled": 10}},
+            "configuration": {"query": {"query": "SELECT ..."}},
+        }
+        result = self.operator._get_statistics_dataset_facet(properties)
+        assert result is None
+
+    def test_get_statistics_dataset_facet_no_stats(self):
+        properties = {
+            "statistics": {"query": {"totalBytesBilled": 10, "queryPlan": [{"test": "test"}]}},
+            "configuration": {"query": {"query": "SELECT ..."}},
+        }
+        result = self.operator._get_statistics_dataset_facet(properties)
+        assert result is None
+
+    def test_get_statistics_dataset_facet_with_stats(self):
+        properties = {
+            "statistics": {
+                "query": {
+                    "totalBytesBilled": 10,
+                    "queryPlan": [{"recordsWritten": 123, "shuffleOutputBytes": "321"}],
+                }
+            },
+            "configuration": {"query": {"query": "SELECT ..."}},
+        }
+        result = self.operator._get_statistics_dataset_facet(properties)
+        assert result.rowCount == 123
+        assert result.size == 321
diff --git a/tests/providers/google/cloud/openlineage/test_utils.py b/tests/providers/google/cloud/openlineage/test_utils.py
index c4543fac1e0c8..19c4b1d0bee0c 100644
--- a/tests/providers/google/cloud/openlineage/test_utils.py
+++ b/tests/providers/google/cloud/openlineage/test_utils.py
@@ -26,16 +26,12 @@
     ColumnLineageDatasetFacetFieldsAdditional,
     ColumnLineageDatasetFacetFieldsAdditionalInputFields,
     DocumentationDatasetFacet,
-    ExternalQueryRunFacet,
-    OutputStatisticsOutputDatasetFacet,
     SchemaDatasetFacet,
     SchemaField,
 )
 from openlineage.client.run import Dataset
 
 from airflow.providers.google.cloud.openlineage.utils import (
-    BigQueryJobRunFacet,
-    _BigQueryOpenLineageMixin,
     get_facets_from_bq_table,
     get_identity_column_lineage_facet,
 )
@@ -78,209 +74,6 @@ def _properties(self):
         return self.inputs.pop()
 
 
-class TestBigQueryOpenLineageMixin:
-    def setup_method(self):
-        self.job_details = read_file_json("tests/providers/google/cloud/utils/job_details.json")
-        self.script_job_details = read_file_json("tests/providers/google/cloud/utils/script_job_details.json")
-        hook = MagicMock()
-        self.client = MagicMock()
-
-        class BQOperator(_BigQueryOpenLineageMixin):
-            sql = ""
-            job_id = "job_id"
-
-            @property
-            def hook(self):
-                return hook
-
-        hook.get_client.return_value = self.client
-
-        self.client.get_table.return_value = TableMock()
-
-        self.operator = BQOperator()
-
-    def test_bq_job_information(self):
-        self.client.get_job.return_value._properties = self.job_details
-
-        lineage = self.operator.get_openlineage_facets_on_complete(None)
-
-        self.job_details["configuration"]["query"].pop("query")
-        assert lineage.run_facets == {
-            "bigQuery_job": BigQueryJobRunFacet(
-                cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
-            ),
-            "bigQueryJob": BigQueryJobRunFacet(
-                cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
-            ),
-            "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"),
-        }
-        assert lineage.inputs == [
-            Dataset(
-                namespace="bigquery",
-                name="airflow-openlineage.new_dataset.test_table",
-                facets={
-                    "schema": SchemaDatasetFacet(
-                        fields=[
-                            SchemaField("state", "STRING", "2-digit state code"),
-                            SchemaField("gender", "STRING", "Sex (M=male or F=female)"),
-                            SchemaField("year", "INTEGER", "4-digit year of birth"),
-                            SchemaField("name", "STRING", "Given name of a person at birth"),
-                            SchemaField("number", "INTEGER", "Number of occurrences of the name"),
-                        ]
-                    )
-                },
-            )
-        ]
-        assert lineage.outputs == [
-            Dataset(
-                namespace="bigquery",
-                name="airflow-openlineage.new_dataset.output_table",
-                facets={
-                    "outputStatistics": OutputStatisticsOutputDatasetFacet(
-                        rowCount=20, size=321, fileCount=None
-                    )
-                },
-            ),
-        ]
name="airflow-openlineage.new_dataset.output_table", - facets={ - "outputStatistics": OutputStatisticsOutputDatasetFacet( - rowCount=20, size=321, fileCount=None - ) - }, - ), - ] - - def test_bq_script_job_information(self): - self.client.get_job.side_effect = [ - MagicMock(_properties=self.script_job_details), - MagicMock(_properties=self.job_details), - ] - self.client.list_jobs.return_value = ["child_job_id"] - - lineage = self.operator.get_openlineage_facets_on_complete(None) - - self.script_job_details["configuration"]["query"].pop("query") - assert lineage.run_facets == { - "bigQueryJob": BigQueryJobRunFacet( - cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details) - ), - "bigQuery_job": BigQueryJobRunFacet( - cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details) - ), - "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), - } - assert lineage.inputs == [ - Dataset( - namespace="bigquery", - name="airflow-openlineage.new_dataset.test_table", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaField("state", "STRING", "2-digit state code"), - SchemaField("gender", "STRING", "Sex (M=male or F=female)"), - SchemaField("year", "INTEGER", "4-digit year of birth"), - SchemaField("name", "STRING", "Given name of a person at birth"), - SchemaField("number", "INTEGER", "Number of occurrences of the name"), - ] - ) - }, - ) - ] - assert lineage.outputs == [ - Dataset( - namespace="bigquery", - name="airflow-openlineage.new_dataset.output_table", - facets={ - "outputStatistics": OutputStatisticsOutputDatasetFacet( - rowCount=20, size=321, fileCount=None - ) - }, - ), - ] - - def test_deduplicate_outputs(self): - outputs = [ - None, - Dataset( - name="d1", namespace="", facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(3, 4)} - ), - Dataset( - name="d1", - namespace="", - facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(3, 4), "t1": "t1"}, - ), - Dataset( - name="d2", - namespace="", - facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(6, 7), "t2": "t2"}, - ), - Dataset( - name="d2", - namespace="", - facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(60, 70), "t20": "t20"}, - ), - ] - result = self.operator._deduplicate_outputs(outputs) - assert len(result) == 2 - first_result = result[0] - assert first_result.name == "d1" - assert first_result.facets == {"t1": "t1"} - second_result = result[1] - assert second_result.name == "d2" - assert second_result.facets == {"t20": "t20"} - - @pytest.mark.parametrize("cache", (None, "false", False, 0)) - def test_get_job_run_facet_no_cache_and_with_bytes(self, cache): - properties = { - "statistics": {"query": {"cacheHit": cache, "totalBytesBilled": 10}}, - "configuration": {"query": {"query": "SELECT ..."}}, - } - result = self.operator._get_bigquery_job_run_facet(properties) - assert result.cached is False - assert result.billedBytes == 10 - properties["configuration"]["query"].pop("query") - assert result.properties == json.dumps(properties) - - @pytest.mark.parametrize("cache", ("true", True)) - def test_get_job_run_facet_with_cache_and_no_bytes(self, cache): - properties = { - "statistics": { - "query": { - "cacheHit": cache, - } - }, - "configuration": {"query": {"query": "SELECT ..."}}, - } - result = self.operator._get_bigquery_job_run_facet(properties) - assert result.cached is True - assert result.billedBytes is None - properties["configuration"]["query"].pop("query") - assert result.properties == 
-
-    def test_get_statistics_dataset_facet_no_query_plan(self):
-        properties = {
-            "statistics": {"query": {"totalBytesBilled": 10}},
-            "configuration": {"query": {"query": "SELECT ..."}},
-        }
-        result = self.operator._get_statistics_dataset_facet(properties)
-        assert result is None
-
-    def test_get_statistics_dataset_facet_no_stats(self):
-        properties = {
-            "statistics": {"query": {"totalBytesBilled": 10, "queryPlan": [{"test": "test"}]}},
-            "configuration": {"query": {"query": "SELECT ..."}},
-        }
-        result = self.operator._get_statistics_dataset_facet(properties)
-        assert result is None
-
-    def test_get_statistics_dataset_facet_with_stats(self):
-        properties = {
-            "statistics": {
-                "query": {
-                    "totalBytesBilled": 10,
-                    "queryPlan": [{"recordsWritten": 123, "shuffleOutputBytes": "321"}],
-                }
-            },
-            "configuration": {"query": {"query": "SELECT ..."}},
-        }
-        result = self.operator._get_statistics_dataset_facet(properties)
-        assert result.rowCount == 123
-        assert result.size == 321
-
-
 def test_get_facets_from_bq_table():
     expected_facets = {
         "schema": SchemaDatasetFacet(