From 70c15eee78a0f4ff3abf0ebd6dcb7295767a52f6 Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Wed, 20 Nov 2024 16:29:30 +0100 Subject: [PATCH] feat: add OpenLineage support for BigQueryToBigQueryOperator Signed-off-by: Kacper Muda --- .../google/cloud/openlineage/utils.py | 75 ++-- .../cloud/transfers/bigquery_to_bigquery.py | 90 ++++- .../google/cloud/transfers/bigquery_to_gcs.py | 13 +- .../google/cloud/transfers/gcs_to_bigquery.py | 19 +- .../google/cloud/openlineage/test_utils.py | 137 +++++++- .../transfers/test_bigquery_to_bigquery.py | 332 +++++++++++++++++- .../cloud/transfers/test_bigquery_to_gcs.py | 21 +- .../cloud/transfers/test_gcs_to_bigquery.py | 11 +- 8 files changed, 594 insertions(+), 104 deletions(-) diff --git a/providers/src/airflow/providers/google/cloud/openlineage/utils.py b/providers/src/airflow/providers/google/cloud/openlineage/utils.py index 82172d5d241c9..403023f7b4315 100644 --- a/providers/src/airflow/providers/google/cloud/openlineage/utils.py +++ b/providers/src/airflow/providers/google/cloud/openlineage/utils.py @@ -27,6 +27,7 @@ from airflow.providers.common.compat.openlineage.facet import Dataset from airflow.providers.common.compat.openlineage.facet import ( + BaseFacet, ColumnLineageDatasetFacet, DocumentationDatasetFacet, Fields, @@ -41,50 +42,82 @@ BIGQUERY_URI = "bigquery" -def get_facets_from_bq_table(table: Table) -> dict[Any, Any]: +def get_facets_from_bq_table(table: Table) -> dict[str, BaseFacet]: """Get facets from BigQuery table object.""" - facets = { - "schema": SchemaDatasetFacet( + facets: dict[str, BaseFacet] = {} + if table.schema: + facets["schema"] = SchemaDatasetFacet( fields=[ SchemaDatasetFacetFields( - name=field.name, type=field.field_type, description=field.description + name=schema_field.name, type=schema_field.field_type, description=schema_field.description ) - for field in table.schema + for schema_field in table.schema ] - ), - "documentation": DocumentationDatasetFacet(description=table.description or ""), - } + ) + if table.description: + facets["documentation"] = DocumentationDatasetFacet(description=table.description) return facets def get_identity_column_lineage_facet( - field_names: list[str], + dest_field_names: list[str], input_datasets: list[Dataset], -) -> ColumnLineageDatasetFacet: +) -> dict[str, ColumnLineageDatasetFacet]: """ - Get column lineage facet. - - Simple lineage will be created, where each source column corresponds to single destination column - in each input dataset and there are no transformations made. + Get column lineage facet for identity transformations. + + This function generates a simple column lineage facet, where each destination column + consists of source columns of the same name from all input datasets that have that column. + The lineage assumes there are no transformations applied, meaning the columns retain their + identity between the source and destination datasets. + + Args: + dest_field_names: A list of destination column names for which lineage should be determined. + input_datasets: A list of input datasets with schema facets. + + Returns: + A dictionary containing a single key, `columnLineage`, mapped to a `ColumnLineageDatasetFacet`. + If no column lineage can be determined, an empty dictionary is returned - see Notes below. + + Notes: + - If any input dataset lacks a schema facet, the function immediately returns an empty dictionary. + - If any field in the source dataset's schema is not present in the destination table, + the function returns an empty dictionary. 
The destination table can contain extra fields, but all + source columns should be present in the destination table. + - If none of the destination columns can be matched to input dataset columns, an empty + dictionary is returned. + - Extra columns in the destination table that do not exist in the input datasets are ignored and + skipped in the lineage facet, as they cannot be traced back to a source column. + - The function assumes there are no transformations applied, meaning the columns retain their + identity between the source and destination datasets. """ - if field_names and not input_datasets: - raise ValueError("When providing `field_names` You must provide at least one `input_dataset`.") + fields_sources: dict[str, list[Dataset]] = {} + for ds in input_datasets: + if not ds.facets or "schema" not in ds.facets: + return {} + for schema_field in ds.facets["schema"].fields: # type: ignore[attr-defined] + if schema_field.name not in dest_field_names: + return {} + fields_sources[schema_field.name] = fields_sources.get(schema_field.name, []) + [ds] + + if not fields_sources: + return {} column_lineage_facet = ColumnLineageDatasetFacet( fields={ - field: Fields( + field_name: Fields( inputFields=[ - InputField(namespace=dataset.namespace, name=dataset.name, field=field) - for dataset in input_datasets + InputField(namespace=dataset.namespace, name=dataset.name, field=field_name) + for dataset in source_datasets ], transformationType="IDENTITY", transformationDescription="identical", ) - for field in field_names + for field_name, source_datasets in fields_sources.items() } ) - return column_lineage_facet + return {"columnLineage": column_lineage_facet} @define diff --git a/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py b/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py index 7be147d09b607..e1f3d3b13f565 100644 --- a/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py +++ b/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py @@ -110,6 +110,7 @@ def __init__( self.location = location self.impersonation_chain = impersonation_chain self.hook: BigQueryHook | None = None + self._job_conf: dict = {} def _prepare_job_configuration(self): self.source_project_dataset_tables = ( @@ -154,39 +155,94 @@ def _prepare_job_configuration(self): return configuration - def _submit_job( - self, - hook: BigQueryHook, - configuration: dict, - ) -> str: - job = hook.insert_job(configuration=configuration, project_id=hook.project_id) - return job.job_id - def execute(self, context: Context) -> None: self.log.info( "Executing copy of %s into: %s", self.source_project_dataset_tables, self.destination_project_dataset_table, ) - hook = BigQueryHook( + self.hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, ) - self.hook = hook - if not hook.project_id: + if not self.hook.project_id: raise ValueError("The project_id should be set") configuration = self._prepare_job_configuration() - job_id = self._submit_job(hook=hook, configuration=configuration) + self._job_conf = self.hook.insert_job( + configuration=configuration, project_id=self.hook.project_id + ).to_api_repr() - job = hook.get_job(job_id=job_id, location=self.location).to_api_repr() - conf = job["configuration"]["copy"]["destinationTable"] + dest_table_info = self._job_conf["configuration"]["copy"]["destinationTable"] BigQueryTableLink.persist( context=context, 
task_instance=self, - dataset_id=conf["datasetId"], - project_id=conf["projectId"], - table_id=conf["tableId"], + dataset_id=dest_table_info["datasetId"], + project_id=dest_table_info["projectId"], + table_id=dest_table_info["tableId"], + ) + + def get_openlineage_facets_on_complete(self, task_instance): + """Implement on_complete as we will include final BQ job id.""" + from airflow.providers.common.compat.openlineage.facet import ( + Dataset, + ExternalQueryRunFacet, + ) + from airflow.providers.google.cloud.openlineage.utils import ( + BIGQUERY_NAMESPACE, + get_facets_from_bq_table, + get_identity_column_lineage_facet, ) + from airflow.providers.openlineage.extractors import OperatorLineage + + if not self.hook: + self.hook = BigQueryHook( + gcp_conn_id=self.gcp_conn_id, + location=self.location, + impersonation_chain=self.impersonation_chain, + ) + + if not self._job_conf: + self.log.debug("OpenLineage could not find BQ job configuration.") + return OperatorLineage() + + bq_job_id = self._job_conf["jobReference"]["jobId"] + source_tables_info = self._job_conf["configuration"]["copy"]["sourceTables"] + dest_table_info = self._job_conf["configuration"]["copy"]["destinationTable"] + + run_facets = { + "externalQuery": ExternalQueryRunFacet(externalQueryId=bq_job_id, source="bigquery"), + } + + input_datasets = [] + for in_table_info in source_tables_info: + table_id = ".".join( + (in_table_info["projectId"], in_table_info["datasetId"], in_table_info["tableId"]) + ) + table_object = self.hook.get_client().get_table(table_id) + input_datasets.append( + Dataset( + namespace=BIGQUERY_NAMESPACE, name=table_id, facets=get_facets_from_bq_table(table_object) + ) + ) + + out_table_id = ".".join( + (dest_table_info["projectId"], dest_table_info["datasetId"], dest_table_info["tableId"]) + ) + out_table_object = self.hook.get_client().get_table(out_table_id) + output_dataset_facets = { + **get_facets_from_bq_table(out_table_object), + **get_identity_column_lineage_facet( + dest_field_names=[field.name for field in out_table_object.schema], + input_datasets=input_datasets, + ), + } + output_dataset = Dataset( + namespace=BIGQUERY_NAMESPACE, + name=out_table_id, + facets=output_dataset_facets, + ) + + return OperatorLineage(inputs=input_datasets, outputs=[output_dataset], run_facets=run_facets) diff --git a/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index e2588b8976e33..2833f79a3e81a 100644 --- a/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/providers/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -294,6 +294,7 @@ def get_openlineage_facets_on_complete(self, task_instance): from pathlib import Path from airflow.providers.common.compat.openlineage.facet import ( + BaseFacet, Dataset, ExternalQueryRunFacet, Identifier, @@ -322,12 +323,12 @@ def get_openlineage_facets_on_complete(self, task_instance): facets=get_facets_from_bq_table(table_object), ) - output_dataset_facets = { - "schema": input_dataset.facets["schema"], - "columnLineage": get_identity_column_lineage_facet( - field_names=[field.name for field in table_object.schema], input_datasets=[input_dataset] - ), - } + output_dataset_facets: dict[str, BaseFacet] = get_identity_column_lineage_facet( + dest_field_names=[field.name for field in table_object.schema], input_datasets=[input_dataset] + ) + if "schema" in input_dataset.facets: + output_dataset_facets["schema"] = 
input_dataset.facets["schema"] + output_datasets = [] for uri in sorted(self.destination_cloud_storage_uris): bucket, blob = _parse_gcs_url(uri) diff --git a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 06b9a94171b10..6dfe1bd1a2c31 100644 --- a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -784,9 +784,10 @@ def get_openlineage_facets_on_complete(self, task_instance): source_objects = ( self.source_objects if isinstance(self.source_objects, list) else [self.source_objects] ) - input_dataset_facets = { - "schema": output_dataset_facets["schema"], - } + input_dataset_facets = {} + if "schema" in output_dataset_facets: + input_dataset_facets["schema"] = output_dataset_facets["schema"] + input_datasets = [] for blob in sorted(source_objects): additional_facets = {} @@ -811,14 +812,16 @@ def get_openlineage_facets_on_complete(self, task_instance): ) input_datasets.append(dataset) - output_dataset_facets["columnLineage"] = get_identity_column_lineage_facet( - field_names=[field.name for field in table_object.schema], input_datasets=input_datasets - ) - output_dataset = Dataset( namespace="bigquery", name=str(table_object.reference), - facets=output_dataset_facets, + facets={ + **output_dataset_facets, + **get_identity_column_lineage_facet( + dest_field_names=[field.name for field in table_object.schema], + input_datasets=input_datasets, + ), + }, ) run_facets = {} diff --git a/providers/tests/google/cloud/openlineage/test_utils.py b/providers/tests/google/cloud/openlineage/test_utils.py index 4f2db0038b7b7..e3f40bee1549e 100644 --- a/providers/tests/google/cloud/openlineage/test_utils.py +++ b/providers/tests/google/cloud/openlineage/test_utils.py @@ -19,7 +19,6 @@ import json from unittest.mock import MagicMock -import pytest from google.cloud.bigquery.table import Table from airflow.providers.common.compat.openlineage.facet import ( @@ -89,19 +88,78 @@ def test_get_facets_from_bq_table(): def test_get_facets_from_empty_bq_table(): - expected_facets = { - "schema": SchemaDatasetFacet(fields=[]), - "documentation": DocumentationDatasetFacet(description=""), - } result = get_facets_from_bq_table(TEST_EMPTY_TABLE) - assert result == expected_facets + assert result == {} + + +def test_get_identity_column_lineage_facet_source_datasets_schemas_are_subsets(): + field_names = ["field1", "field2", "field3"] + input_datasets = [ + Dataset( + namespace="gs://first_bucket", + name="dir1", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + ] + ) + }, + ), + Dataset( + namespace="gs://second_bucket", + name="dir2", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field2", type="STRING"), + ] + ) + }, + ), + ] + expected_facet = ColumnLineageDatasetFacet( + fields={ + "field1": Fields( + inputFields=[ + InputField( + namespace="gs://first_bucket", + name="dir1", + field="field1", + ) + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + "field2": Fields( + inputFields=[ + InputField( + namespace="gs://second_bucket", + name="dir2", + field="field2", + ), + ], + transformationType="IDENTITY", + transformationDescription="identical", + ), + # field3 is missing here as it's not present in any source dataset + } + ) + result = 
get_identity_column_lineage_facet(dest_field_names=field_names, input_datasets=input_datasets) + assert result == {"columnLineage": expected_facet} def test_get_identity_column_lineage_facet_multiple_input_datasets(): field_names = ["field1", "field2"] + schema_facet = SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + SchemaDatasetFacetFields(name="field2", type="STRING"), + ] + ) input_datasets = [ - Dataset(namespace="gs://first_bucket", name="dir1"), - Dataset(namespace="gs://second_bucket", name="dir2"), + Dataset(namespace="gs://first_bucket", name="dir1", facets={"schema": schema_facet}), + Dataset(namespace="gs://second_bucket", name="dir2", facets={"schema": schema_facet}), ] expected_facet = ColumnLineageDatasetFacet( fields={ @@ -139,24 +197,69 @@ def test_get_identity_column_lineage_facet_multiple_input_datasets(): ), } ) - result = get_identity_column_lineage_facet(field_names=field_names, input_datasets=input_datasets) - assert result == expected_facet + result = get_identity_column_lineage_facet(dest_field_names=field_names, input_datasets=input_datasets) + assert result == {"columnLineage": expected_facet} + + +def test_get_identity_column_lineage_facet_dest_cols_not_in_input_datasets(): + field_names = ["x", "y"] + input_datasets = [ + Dataset( + namespace="gs://first_bucket", + name="dir1", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + ] + ) + }, + ), + Dataset( + namespace="gs://second_bucket", + name="dir2", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field2", type="STRING"), + ] + ) + }, + ), + ] + + result = get_identity_column_lineage_facet(dest_field_names=field_names, input_datasets=input_datasets) + assert result == {} + + +def test_get_identity_column_lineage_facet_no_schema_in_input_dataset(): + field_names = ["field1", "field2"] + input_datasets = [ + Dataset(namespace="gs://first_bucket", name="dir1"), + ] + result = get_identity_column_lineage_facet(dest_field_names=field_names, input_datasets=input_datasets) + assert result == {} def test_get_identity_column_lineage_facet_no_field_names(): field_names = [] + schema_facet = SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + SchemaDatasetFacetFields(name="field2", type="STRING"), + ] + ) input_datasets = [ - Dataset(namespace="gs://first_bucket", name="dir1"), - Dataset(namespace="gs://second_bucket", name="dir2"), + Dataset(namespace="gs://first_bucket", name="dir1", facets={"schema": schema_facet}), + Dataset(namespace="gs://second_bucket", name="dir2", facets={"schema": schema_facet}), ] - expected_facet = ColumnLineageDatasetFacet(fields={}) - result = get_identity_column_lineage_facet(field_names=field_names, input_datasets=input_datasets) - assert result == expected_facet + result = get_identity_column_lineage_facet(dest_field_names=field_names, input_datasets=input_datasets) + assert result == {} def test_get_identity_column_lineage_facet_no_input_datasets(): field_names = ["field1", "field2"] input_datasets = [] - with pytest.raises(ValueError): - get_identity_column_lineage_facet(field_names=field_names, input_datasets=input_datasets) + result = get_identity_column_lineage_facet(dest_field_names=field_names, input_datasets=input_datasets) + assert result == {} diff --git a/providers/tests/google/cloud/transfers/test_bigquery_to_bigquery.py b/providers/tests/google/cloud/transfers/test_bigquery_to_bigquery.py index 
ed06928c2ccff..304694126ffc1 100644 --- a/providers/tests/google/cloud/transfers/test_bigquery_to_bigquery.py +++ b/providers/tests/google/cloud/transfers/test_bigquery_to_bigquery.py @@ -19,16 +19,29 @@ from unittest import mock +from google.cloud.bigquery import Table + +from airflow.providers.common.compat.openlineage.facet import ( + ColumnLineageDatasetFacet, + Dataset, + DocumentationDatasetFacet, + ExternalQueryRunFacet, + Fields, + InputField, + SchemaDatasetFacet, + SchemaDatasetFacetFields, +) from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator BQ_HOOK_PATH = "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook" -TASK_ID = "test-bq-create-table-operator" +TASK_ID = "test-bq-to-bq-operator" TEST_GCP_PROJECT_ID = "test-project" TEST_DATASET = "test-dataset" TEST_TABLE_ID = "test-table-id" -SOURCE_PROJECT_DATASET_TABLES = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}" -DESTINATION_PROJECT_DATASET_TABLE = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET + '_new'}.{TEST_TABLE_ID}" +SOURCE_PROJECT_DATASET_TABLE = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}" +SOURCE_PROJECT_DATASET_TABLE2 = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}-2" +DESTINATION_PROJECT_DATASET_TABLE = f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}_new.{TEST_TABLE_ID}" WRITE_DISPOSITION = "WRITE_EMPTY" CREATE_DISPOSITION = "CREATE_IF_NEEDED" LABELS = {"k1": "v1"} @@ -36,12 +49,18 @@ def split_tablename_side_effect(*args, **kwargs): - if kwargs["table_input"] == SOURCE_PROJECT_DATASET_TABLES: + if kwargs["table_input"] == SOURCE_PROJECT_DATASET_TABLE: return ( TEST_GCP_PROJECT_ID, TEST_DATASET, TEST_TABLE_ID, ) + elif kwargs["table_input"] == SOURCE_PROJECT_DATASET_TABLE2: + return ( + TEST_GCP_PROJECT_ID, + TEST_DATASET, + TEST_TABLE_ID + "-2", + ) elif kwargs["table_input"] == DESTINATION_PROJECT_DATASET_TABLE: return ( TEST_GCP_PROJECT_ID, @@ -55,7 +74,7 @@ class TestBigQueryToBigQueryOperator: def test_execute_without_location_should_execute_successfully(self, mock_hook): operator = BigQueryToBigQueryOperator( task_id=TASK_ID, - source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLES, + source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLE, destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE, write_disposition=WRITE_DISPOSITION, create_disposition=CREATE_DISPOSITION, @@ -95,7 +114,48 @@ def test_execute_single_regional_location_should_execute_successfully(self, mock operator = BigQueryToBigQueryOperator( task_id=TASK_ID, - source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLES, + source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLE, + destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE, + write_disposition=WRITE_DISPOSITION, + create_disposition=CREATE_DISPOSITION, + labels=LABELS, + encryption_configuration=ENCRYPTION_CONFIGURATION, + location=location, + ) + + mock_hook.return_value.split_tablename.side_effect = split_tablename_side_effect + operator.execute(context=mock.MagicMock()) + mock_hook.return_value.insert_job.assert_called_once_with( + configuration={ + "copy": { + "createDisposition": CREATE_DISPOSITION, + "destinationEncryptionConfiguration": ENCRYPTION_CONFIGURATION, + "destinationTable": { + "datasetId": TEST_DATASET + "_new", + "projectId": TEST_GCP_PROJECT_ID, + "tableId": TEST_TABLE_ID, + }, + "sourceTables": [ + { + "datasetId": TEST_DATASET, + "projectId": TEST_GCP_PROJECT_ID, + "tableId": TEST_TABLE_ID, + }, + ], + "writeDisposition": WRITE_DISPOSITION, + }, 
+ "labels": LABELS, + }, + project_id=mock_hook.return_value.project_id, + ) + + @mock.patch(BQ_HOOK_PATH) + def test_get_openlineage_facets_on_complete_single_source_table(self, mock_hook): + location = "us-central1" + + operator = BigQueryToBigQueryOperator( + task_id=TASK_ID, + source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLE, destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE, write_disposition=WRITE_DISPOSITION, create_disposition=CREATE_DISPOSITION, @@ -104,9 +164,265 @@ def test_execute_single_regional_location_should_execute_successfully(self, mock location=location, ) + source_table_api_repr = { + "tableReference": { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET, + "tableId": TEST_TABLE_ID, + }, + "description": "Table description.", + "schema": { + "fields": [ + {"name": "field1", "type": "STRING"}, + {"name": "field2", "type": "INTEGER"}, + ] + }, + } + dest_table_api_repr = {**source_table_api_repr} + dest_table_api_repr["tableReference"]["datasetId"] = TEST_DATASET + "_new" + mock_table_data = { + SOURCE_PROJECT_DATASET_TABLE: Table.from_api_repr(source_table_api_repr), + DESTINATION_PROJECT_DATASET_TABLE: Table.from_api_repr(dest_table_api_repr), + } + + mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = { + "jobReference": { + "projectId": TEST_GCP_PROJECT_ID, + "jobId": "actual_job_id", + "location": location, + }, + "configuration": { + "copy": { + "sourceTables": [ + { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET, + "tableId": TEST_TABLE_ID, + }, + ], + "destinationTable": { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET + "_new", + "tableId": TEST_TABLE_ID, + }, + } + }, + } mock_hook.return_value.split_tablename.side_effect = split_tablename_side_effect + mock_hook.return_value.get_client.return_value.get_table.side_effect = ( + lambda table_id: mock_table_data[table_id] + ) + operator.execute(context=mock.MagicMock()) - mock_hook.return_value.get_job.assert_called_once_with( - job_id=mock_hook.return_value.insert_job.return_value.job_id, + result = operator.get_openlineage_facets_on_complete(None) + + assert result.job_facets == {} + assert result.run_facets == { + "externalQuery": ExternalQueryRunFacet(externalQueryId="actual_job_id", source="bigquery") + } + assert len(result.inputs) == 1 + assert result.inputs[0] == Dataset( + namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + SchemaDatasetFacetFields(name="field2", type="INTEGER"), + ] + ), + "documentation": DocumentationDatasetFacet("Table description."), + }, + ) + assert len(result.outputs) == 1 + assert result.outputs[0] == Dataset( + namespace="bigquery", + name=DESTINATION_PROJECT_DATASET_TABLE, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + SchemaDatasetFacetFields(name="field2", type="INTEGER"), + ] + ), + "documentation": DocumentationDatasetFacet("Table description."), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "field1": Fields( + inputFields=[ + InputField( + namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE, + field="field1", + transformations=[], + ) + ], + transformationDescription="identical", + transformationType="IDENTITY", + ), + "field2": Fields( + inputFields=[ + InputField( + namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE, + field="field2", + transformations=[], + ) + ], 
+ transformationDescription="identical", + transformationType="IDENTITY", + ), + }, + dataset=[], + ), + }, + ) + + @mock.patch(BQ_HOOK_PATH) + def test_get_openlineage_facets_on_complete_multiple_source_tables(self, mock_hook): + location = "us-central1" + + operator = BigQueryToBigQueryOperator( + task_id=TASK_ID, + source_project_dataset_tables=[ + SOURCE_PROJECT_DATASET_TABLE, + SOURCE_PROJECT_DATASET_TABLE2, + ], + destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE, + write_disposition=WRITE_DISPOSITION, + create_disposition=CREATE_DISPOSITION, + labels=LABELS, + encryption_configuration=ENCRYPTION_CONFIGURATION, location=location, ) + + source_table_repr = { + "tableReference": { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET, + "tableId": TEST_TABLE_ID, + }, + "schema": { + "fields": [ + {"name": "field1", "type": "STRING"}, + ] + }, + } + source_table_repr2 = {**source_table_repr} + source_table_repr2["tableReference"]["tableId"] = TEST_TABLE_ID + "-2" + dest_table_api_repr = { + "tableReference": { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET + "_new", + "tableId": TEST_TABLE_ID, + }, + "schema": { + "fields": [ + {"name": "field1", "type": "STRING"}, + {"name": "field2", "type": "INTEGER"}, + ] + }, + } + mock_table_data = { + SOURCE_PROJECT_DATASET_TABLE: Table.from_api_repr(source_table_repr), + SOURCE_PROJECT_DATASET_TABLE2: Table.from_api_repr(source_table_repr2), + DESTINATION_PROJECT_DATASET_TABLE: Table.from_api_repr(dest_table_api_repr), + } + + mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = { + "jobReference": { + "projectId": TEST_GCP_PROJECT_ID, + "jobId": "actual_job_id", + "location": location, + }, + "configuration": { + "copy": { + "sourceTables": [ + { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET, + "tableId": TEST_TABLE_ID, + }, + { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET, + "tableId": TEST_TABLE_ID + "-2", + }, + ], + "destinationTable": { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET + "_new", + "tableId": TEST_TABLE_ID, + }, + } + }, + } + mock_hook.return_value.split_tablename.side_effect = split_tablename_side_effect + mock_hook.return_value.get_client.return_value.get_table.side_effect = ( + lambda table_id: mock_table_data[table_id] + ) + operator.execute(context=mock.MagicMock()) + result = operator.get_openlineage_facets_on_complete(None) + assert result.job_facets == {} + assert result.run_facets == { + "externalQuery": ExternalQueryRunFacet(externalQueryId="actual_job_id", source="bigquery") + } + assert len(result.inputs) == 2 + assert result.inputs[0] == Dataset( + namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + ] + ) + }, + ) + assert result.inputs[1] == Dataset( + namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE2, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + ] + ) + }, + ) + assert len(result.outputs) == 1 + assert result.outputs[0] == Dataset( + namespace="bigquery", + name=DESTINATION_PROJECT_DATASET_TABLE, + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields(name="field1", type="STRING"), + SchemaDatasetFacetFields(name="field2", type="INTEGER"), + ] + ), + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "field1": Fields( + inputFields=[ + InputField( + 
namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE, + field="field1", + transformations=[], + ), + InputField( + namespace="bigquery", + name=SOURCE_PROJECT_DATASET_TABLE2, + field="field1", + transformations=[], + ), + ], + transformationDescription="identical", + transformationType="IDENTITY", + ) + }, + dataset=[], + ), + }, + ) diff --git a/providers/tests/google/cloud/transfers/test_bigquery_to_gcs.py b/providers/tests/google/cloud/transfers/test_bigquery_to_gcs.py index 7c2a398253752..b451d2037efcb 100644 --- a/providers/tests/google/cloud/transfers/test_bigquery_to_gcs.py +++ b/providers/tests/google/cloud/transfers/test_bigquery_to_gcs.py @@ -299,11 +299,6 @@ def test_get_openlineage_facets_on_complete_bq_dataset(self, mock_hook): def test_get_openlineage_facets_on_complete_bq_dataset_empty_table(self, mock_hook): source_project_dataset_table = f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}" - expected_input_dataset_facets = { - "schema": SchemaDatasetFacet(fields=[]), - "documentation": DocumentationDatasetFacet(description=""), - } - mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID) mock_hook.return_value.get_client.return_value.get_table.return_value = TEST_EMPTY_TABLE @@ -320,7 +315,7 @@ def test_get_openlineage_facets_on_complete_bq_dataset_empty_table(self, mock_ho assert lineage.inputs[0] == Dataset( namespace="bigquery", name=source_project_dataset_table, - facets=expected_input_dataset_facets, + facets={}, ) @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook") @@ -330,16 +325,6 @@ def test_get_openlineage_facets_on_complete_gcs_no_wildcard_empty_table(self, mo real_job_id = "123456_hash" bq_namespace = "bigquery" - expected_input_facets = { - "schema": SchemaDatasetFacet(fields=[]), - "documentation": DocumentationDatasetFacet(description=""), - } - - expected_output_facets = { - "schema": SchemaDatasetFacet(fields=[]), - "columnLineage": ColumnLineageDatasetFacet(fields={}), - } - mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID) mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False) mock_hook.return_value.get_client.return_value.get_table.return_value = TEST_EMPTY_TABLE @@ -357,12 +342,12 @@ def test_get_openlineage_facets_on_complete_gcs_no_wildcard_empty_table(self, mo assert len(lineage.inputs) == 1 assert len(lineage.outputs) == 1 assert lineage.inputs[0] == Dataset( - namespace=bq_namespace, name=source_project_dataset_table, facets=expected_input_facets + namespace=bq_namespace, name=source_project_dataset_table, facets={} ) assert lineage.outputs[0] == Dataset( namespace=f"gs://{TEST_BUCKET}", name=f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}", - facets=expected_output_facets, + facets={}, ) assert lineage.run_facets == { "externalQuery": ExternalQueryRunFacet(externalQueryId=real_job_id, source=bq_namespace) diff --git a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py index 0ba2e07bb05eb..299fc9fead54d 100644 --- a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py @@ -1439,12 +1439,6 @@ def test_get_openlineage_facets_on_complete_empty_table(self, hook): hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) hook.return_value.get_client.return_value.get_table.return_value = TEST_EMPTY_TABLE - 
expected_output_dataset_facets = { - "schema": SchemaDatasetFacet(fields=[]), - "documentation": DocumentationDatasetFacet(description=""), - "columnLineage": ColumnLineageDatasetFacet(fields={}), - } - operator = GCSToBigQueryOperator( project_id=JOB_PROJECT_ID, task_id=TASK_ID, @@ -1461,18 +1455,17 @@ def test_get_openlineage_facets_on_complete_empty_table(self, hook): assert lineage.outputs[0] == Dataset( namespace="bigquery", name=TEST_EXPLICIT_DEST, - facets=expected_output_dataset_facets, + facets={}, ) assert lineage.inputs[0] == Dataset( namespace=f"gs://{TEST_BUCKET}", name=TEST_OBJECT_NO_WILDCARD, - facets={"schema": SchemaDatasetFacet(fields=[])}, + facets={}, ) assert lineage.inputs[1] == Dataset( namespace=f"gs://{TEST_BUCKET}", name="/", facets={ - "schema": SchemaDatasetFacet(fields=[]), "symlink": SymlinksDatasetFacet( identifiers=[ Identifier(
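
A minimal sketch of the new get_identity_column_lineage_facet() contract described in the
docstring above, using only classes that already appear in this patch; the table names are
hypothetical:

    from airflow.providers.common.compat.openlineage.facet import (
        Dataset,
        SchemaDatasetFacet,
        SchemaDatasetFacetFields,
    )
    from airflow.providers.google.cloud.openlineage.utils import (
        get_identity_column_lineage_facet,
    )

    source = Dataset(
        namespace="bigquery",
        name="my-project.my_dataset.src_table",  # hypothetical table identifier
        facets={
            "schema": SchemaDatasetFacet(
                fields=[SchemaDatasetFacetFields(name="field1", type="STRING")]
            )
        },
    )

    # "field2" exists only in the destination, so it is skipped in the facet;
    # "field1" is mapped back to the same-named source column as an IDENTITY
    # transformation.
    facets = get_identity_column_lineage_facet(
        dest_field_names=["field1", "field2"],
        input_datasets=[source],
    )
    assert set(facets["columnLineage"].fields) == {"field1"}

    # An input dataset without a schema facet makes lineage undeterminable, so
    # the function returns an empty dict instead of a facet with guessed columns.
    no_schema = Dataset(namespace="bigquery", name="my-project.my_dataset.other")
    assert get_identity_column_lineage_facet(
        dest_field_names=["field1"], input_datasets=[no_schema]
    ) == {}

The two assertions mirror the behavior pinned down in
providers/tests/google/cloud/openlineage/test_utils.py: lineage is emitted only for
destination columns traceable to a same-named source column, and a missing schema facet on
any input short-circuits to an empty dictionary.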
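
At the task level nothing changes for users. Assuming the OpenLineage provider is
configured, a plain copy task such as the hypothetical one below now emits OpenLineage
input/output datasets (with schema, documentation, and identity column lineage facets where
the tables expose them) plus an externalQuery run facet carrying the final BigQuery job id,
with no extra configuration on the task itself:

    from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
        BigQueryToBigQueryOperator,
    )

    # Hypothetical project/dataset/table identifiers, shown only to illustrate
    # the operator arguments exercised by the tests in this patch.
    copy_table = BigQueryToBigQueryOperator(
        task_id="copy_table",
        source_project_dataset_tables="my-project.my_dataset.src_table",
        destination_project_dataset_table="my-project.my_dataset_new.src_table",
        write_disposition="WRITE_EMPTY",
        create_disposition="CREATE_IF_NEEDED",
    )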