From edc29e078b59afdb093fbb2f9be764f11199a9fc Mon Sep 17 00:00:00 2001
From: Shirshanka Das
Date: Mon, 27 Jan 2025 21:42:06 -0800
Subject: [PATCH 1/2] feat(sdk): data process instance - container, subtype,
 dataplatforminstance support

---
 .../dataprocess/dataprocess_instance.py       | 115 +++++++++-
 .../dataprocess/test_data_process_instance.py | 200 ++++++++++++++++++
 2 files changed, 304 insertions(+), 11 deletions(-)
 create mode 100644 metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py

diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
index d406fa36e00db6..b32b98eb9e3221 100644
--- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
+++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -5,7 +5,7 @@
 from datahub.api.entities.datajob import DataFlow, DataJob
 from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.mcp_builder import DatahubKey
+from datahub.emitter.mcp_builder import ContainerKey, DatahubKey
 from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
     DataProcessInstanceInput,
     DataProcessInstanceOutput,
@@ -15,11 +15,15 @@
 )
 from datahub.metadata.schema_classes import (
     AuditStampClass,
+    ContainerClass,
+    DataPlatformInstanceClass,
     DataProcessInstanceRunEventClass,
     DataProcessInstanceRunResultClass,
     DataProcessRunStatusClass,
     DataProcessTypeClass,
+    SubTypesClass,
 )
+from datahub.metadata.urns import DataPlatformInstanceUrn, DataPlatformUrn
 from datahub.utilities.str_enum import StrEnum
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
 from datahub.utilities.urns.data_job_urn import DataJobUrn
@@ -42,7 +46,7 @@ class InstanceRunResult(StrEnum):
 
 @dataclass
 class DataProcessInstance:
-    """This is a DataProcessInstance class which represent an instance of a DataFlow or DataJob.
+    """This is a DataProcessInstance class which represents an instance of a DataFlow, DataJob, or a standalone process within a Container.
 
     Args:
         id: The id of the dataprocess instance execution.
@@ -71,6 +75,10 @@ class DataProcessInstance:
     _template_object: Optional[Union[DataJob, DataFlow]] = field(
         init=False, default=None, repr=False
     )
+    data_platform_instance: Optional[str] = None
+    subtype: Optional[str] = None
+    container_urn: Optional[str] = None
+    _platform: str = field(init=False, repr=False, default=None)
 
     def __post_init__(self):
         self.urn = DataProcessInstanceUrn(
@@ -80,6 +88,28 @@ def __post_init__(self):
                 id=self.id,
             ).guid()
         )
+        self._platform = self.orchestrator
+
+        try:
+            # We first try to create from string, assuming it's an urn
+            self._platform = str(DataPlatformUrn.from_string(self._platform))
+        except Exception:
+            # If that fails, we assume it's an id
+            self._platform = str(DataPlatformUrn(self._platform))
+
+        if self.data_platform_instance is not None:
+            try:
+                # We first try to create from string, assuming it's an urn
+                self.data_platform_instance = str(
+                    DataPlatformInstanceUrn.from_string(self.data_platform_instance)
+                )
+            except Exception:
+                # If that fails, we assume it's an id
+                self.data_platform_instance = str(
+                    DataPlatformInstanceUrn(
+                        platform=self._platform, instance=self.data_platform_instance
+                    )
+                )
 
     def start_event_mcp(
         self, start_timestamp_millis: int, attempt: Optional[int] = None
@@ -269,6 +299,29 @@ def generate_mcp(
             )
             yield mcp
 
+        assert self._platform
+        if self.data_platform_instance:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=str(self.urn),
+                aspect=DataPlatformInstanceClass(
+                    platform=self._platform, instance=self.data_platform_instance
+                ),
+            )
+            yield mcp
+
+        if self.subtype:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=str(self.urn), aspect=SubTypesClass(typeNames=[self.subtype])
+            )
+            yield mcp
+
+        if self.container_urn:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=str(self.urn),
+                aspect=ContainerClass(container=self.container_urn),
+            )
+            yield mcp
+
         yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)
 
     @staticmethod
@@ -309,13 +362,20 @@ def from_datajob(
         clone_outlets: bool = False,
     ) -> "DataProcessInstance":
         """
-        Generates DataProcessInstance from a DataJob
+        Generates a DataProcessInstance from a given DataJob.
 
-        :param datajob: (DataJob) the datajob from generate the DataProcessInstance
-        :param id: (str) the id for the DataProcessInstance
-        :param clone_inlets: (bool) whether to clone datajob's inlets
-        :param clone_outlets: (bool) whether to clone datajob's outlets
-        :return: DataProcessInstance
+        This method creates a DataProcessInstance object using the provided DataJob
+        and assigns it a unique identifier. Optionally, it can clone the inlets and
+        outlets from the DataJob to the DataProcessInstance.
+
+        Args:
+            datajob (DataJob): The DataJob instance from which to generate the DataProcessInstance.
+            id (str): The unique identifier for the DataProcessInstance.
+            clone_inlets (bool, optional): If True, clones the inlets from the DataJob to the DataProcessInstance. Defaults to False.
+            clone_outlets (bool, optional): If True, clones the outlets from the DataJob to the DataProcessInstance. Defaults to False.
+
+        Returns:
+            DataProcessInstance: The generated DataProcessInstance object.
""" dpi: DataProcessInstance = DataProcessInstance( orchestrator=datajob.flow_urn.orchestrator, @@ -332,14 +392,47 @@ def from_datajob( return dpi @staticmethod - def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance": + def from_container( + container_key: ContainerKey, + id: str, + ) -> "DataProcessInstance": """ - Generates DataProcessInstance from a DataFlow + Create a DataProcessInstance that is located within a Container. + Use this method when you need to represent a DataProcessInstance that + is not an instance of a DataJob or a DataFlow. + e.g. If recording an ad-hoc training run that is just associated with an Experiment. - :param dataflow: (DataFlow) the DataFlow from generate the DataProcessInstance + :param container_key: (ContainerKey) the container key to generate the DataProcessInstance :param id: (str) the id for the DataProcessInstance :return: DataProcessInstance """ + dpi: DataProcessInstance = DataProcessInstance( + id=id, + orchestrator=DataPlatformUrn.from_string( + container_key.platform + ).platform_name, + template_urn=None, + container_urn=container_key.as_urn(), + ) + + return dpi + + @staticmethod + def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance": + """ + Creates a DataProcessInstance from a given DataFlow. + + This method generates a DataProcessInstance object using the provided DataFlow + and a specified id. The DataProcessInstance will inherit properties from the + DataFlow such as orchestrator, environment, and template URN. + + Args: + dataflow (DataFlow): The DataFlow object from which to generate the DataProcessInstance. + id (str): The unique identifier for the DataProcessInstance. + + Returns: + DataProcessInstance: The newly created DataProcessInstance object. + """ dpi = DataProcessInstance( id=id, orchestrator=dataflow.orchestrator, diff --git a/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py b/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py new file mode 100644 index 00000000000000..1eaec8a6e58799 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py @@ -0,0 +1,200 @@ +import time +import unittest +from unittest.mock import Mock + +from datahub.api.entities.datajob import DataFlow, DataJob +from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance +from datahub.emitter.mcp_builder import ContainerKey +from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import RunResultType +from datahub.metadata.schema_classes import ( + DataProcessRunStatusClass, + DataProcessTypeClass, +) +from datahub.metadata.urns import DataFlowUrn, DataJobUrn, DataPlatformUrn, DatasetUrn + + +class TestDataProcessInstance(unittest.TestCase): + def setUp(self): + # Common test data + self.test_id = "test_process_123" + self.test_orchestrator = "airflow" + self.test_cluster = "prod" + + # Create mock ContainerKey + self.mock_container_key = ContainerKey( + platform="urn:li:dataPlatform:mlflow", name="test_experiment", env="PROD" + ) + + # Create mock DataJob + self.mock_flow_urn = DataFlowUrn.create_from_ids( + orchestrator="airflow", flow_id="test_flow", env="prod" + ) + self.mock_job_urn = DataJobUrn.create_from_ids( + job_id="test_job", data_flow_urn=str(self.mock_flow_urn) + ) + self.mock_datajob = DataJob( + id="test_job", + flow_urn=self.mock_flow_urn, + inlets=[ + DatasetUrn.from_string( + "urn:li:dataset:(urn:li:dataPlatform:hive,test_input,PROD)" + ) + ], + outlets=[ + 
DatasetUrn.from_string( + "urn:li:dataset:(urn:li:dataPlatform:hive,test_output,PROD)" + ) + ], + ) + + # Create mock DataFlow + self.mock_dataflow = DataFlow( + orchestrator="airflow", id="test_flow", env="prod" + ) + + def test_basic_initialization(self): + """Test basic initialization of DataProcessInstance""" + + instance = DataProcessInstance( + id=self.test_id, + orchestrator=self.test_orchestrator, + cluster=self.test_cluster, + ) + + self.assertEqual(instance.id, self.test_id) + self.assertEqual(instance.orchestrator, self.test_orchestrator) + self.assertEqual(instance.cluster, self.test_cluster) + self.assertEqual(instance.type, DataProcessTypeClass.BATCH_SCHEDULED) + + def test_from_datajob_creation(self): + """Test creation of DataProcessInstance from DataJob""" + + instance = DataProcessInstance.from_datajob( + datajob=self.mock_datajob, + id=self.test_id, + clone_inlets=True, + clone_outlets=True, + ) + + self.assertEqual(instance.id, self.test_id) + self.assertEqual(instance.orchestrator, "airflow") + self.assertEqual(instance.template_urn, self.mock_datajob.urn) + self.assertEqual(len(instance.inlets), 1) + self.assertEqual(len(instance.outlets), 1) + + def test_from_dataflow_creation(self): + """Test creation of DataProcessInstance from DataFlow""" + + instance = DataProcessInstance.from_dataflow( + dataflow=self.mock_dataflow, id=self.test_id + ) + + self.assertEqual(instance.id, self.test_id) + self.assertEqual(instance.orchestrator, "airflow") + self.assertEqual(instance.template_urn, self.mock_dataflow.urn) + + def test_from_container_creation(self): + """Test creation of DataProcessInstance from ContainerKey""" + + instance = DataProcessInstance.from_container( + container_key=self.mock_container_key, id=self.test_id + ) + + self.assertEqual(instance.id, self.test_id) + self.assertEqual(instance.orchestrator, "mlflow") # Platform name from URN + self.assertIsNone( + instance.template_urn + ) # Should be None for container-based instances + self.assertEqual(instance.container_urn, self.mock_container_key.as_urn()) + + # Verify the platform is set correctly + expected_platform = str( + DataPlatformUrn.from_string(self.mock_container_key.platform) + ) + self.assertEqual(instance._platform, expected_platform) + + def test_start_event_generation(self): + """Test generation of start event MCPs""" + + instance = DataProcessInstance( + id=self.test_id, orchestrator=self.test_orchestrator + ) + + start_time = int(time.time() * 1000) + mcps = list(instance.start_event_mcp(start_time, attempt=1)) + + self.assertEqual(len(mcps), 1) + start_event = mcps[0] + self.assertEqual(start_event.aspect.status, DataProcessRunStatusClass.STARTED) + self.assertEqual(start_event.aspect.timestampMillis, start_time) + self.assertEqual(start_event.aspect.attempt, 1) + + def test_end_event_generation(self): + """Test generation of end event MCPs""" + + instance = DataProcessInstance( + id=self.test_id, orchestrator=self.test_orchestrator + ) + + end_time = int(time.time() * 1000) + mcps = list( + instance.end_event_mcp(end_time, result=RunResultType.SUCCESS, attempt=1) + ) + + self.assertEqual(len(mcps), 1) + end_event = mcps[0] + self.assertEqual(end_event.aspect.status, DataProcessRunStatusClass.COMPLETE) + self.assertEqual(end_event.aspect.timestampMillis, end_time) + self.assertEqual(end_event.aspect.result.type, RunResultType.SUCCESS) + + def test_emit_process_with_emitter(self): + """Test emitting process events with mock emitter""" + + mock_emitter = Mock() + instance = DataProcessInstance( 
+            id=self.test_id, orchestrator=self.test_orchestrator
+        )
+
+        # Test emit method
+        instance.emit(mock_emitter)
+        self.assertTrue(mock_emitter.emit.called)
+
+        # Test emit_process_start
+        start_time = int(time.time() * 1000)
+        instance.emit_process_start(mock_emitter, start_time)
+        self.assertTrue(mock_emitter.emit.called)
+
+        # Test emit_process_end
+        end_time = int(time.time() * 1000)
+        instance.emit_process_end(mock_emitter, end_time, result=RunResultType.SUCCESS)
+        self.assertTrue(mock_emitter.emit.called)
+
+    def test_generate_mcp(self):
+        """Test generation of MCPs"""
+
+        instance = DataProcessInstance(
+            id=self.test_id,
+            orchestrator=self.test_orchestrator,
+            properties={"env": "prod"},
+            url="http://test.url",
+        )
+
+        created_time = int(time.time() * 1000)
+        mcps = list(instance.generate_mcp(created_time, materialize_iolets=True))
+
+        # Check if we have the basic MCPs generated
+        self.assertGreaterEqual(
+            len(mcps), 2
+        )  # Should at least have properties and relationships
+
+        # Verify the properties MCP
+        properties_mcp = next(
+            mcp for mcp in mcps if hasattr(mcp.aspect, "customProperties")
+        )
+        self.assertEqual(properties_mcp.aspect.name, self.test_id)
+        self.assertEqual(properties_mcp.aspect.customProperties["env"], "prod")
+        self.assertEqual(properties_mcp.aspect.externalUrl, "http://test.url")
+
+
+if __name__ == "__main__":
+    unittest.main()

From 4d387d70409015d44d2357b61a0315368eb6e9f6 Mon Sep 17 00:00:00 2001
From: Shirshanka Das
Date: Mon, 27 Jan 2025 23:00:59 -0800
Subject: [PATCH 2/2] fix lint

---
 .../dataprocess/dataprocess_instance.py       |  2 +-
 .../dataprocess/test_data_process_instance.py | 20 ++++++++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
index b32b98eb9e3221..82af9f127cdefa 100644
--- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
+++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -78,7 +78,7 @@ class DataProcessInstance:
     data_platform_instance: Optional[str] = None
     subtype: Optional[str] = None
     container_urn: Optional[str] = None
-    _platform: str = field(init=False, repr=False, default=None)
+    _platform: Optional[str] = field(init=False, repr=False, default=None)
 
     def __post_init__(self):
         self.urn = DataProcessInstanceUrn(
diff --git a/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py b/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py
index 1eaec8a6e58799..04f4754d8e8832 100644
--- a/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py
+++ b/metadata-ingestion/tests/unit/api/entities/dataprocess/test_data_process_instance.py
@@ -2,8 +2,12 @@
 import unittest
 from unittest.mock import Mock
 
+import datahub.metadata.schema_classes as models
 from datahub.api.entities.datajob import DataFlow, DataJob
-from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
+from datahub.api.entities.dataprocess.dataprocess_instance import (
+    DataProcessInstance,
+    InstanceRunResult,
+)
 from datahub.emitter.mcp_builder import ContainerKey
 from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import RunResultType
 from datahub.metadata.schema_classes import (
@@ -125,6 +129,7 @@ def test_start_event_generation(self):
 
         self.assertEqual(len(mcps), 1)
         start_event = mcps[0]
+        assert isinstance(start_event.aspect, models.DataProcessInstanceRunEventClass)
         self.assertEqual(start_event.aspect.status, DataProcessRunStatusClass.STARTED)
         self.assertEqual(start_event.aspect.timestampMillis, start_time)
         self.assertEqual(start_event.aspect.attempt, 1)
@@ -138,13 +143,17 @@ def test_end_event_generation(self):
 
         end_time = int(time.time() * 1000)
         mcps = list(
-            instance.end_event_mcp(end_time, result=RunResultType.SUCCESS, attempt=1)
+            instance.end_event_mcp(
+                end_time, result=InstanceRunResult.SUCCESS, attempt=1
+            )
         )
 
         self.assertEqual(len(mcps), 1)
         end_event = mcps[0]
+        assert isinstance(end_event.aspect, models.DataProcessInstanceRunEventClass)
         self.assertEqual(end_event.aspect.status, DataProcessRunStatusClass.COMPLETE)
         self.assertEqual(end_event.aspect.timestampMillis, end_time)
+        assert end_event.aspect.result is not None
        self.assertEqual(end_event.aspect.result.type, RunResultType.SUCCESS)
 
     def test_emit_process_with_emitter(self):
@@ -166,7 +175,9 @@ def test_emit_process_with_emitter(self):
 
         # Test emit_process_end
         end_time = int(time.time() * 1000)
-        instance.emit_process_end(mock_emitter, end_time, result=RunResultType.SUCCESS)
+        instance.emit_process_end(
+            mock_emitter, end_time, result=InstanceRunResult.SUCCESS
+        )
         self.assertTrue(mock_emitter.emit.called)
 
     def test_generate_mcp(self):
@@ -191,6 +202,9 @@ def test_generate_mcp(self):
         properties_mcp = next(
             mcp for mcp in mcps if hasattr(mcp.aspect, "customProperties")
         )
+        assert isinstance(
+            properties_mcp.aspect, models.DataProcessInstancePropertiesClass
+        )
         self.assertEqual(properties_mcp.aspect.name, self.test_id)
         self.assertEqual(properties_mcp.aspect.customProperties["env"], "prod")
         self.assertEqual(properties_mcp.aspect.externalUrl, "http://test.url")
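
Usage sketch (illustrative only, not part of the patch): the snippet below shows one way the new from_container() path might be exercised end to end. The experiment name, run id, and local DataHub endpoint are assumptions made for this example; ContainerKey, DataProcessInstance.from_container, the subtype field, emit/emit_process_start/emit_process_end, and InstanceRunResult are the APIs introduced or exercised by this change.

    import time

    from datahub.api.entities.dataprocess.dataprocess_instance import (
        DataProcessInstance,
        InstanceRunResult,
    )
    from datahub.emitter.mcp_builder import ContainerKey
    from datahub.emitter.rest_emitter import DatahubRestEmitter

    # Container key for an MLflow experiment; name and env are hypothetical.
    # from_container() expects the platform in urn form, as in the unit test.
    experiment_key = ContainerKey(
        platform="urn:li:dataPlatform:mlflow",
        name="my_experiment",
        env="PROD",
    )

    # A standalone run that lives in the experiment container rather than
    # being an instance of a DataJob or DataFlow template.
    run = DataProcessInstance.from_container(
        container_key=experiment_key, id="training_run_2025_01_27"
    )
    run.subtype = "ML Training Run"  # surfaced via the new SubTypes aspect

    emitter = DatahubRestEmitter("http://localhost:8080")  # assumed local gms
    run.emit(emitter)  # emits properties plus the new container/subtype aspects
    run.emit_process_start(emitter, int(time.time() * 1000))
    run.emit_process_end(
        emitter, int(time.time() * 1000), result=InstanceRunResult.SUCCESS
    )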