From cdb398d08ccd55f52c0c025b7e3989fe75f0022a Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 23 Oct 2024 17:54:41 +0530 Subject: [PATCH 1/4] feat(ingest/transform): extend ownership transformer to other entities --- .../transformer/add_dataset_ownership.py | 6 ++---- .../transformer/dataset_transformer.py | 21 ++++++++++++++----- .../transformer/pattern_cleanup_ownership.py | 6 ++---- .../transformer/remove_dataset_ownership.py | 6 ++---- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py index 54be2e5fac1e30..b107a62c905b4a 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py @@ -13,9 +13,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph -from datahub.ingestion.transformer.dataset_transformer import ( - DatasetOwnershipTransformer, -) +from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer from datahub.metadata.schema_classes import ( BrowsePathsV2Class, MetadataChangeProposalClass, @@ -37,7 +35,7 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): is_container: bool = False -class AddDatasetOwnership(DatasetOwnershipTransformer): +class AddDatasetOwnership(OwnershipTransformer): """Transformer that adds owners to datasets according to a callback function.""" ctx: PipelineContext diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index 42dd54f4a584a0..00b3a9ba59f924 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -27,6 +27,22 @@ def entity_types(self) -> List[str]: return ["dataset"] +class OwnershipTransformer( + DatasetTransformer, SingleAspectTransformer, metaclass=ABCMeta +): + def aspect_name(self) -> str: + return "ownership" + + def entity_types(self) -> List[str]: + return [ + "dataset", + "dataJob", + "dataFlow", + "chart", + "dashboard", + ] + + class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta): """Transformer that does transform sequentially on each tag.""" @@ -47,11 +63,6 @@ def entity_types(self) -> List[str]: return ["container"] -class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta): - def aspect_name(self) -> str: - return "ownership" - - class DatasetDomainTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "domains" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py index 8ef61ab9679e63..f17546d6f72990 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py @@ -4,9 +4,7 @@ import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.transformer.dataset_transformer import ( - DatasetOwnershipTransformer, -) +from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer from datahub.metadata.schema_classes import ( OwnerClass, OwnershipClass, @@ -20,7 +18,7 @@ class PatternCleanUpOwnershipConfig(ConfigModel): pattern_for_cleanup: List[str] -class PatternCleanUpOwnership(DatasetOwnershipTransformer): +class PatternCleanUpOwnership(OwnershipTransformer): """Transformer that clean the ownership URN.""" ctx: PipelineContext diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py index f5d71a4340554f..934e2a13d56314 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py @@ -3,9 +3,7 @@ from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import Aspect from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.transformer.dataset_transformer import ( - DatasetOwnershipTransformer, -) +from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer from datahub.metadata.schema_classes import OwnershipClass @@ -13,7 +11,7 @@ class ClearDatasetOwnershipConfig(ConfigModel): pass -class SimpleRemoveDatasetOwnership(DatasetOwnershipTransformer): +class SimpleRemoveDatasetOwnership(OwnershipTransformer): """Transformer that clears all owners on each dataset.""" def __init__(self, config: ClearDatasetOwnershipConfig, ctx: PipelineContext): From 3a68378195d1ad598d7acdb7054dc9d7c37b3967 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Wed, 23 Oct 2024 21:00:24 +0530 Subject: [PATCH 2/4] update tests --- .../tests/unit/test_transform_dataset.py | 46 +++---------------- 1 file changed, 7 insertions(+), 39 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 4e9a38cb37ae63..381d6bb41ded4e 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -225,20 +225,7 @@ def test_simple_dataset_ownership_transformation(mock_time): with_owner_aspect = make_dataset_with_owner() - not_a_dataset = models.MetadataChangeEventClass( - proposedSnapshot=models.DataJobSnapshotClass( - urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", - aspects=[ - models.DataJobInfoClass( - name="User Deletions", - description="Constructs the fct_users_deleted from logging_events", - type=models.AzkabanJobTypeClass.SQL, - ) - ], - ) - ) - - inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()] + inputs = [no_owner_aspect, with_owner_aspect, EndOfStream()] transformer = SimpleAddDatasetOwnership.create( { @@ -262,7 +249,7 @@ def test_simple_dataset_ownership_transformation(mock_time): ) assert first_ownership_aspect is None - last_event = outputs[3].record + last_event = outputs[2].record assert isinstance(last_event, MetadataChangeProposalWrapper) assert isinstance(last_event.aspect, OwnershipClass) assert len(last_event.aspect.owners) == 2 @@ -287,11 +274,8 @@ def test_simple_dataset_ownership_transformation(mock_time): ] ) - # Verify that the third entry is unchanged. - assert inputs[2] == outputs[2].record - # Verify that the last entry is EndOfStream - assert inputs[3] == outputs[4].record + assert inputs[-1] == outputs[-1].record def test_simple_dataset_ownership_with_type_transformation(mock_time): @@ -982,20 +966,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): ), ) - not_a_dataset = models.MetadataChangeEventClass( - proposedSnapshot=models.DataJobSnapshotClass( - urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", - aspects=[ - models.DataJobInfoClass( - name="User Deletions", - description="Constructs the fct_users_deleted from logging_events", - type=models.AzkabanJobTypeClass.SQL, - ) - ], - ) - ) - - inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()] + inputs = [no_owner_aspect, with_owner_aspect, EndOfStream()] transformer = PatternAddDatasetOwnership.create( { @@ -1019,7 +990,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): # Check the first entry. assert inputs[0] == outputs[0].record - first_ownership_aspect = outputs[3].record.aspect + first_ownership_aspect = outputs[2].record.aspect assert first_ownership_aspect assert len(first_ownership_aspect.owners) == 1 assert all( @@ -1042,9 +1013,6 @@ def test_pattern_dataset_ownership_transformation(mock_time): ] ) - # Verify that the third entry is unchanged. - assert inputs[2] == outputs[2].record - # Verify that the last entry is unchanged (EOS) assert inputs[-1] == outputs[-1].record @@ -1188,7 +1156,7 @@ def fake_get_aspect( transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 3 + assert len(outputs) == len(inputs) + 4 # Check the first entry. assert inputs[0] == outputs[0].record @@ -1219,7 +1187,7 @@ def fake_get_aspect( # Check container ownerships for i in range(2): - container_ownership_aspect = outputs[i + 4].record.aspect + container_ownership_aspect = outputs[i + 5].record.aspect assert container_ownership_aspect ownership = json.loads(container_ownership_aspect.value.decode("utf-8")) assert len(ownership) == 2 From aa694b7f2b27e3aef4a5967764d120adc80e54e5 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 28 Oct 2024 20:18:15 +0530 Subject: [PATCH 3/4] Revert "update tests" This reverts commit 3a68378195d1ad598d7acdb7054dc9d7c37b3967. --- .../tests/unit/test_transform_dataset.py | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 381d6bb41ded4e..4e9a38cb37ae63 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -225,7 +225,20 @@ def test_simple_dataset_ownership_transformation(mock_time): with_owner_aspect = make_dataset_with_owner() - inputs = [no_owner_aspect, with_owner_aspect, EndOfStream()] + not_a_dataset = models.MetadataChangeEventClass( + proposedSnapshot=models.DataJobSnapshotClass( + urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", + aspects=[ + models.DataJobInfoClass( + name="User Deletions", + description="Constructs the fct_users_deleted from logging_events", + type=models.AzkabanJobTypeClass.SQL, + ) + ], + ) + ) + + inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()] transformer = SimpleAddDatasetOwnership.create( { @@ -249,7 +262,7 @@ def test_simple_dataset_ownership_transformation(mock_time): ) assert first_ownership_aspect is None - last_event = outputs[2].record + last_event = outputs[3].record assert isinstance(last_event, MetadataChangeProposalWrapper) assert isinstance(last_event.aspect, OwnershipClass) assert len(last_event.aspect.owners) == 2 @@ -274,8 +287,11 @@ def test_simple_dataset_ownership_transformation(mock_time): ] ) + # Verify that the third entry is unchanged. + assert inputs[2] == outputs[2].record + # Verify that the last entry is EndOfStream - assert inputs[-1] == outputs[-1].record + assert inputs[3] == outputs[4].record def test_simple_dataset_ownership_with_type_transformation(mock_time): @@ -966,7 +982,20 @@ def test_pattern_dataset_ownership_transformation(mock_time): ), ) - inputs = [no_owner_aspect, with_owner_aspect, EndOfStream()] + not_a_dataset = models.MetadataChangeEventClass( + proposedSnapshot=models.DataJobSnapshotClass( + urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", + aspects=[ + models.DataJobInfoClass( + name="User Deletions", + description="Constructs the fct_users_deleted from logging_events", + type=models.AzkabanJobTypeClass.SQL, + ) + ], + ) + ) + + inputs = [no_owner_aspect, with_owner_aspect, not_a_dataset, EndOfStream()] transformer = PatternAddDatasetOwnership.create( { @@ -990,7 +1019,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): # Check the first entry. assert inputs[0] == outputs[0].record - first_ownership_aspect = outputs[2].record.aspect + first_ownership_aspect = outputs[3].record.aspect assert first_ownership_aspect assert len(first_ownership_aspect.owners) == 1 assert all( @@ -1013,6 +1042,9 @@ def test_pattern_dataset_ownership_transformation(mock_time): ] ) + # Verify that the third entry is unchanged. + assert inputs[2] == outputs[2].record + # Verify that the last entry is unchanged (EOS) assert inputs[-1] == outputs[-1].record @@ -1156,7 +1188,7 @@ def fake_get_aspect( transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 4 + assert len(outputs) == len(inputs) + 3 # Check the first entry. assert inputs[0] == outputs[0].record @@ -1187,7 +1219,7 @@ def fake_get_aspect( # Check container ownerships for i in range(2): - container_ownership_aspect = outputs[i + 5].record.aspect + container_ownership_aspect = outputs[i + 4].record.aspect assert container_ownership_aspect ownership = json.loads(container_ownership_aspect.value.decode("utf-8")) assert len(ownership) == 2 From 5b67b9aa3bbd01a088a479c729e47932d501c633 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 28 Oct 2024 20:40:39 +0530 Subject: [PATCH 4/4] add tests for new aspects --- .../tests/unit/test_transform_dataset.py | 57 ++++++++++++++----- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 4e9a38cb37ae63..389f7b70b3311e 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -220,7 +220,7 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass: ) -def test_simple_dataset_ownership_transformation(mock_time): +def test_dataset_ownership_transformation(mock_time): no_owner_aspect = make_generic_dataset() with_owner_aspect = make_dataset_with_owner() @@ -254,7 +254,7 @@ def test_simple_dataset_ownership_transformation(mock_time): transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 1 + assert len(outputs) == len(inputs) + 2 # Check the first entry. first_ownership_aspect = builder.get_aspect_if_available( @@ -287,11 +287,21 @@ def test_simple_dataset_ownership_transformation(mock_time): ] ) + third_ownership_aspect = outputs[4].record.aspect + assert third_ownership_aspect + assert len(third_ownership_aspect.owners) == 2 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None + for owner in second_ownership_aspect.owners + ] + ) + # Verify that the third entry is unchanged. assert inputs[2] == outputs[2].record # Verify that the last entry is EndOfStream - assert inputs[3] == outputs[4].record + assert inputs[-1] == outputs[-1].record def test_simple_dataset_ownership_with_type_transformation(mock_time): @@ -1003,6 +1013,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): "rules": { ".*example1.*": [builder.make_user_urn("person1")], ".*example2.*": [builder.make_user_urn("person2")], + ".*dag_abc.*": [builder.make_user_urn("person2")], } }, "ownership_type": "DATAOWNER", @@ -1014,7 +1025,9 @@ def test_pattern_dataset_ownership_transformation(mock_time): transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 1 # additional MCP due to the no-owner MCE + assert ( + len(outputs) == len(inputs) + 2 + ) # additional MCP due to the no-owner MCE + datajob # Check the first entry. assert inputs[0] == outputs[0].record @@ -1042,6 +1055,16 @@ def test_pattern_dataset_ownership_transformation(mock_time): ] ) + third_ownership_aspect = outputs[4].record.aspect + assert third_ownership_aspect + assert len(third_ownership_aspect.owners) == 1 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in third_ownership_aspect.owners + ] + ) + # Verify that the third entry is unchanged. assert inputs[2] == outputs[2].record @@ -1122,14 +1145,14 @@ def fake_get_aspect( pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore # No owner aspect for the first dataset - no_owner_aspect = models.MetadataChangeEventClass( + no_owner_aspect_dataset = models.MetadataChangeEventClass( proposedSnapshot=models.DatasetSnapshotClass( urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", aspects=[models.StatusClass(removed=False)], ), ) # Dataset with an existing owner - with_owner_aspect = models.MetadataChangeEventClass( + with_owner_aspect_dataset = models.MetadataChangeEventClass( proposedSnapshot=models.DatasetSnapshotClass( urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", aspects=[ @@ -1148,8 +1171,7 @@ def fake_get_aspect( ), ) - # Not a dataset, should be ignored - not_a_dataset = models.MetadataChangeEventClass( + datajob = models.MetadataChangeEventClass( proposedSnapshot=models.DataJobSnapshotClass( urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", aspects=[ @@ -1163,9 +1185,9 @@ def fake_get_aspect( ) inputs = [ - no_owner_aspect, - with_owner_aspect, - not_a_dataset, + no_owner_aspect_dataset, + with_owner_aspect_dataset, + datajob, EndOfStream(), ] @@ -1176,6 +1198,7 @@ def fake_get_aspect( "rules": { ".*example1.*": [builder.make_user_urn("person1")], ".*example2.*": [builder.make_user_urn("person2")], + ".*dag_abc.*": [builder.make_user_urn("person3")], } }, "ownership_type": "DATAOWNER", @@ -1188,9 +1211,9 @@ def fake_get_aspect( transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 3 + assert len(outputs) == len(inputs) + 4 - # Check the first entry. + # Check that DatasetSnapshotClass has not changed assert inputs[0] == outputs[0].record # Check the ownership for the first dataset (example1) @@ -1217,12 +1240,16 @@ def fake_get_aspect( ] ) + third_ownership_aspect = outputs[4].record.aspect + assert third_ownership_aspect + assert len(third_ownership_aspect.owners) == 1 # new for datajob + # Check container ownerships for i in range(2): - container_ownership_aspect = outputs[i + 4].record.aspect + container_ownership_aspect = outputs[i + 5].record.aspect assert container_ownership_aspect ownership = json.loads(container_ownership_aspect.value.decode("utf-8")) - assert len(ownership) == 2 + assert len(ownership) == 3 assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1") assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")