From bef72aa01fde1c30fed2e092e4ae7218b78f780d Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 20 Mar 2024 16:22:41 -0700 Subject: [PATCH 1/2] support incremental lineage with cll --- .../api/incremental_lineage_helper.py | 104 ++------------- .../ingestion/source/bigquery_v2/bigquery.py | 6 + .../source/bigquery_v2/bigquery_config.py | 11 -- .../ingestion/source/redshift/config.py | 2 +- .../ingestion/source/redshift/redshift.py | 8 +- .../source/snowflake/snowflake_v2.py | 8 +- .../ingestion/source/sql/sql_common.py | 7 +- ...l_less_upstreams_in_gms_aspect_golden.json | 106 --------------- ...l_more_upstreams_in_gms_aspect_golden.json | 120 ----------------- .../incremental_column_lineage_golden.json | 83 ++++++++++++ .../test_incremental_lineage_helper.py | 124 +----------------- 11 files changed, 113 insertions(+), 466 deletions(-) delete mode 100644 metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_less_upstreams_in_gms_aspect_golden.json delete mode 100644 metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_more_upstreams_in_gms_aspect_golden.json create mode 100644 metadata-ingestion/tests/unit/api/source_helpers/incremental_column_lineage_golden.json diff --git a/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py b/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py index 4dc4a6d422d73..397111705223c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py +++ b/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py @@ -1,18 +1,14 @@ -import copy -from typing import Dict, Iterable, Optional +from typing import Iterable, Optional from pydantic.fields import Field from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import datahub_guid, set_aspect -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import ( FineGrainedLineageClass, MetadataChangeEventClass, SystemMetadataClass, - UpstreamClass, UpstreamLineageClass, ) from datahub.specific.dataset import DatasetPatchBuilder @@ -29,7 +25,7 @@ def convert_upstream_lineage_to_patch( for fine_upstream in aspect.fineGrainedLineages or []: patch_builder.add_fine_grained_upstream_lineage(fine_upstream) mcp = next(iter(patch_builder.build())) - return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp) + return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp) def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str: @@ -42,73 +38,7 @@ def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str: ) -def _merge_upstream_lineage( - new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass -) -> UpstreamLineageClass: - merged_aspect = copy.deepcopy(gms_aspect) - - upstreams_map: Dict[str, UpstreamClass] = { - upstream.dataset: upstream for upstream in merged_aspect.upstreams - } - - upstreams_updated = False - fine_upstreams_updated = False - - for table_upstream in new_aspect.upstreams: - if table_upstream.dataset not in upstreams_map or ( - table_upstream.auditStamp.time - > upstreams_map[table_upstream.dataset].auditStamp.time - ): - upstreams_map[table_upstream.dataset] = table_upstream - upstreams_updated = True - - if upstreams_updated: - merged_aspect.upstreams = list(upstreams_map.values()) - - if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages: - fine_upstreams_map: Dict[str, FineGrainedLineageClass] = { - get_fine_grained_lineage_key(fine_upstream): fine_upstream - for fine_upstream in merged_aspect.fineGrainedLineages - } - for column_upstream in new_aspect.fineGrainedLineages: - column_upstream_key = get_fine_grained_lineage_key(column_upstream) - - if column_upstream_key not in fine_upstreams_map or ( - column_upstream.confidenceScore - > fine_upstreams_map[column_upstream_key].confidenceScore - ): - fine_upstreams_map[column_upstream_key] = column_upstream - fine_upstreams_updated = True - - if fine_upstreams_updated: - merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values()) - else: - merged_aspect.fineGrainedLineages = ( - new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages - ) - - return merged_aspect - - -def _lineage_wu_via_read_modify_write( - graph: DataHubGraph, - urn: str, - aspect: UpstreamLineageClass, - system_metadata: Optional[SystemMetadataClass], -) -> MetadataWorkUnit: - gms_aspect = graph.get_aspect(urn, UpstreamLineageClass) - if gms_aspect: - new_aspect = _merge_upstream_lineage(aspect, gms_aspect) - else: - new_aspect = aspect - - return MetadataChangeProposalWrapper( - entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata - ).as_workunit() - - def auto_incremental_lineage( - graph: Optional[DataHubGraph], incremental_lineage: bool, stream: Iterable[MetadataWorkUnit], ) -> Iterable[MetadataWorkUnit]: @@ -117,35 +47,23 @@ def auto_incremental_lineage( return # early exit for wu in stream: + urn = wu.get_urn() + lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type( UpstreamLineageClass ) - urn = wu.get_urn() + if isinstance(wu.metadata, MetadataChangeEventClass): + set_aspect( + wu.metadata, None, UpstreamLineageClass + ) # we'll handle upstreamLineage separately below + if len(wu.metadata.proposedSnapshot.aspects) > 0: + yield wu if lineage_aspect: - if isinstance(wu.metadata, MetadataChangeEventClass): - set_aspect( - wu.metadata, None, UpstreamLineageClass - ) # we'll emit upstreamLineage separately below - if len(wu.metadata.proposedSnapshot.aspects) > 0: - yield wu - - # TODO: Replace with CLL patch now that we have support for it. - if lineage_aspect.fineGrainedLineages: - if graph is None: - raise ValueError( - "Failed to handle incremental lineage, DataHubGraph is missing. " - "Use `datahub-rest` sink OR provide `datahub-api` config in recipe. " - ) - yield _lineage_wu_via_read_modify_write( - graph, urn, lineage_aspect, wu.metadata.systemMetadata - ) - elif lineage_aspect.upstreams: + if lineage_aspect.upstreams: yield convert_upstream_lineage_to_patch( urn, lineage_aspect, wu.metadata.systemMetadata ) - else: - yield wu class IncrementalLineageConfigMixin(ConfigModel): diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 8452399bddf5d..13514248ecbac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -1,4 +1,5 @@ import atexit +import functools import logging import os import re @@ -27,6 +28,7 @@ platform_name, support_status, ) +from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, @@ -574,6 +576,10 @@ def gen_dataset_containers( def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), + functools.partial( + auto_incremental_lineage, + self.config.incremental_lineage + ), StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 28f0be2c38033..d954e264d7186 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -3,7 +3,6 @@ from datetime import timedelta from typing import Any, Dict, List, Optional -import pydantic from google.cloud import bigquery from google.cloud.logging_v2.client import Client as GCPLoggingClient from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator @@ -212,21 +211,11 @@ class BigQueryV2Config( ) extract_column_lineage: bool = Field( - # TODO: Flip this default to True once we support patching column-level lineage. default=False, description="If enabled, generate column level lineage. " "Requires lineage_use_sql_parser to be enabled. " - "This and `incremental_lineage` cannot both be enabled.", ) - @pydantic.validator("extract_column_lineage") - def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: - if v and values.get("incremental_lineage"): - raise ValueError( - "Cannot enable `extract_column_lineage` and `incremental_lineage` at the same time." - ) - return v - extract_lineage_from_catalog: bool = Field( default=False, description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage", diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 376b62178632f..74507d850014a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -108,7 +108,7 @@ class RedshiftConfig( ) use_lineage_v2: bool = Field( - default=False, + default=True, description="Whether to use the new SQL-based lineage collector.", ) lineage_v2_generate_queries: bool = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index b6c28d0bd0806..a343b5811c562 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -1,7 +1,7 @@ +import functools import itertools import logging from collections import defaultdict -from functools import partial from typing import Dict, Iterable, List, Optional, Type, Union import humanfriendly @@ -402,11 +402,7 @@ def gen_database_container(self, database: str) -> Iterable[MetadataWorkUnit]: def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), - partial( - auto_incremental_lineage, - self.ctx.graph, - self.config.incremental_lineage, - ), + functools.partial(auto_incremental_lineage, self.config.incremental_lineage), StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 318cec8482996..9344e030d749f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -1,10 +1,10 @@ +import functools import json import logging import os import os.path import platform from dataclasses import dataclass -from functools import partial from typing import Callable, Dict, Iterable, List, Optional, Union from snowflake.connector import SnowflakeConnection @@ -510,10 +510,8 @@ def query(query): def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), - partial( - auto_incremental_lineage, - self.ctx.graph, - self.config.incremental_lineage, + functools.partial( + auto_incremental_lineage, self.config.incremental_lineage ), StaleEntityRemovalHandler.create( self, self.config, self.ctx diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 91736b24727c8..70826c14e13c7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -1,5 +1,6 @@ import contextlib import datetime +import functools import logging import traceback from dataclasses import dataclass, field @@ -514,10 +515,8 @@ def get_schema_level_workunits( def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), - partial( - auto_incremental_lineage, - self.ctx.graph, - self.config.incremental_lineage, + functools.partial( + auto_incremental_lineage, self.config.incremental_lineage ), StaleEntityRemovalHandler.create( self, self.config, self.ctx diff --git a/metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_less_upstreams_in_gms_aspect_golden.json b/metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_less_upstreams_in_gms_aspect_golden.json deleted file mode 100644 index 812566143014b..0000000000000 --- a/metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_less_upstreams_in_gms_aspect_golden.json +++ /dev/null @@ -1,106 +0,0 @@ -[ -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD)", - "changeType": "UPSERT", - "aspectName": "upstreamLineage", - "aspect": { - "json": { - "upstreams": [ - { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)", - "type": "TRANSFORMED" - }, - { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)", - "type": "TRANSFORMED" - } - ], - "fineGrainedLineages": [ - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)" - ], - "confidenceScore": 1.0 - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1643871600000, - "runId": "run-id", - "lastRunId": "no-run-id-provided" - } -} -] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_more_upstreams_in_gms_aspect_golden.json b/metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_more_upstreams_in_gms_aspect_golden.json deleted file mode 100644 index 17f4d10728268..0000000000000 --- a/metadata-ingestion/tests/unit/api/source_helpers/incremental_cll_more_upstreams_in_gms_aspect_golden.json +++ /dev/null @@ -1,120 +0,0 @@ -[ -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD)", - "changeType": "UPSERT", - "aspectName": "upstreamLineage", - "aspect": { - "json": { - "upstreams": [ - { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)", - "type": "TRANSFORMED" - }, - { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)", - "type": "TRANSFORMED" - }, - { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD)", - "type": "TRANSFORMED" - } - ], - "fineGrainedLineages": [ - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD),col_a)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD),col_b)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream3,PROD),col_c)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)", - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)" - ], - "confidenceScore": 1.0 - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1643871600000, - "runId": "run-id", - "lastRunId": "no-run-id-provided" - } -} -] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/api/source_helpers/incremental_column_lineage_golden.json b/metadata-ingestion/tests/unit/api/source_helpers/incremental_column_lineage_golden.json new file mode 100644 index 0000000000000..511494a12c452 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/source_helpers/incremental_column_lineage_golden.json @@ -0,0 +1,83 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD)", + "changeType": "PATCH", + "aspectName": "upstreamLineage", + "aspect": { + "json": [ + { + "op": "add", + "path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD)", + "type": "TRANSFORMED" + } + }, + { + "op": "add", + "path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD)", + "type": "TRANSFORMED" + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_a)", + "value": { + "confidenceScore": 1.0 + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_a)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_a)", + "value": { + "confidenceScore": 1.0 + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_b)", + "value": { + "confidenceScore": 1.0 + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_b)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_b)", + "value": { + "confidenceScore": 1.0 + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream1,PROD),col_c)", + "value": { + "confidenceScore": 1.0 + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,dataset1,PROD),col_c)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:platform,upstream2,PROD),col_c)", + "value": { + "confidenceScore": 1.0 + } + } + ] + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "run-id", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py index e8485106c6a81..0f98054ab1d38 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py @@ -1,13 +1,9 @@ -from typing import List, Optional -from unittest.mock import MagicMock - -import pytest +from typing import List import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage -from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.sink.file import write_metadata_file from tests.test_helpers import mce_helpers @@ -86,7 +82,6 @@ def test_incremental_table_lineage(tmp_path, pytestconfig): aspect = base_table_lineage_aspect() processed_wus = auto_incremental_lineage( - graph=None, incremental_lineage=True, stream=[ MetadataChangeProposalWrapper( @@ -113,7 +108,6 @@ def test_incremental_table_lineage_empty_upstreams(tmp_path, pytestconfig): ) processed_wus = auto_incremental_lineage( - graph=None, incremental_lineage=True, stream=[ MetadataChangeProposalWrapper( @@ -125,125 +119,15 @@ def test_incremental_table_lineage_empty_upstreams(tmp_path, pytestconfig): assert [wu.metadata for wu in processed_wus] == [] -@pytest.mark.parametrize( - "gms_aspect,current_aspect,output_aspect", - [ - # emitting CLL upstreamLineage over table level upstreamLineage - [ - base_table_lineage_aspect(), - base_cll_aspect(), - base_cll_aspect(), - ], - # emitting upstreamLineage for the first time - [ - None, - base_cll_aspect(), - base_cll_aspect(), - ], - # emitting CLL upstreamLineage over same CLL upstreamLineage - [ - base_cll_aspect(), - base_cll_aspect(), - base_cll_aspect(), - ], - # emitting CLL upstreamLineage over same CLL upstreamLineage but with earlier timestamp - [ - base_cll_aspect(), # default timestamp is 0 - base_cll_aspect(timestamp=1643871600000), - base_cll_aspect(timestamp=1643871600000), - ], - ], -) -def test_incremental_column_level_lineage( - gms_aspect: Optional[models.UpstreamLineageClass], - current_aspect: models.UpstreamLineageClass, - output_aspect: models.UpstreamLineageClass, -) -> None: - mock_graph = MagicMock() - mock_graph.get_aspect.return_value = gms_aspect - dataset_urn = make_dataset_urn(platform, "dataset1") - - processed_wus = auto_incremental_lineage( - graph=mock_graph, - incremental_lineage=True, - stream=[ - MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=current_aspect, - systemMetadata=system_metadata, - ).as_workunit() - ], - ) - - wu: MetadataWorkUnit = next(iter(processed_wus)) - aspect = wu.get_aspect_of_type(models.UpstreamLineageClass) - assert aspect == output_aspect - - -def test_incremental_column_lineage_less_upstreams_in_gms_aspect( - tmp_path, pytestconfig -): +def test_incremental_column_lineage(tmp_path, pytestconfig): test_resources_dir = pytestconfig.rootpath / "tests/unit/api/source_helpers" - test_file = tmp_path / "incremental_cll_less_upstreams_in_gms_aspect.json" - golden_file = ( - test_resources_dir / "incremental_cll_less_upstreams_in_gms_aspect_golden.json" - ) + test_file = tmp_path / "incremental_column_lineage.json" + golden_file = test_resources_dir / "incremental_column_lineage_golden.json" urn = make_dataset_urn(platform, "dataset1") aspect = base_cll_aspect() - mock_graph = MagicMock() - mock_graph.get_aspect.return_value = make_lineage_aspect( - "dataset1", - upstreams=[make_dataset_urn(platform, name) for name in ["upstream1"]], - columns=["col_a", "col_b", "col_c"], - include_cll=True, - ) - - processed_wus = auto_incremental_lineage( - graph=mock_graph, - incremental_lineage=True, - stream=[ - MetadataChangeProposalWrapper( - entityUrn=urn, aspect=aspect, systemMetadata=system_metadata - ).as_workunit() - ], - ) - - write_metadata_file( - test_file, - [wu.metadata for wu in processed_wus], - ) - mce_helpers.check_golden_file( - pytestconfig=pytestconfig, output_path=test_file, golden_path=golden_file - ) - - -def test_incremental_column_lineage_more_upstreams_in_gms_aspect( - tmp_path, pytestconfig -): - test_resources_dir = pytestconfig.rootpath / "tests/unit/api/source_helpers" - test_file = tmp_path / "incremental_cll_more_upstreams_in_gms_aspect.json" - golden_file = ( - test_resources_dir / "incremental_cll_more_upstreams_in_gms_aspect_golden.json" - ) - - urn = make_dataset_urn(platform, "dataset1") - aspect = base_cll_aspect() - - mock_graph = MagicMock() - mock_graph.get_aspect.return_value = make_lineage_aspect( - "dataset1", - upstreams=[ - make_dataset_urn(platform, name) - for name in ["upstream1", "upstream2", "upstream3"] - ], - columns=["col_a", "col_b", "col_c"], - include_cll=True, - ) - processed_wus = auto_incremental_lineage( - graph=mock_graph, incremental_lineage=True, stream=[ MetadataChangeProposalWrapper( From 82f49378243cfce56ff3033bd503881787902df4 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 20 Mar 2024 16:44:31 -0700 Subject: [PATCH 2/2] fix lint --- .../src/datahub/ingestion/source/bigquery_v2/bigquery.py | 8 +------- .../ingestion/source/bigquery_v2/bigquery_config.py | 2 +- .../src/datahub/ingestion/source/redshift/redshift.py | 4 +++- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 13514248ecbac..aeb6a524eb77c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -169,11 +169,6 @@ def cleanup(config: BigQueryV2Config) -> None: SourceCapability.USAGE_STATS, "Enabled by default, can be disabled via configuration `include_usage_statistics`", ) -@capability( - SourceCapability.DELETION_DETECTION, - "Optionally enabled via `stateful_ingestion.remove_stale_metadata`", - supported=True, -) @capability( SourceCapability.CLASSIFICATION, "Optionally enabled via `classification.enabled`", @@ -577,8 +572,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), functools.partial( - auto_incremental_lineage, - self.config.incremental_lineage + auto_incremental_lineage, self.config.incremental_lineage ), StaleEntityRemovalHandler.create( self, self.config, self.ctx diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index d954e264d7186..3fbac069a1820 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -213,7 +213,7 @@ class BigQueryV2Config( extract_column_lineage: bool = Field( default=False, description="If enabled, generate column level lineage. " - "Requires lineage_use_sql_parser to be enabled. " + "Requires lineage_use_sql_parser to be enabled.", ) extract_lineage_from_catalog: bool = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index a343b5811c562..ad19386d41fad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -402,7 +402,9 @@ def gen_database_container(self, database: str) -> Iterable[MetadataWorkUnit]: def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), - functools.partial(auto_incremental_lineage, self.config.incremental_lineage), + functools.partial( + auto_incremental_lineage, self.config.incremental_lineage + ), StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor,