feat(sdk): data process instance - container, subtype, dataplatformin… #12476

Merged
metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -5,7 +5,7 @@
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.mcp_builder import DatahubKey
+from datahub.emitter.mcp_builder import ContainerKey, DatahubKey
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
DataProcessInstanceInput,
DataProcessInstanceOutput,
@@ -15,11 +15,15 @@
)
from datahub.metadata.schema_classes import (
AuditStampClass,
+    ContainerClass,
+    DataPlatformInstanceClass,
DataProcessInstanceRunEventClass,
DataProcessInstanceRunResultClass,
DataProcessRunStatusClass,
DataProcessTypeClass,
+    SubTypesClass,
)
+from datahub.metadata.urns import DataPlatformInstanceUrn, DataPlatformUrn
from datahub.utilities.str_enum import StrEnum
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
@@ -42,7 +46,7 @@

@dataclass
class DataProcessInstance:
"""This is a DataProcessInstance class which represent an instance of a DataFlow or DataJob.
"""This is a DataProcessInstance class which represents an instance of a DataFlow, DataJob, or a standalone process within a Container.

Args:
id: The id of the dataprocess instance execution.
@@ -71,6 +75,10 @@
_template_object: Optional[Union[DataJob, DataFlow]] = field(
init=False, default=None, repr=False
)
+    data_platform_instance: Optional[str] = None
+    subtype: Optional[str] = None
+    container_urn: Optional[str] = None
+    _platform: Optional[str] = field(init=False, repr=False, default=None)

def __post_init__(self):
self.urn = DataProcessInstanceUrn(
@@ -80,6 +88,28 @@
id=self.id,
).guid()
)
+        self._platform = self.orchestrator
+
+        try:
+            # We first try to create from string, assuming it's a urn
+            self._platform = str(DataPlatformUrn.from_string(self._platform))
+        except Exception:
+            # If it fails, we assume it's an id
+            self._platform = str(DataPlatformUrn(self._platform))
Reviewer comment (Collaborator) on lines +93 to +98: Out of scope, but we should have a helper method for this sort of logic.


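A minimal sketch of what such a helper could look like (hypothetical: the name and placement are assumptions, not part of this PR):

from datahub.metadata.urns import DataPlatformUrn


def to_data_platform_urn_str(platform: str) -> str:
    """Coerce either a full data platform urn or a bare platform id into a
    stringified DataPlatformUrn. Hypothetical helper, not in this PR."""
    try:
        # Try parsing as a full urn first, e.g. "urn:li:dataPlatform:airflow".
        return str(DataPlatformUrn.from_string(platform))
    except Exception:
        # Otherwise treat the value as a bare platform id, e.g. "airflow".
        return str(DataPlatformUrn(platform))


# Both spellings normalize to the same urn:
assert to_data_platform_urn_str("airflow") == to_data_platform_urn_str(
    "urn:li:dataPlatform:airflow"
)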
+        if self.data_platform_instance is not None:
+            try:
+                # We first try to create from string, assuming it's a urn
+                self.data_platform_instance = str(
+                    DataPlatformInstanceUrn.from_string(self.data_platform_instance)
+                )
+            except Exception:
+                # If it fails, we assume it's an id
+                self.data_platform_instance = str(
+                    DataPlatformInstanceUrn(
+                        platform=self._platform, instance=self.data_platform_instance
+                    )
+                )

(Codecov: the added lines above were flagged as not covered by tests.)
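Given this coercion, data_platform_instance can be supplied either as a bare instance id or as a full urn; a quick sketch of the expected equivalence (run and instance names are illustrative):

from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

a = DataProcessInstance(
    id="run-1", orchestrator="airflow", data_platform_instance="prod_instance"
)
b = DataProcessInstance(
    id="run-1",
    orchestrator="airflow",
    data_platform_instance=(
        "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,prod_instance)"
    ),
)
# Both should end up holding the same stringified DataPlatformInstanceUrn.
assert a.data_platform_instance == b.data_platform_instance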

def start_event_mcp(
self, start_timestamp_millis: int, attempt: Optional[int] = None
@@ -269,6 +299,29 @@
)
yield mcp

+        assert self._platform
+        if self.data_platform_instance:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=str(self.urn),
+                aspect=DataPlatformInstanceClass(
+                    platform=self._platform, instance=self.data_platform_instance
+                ),
+            )
+            yield mcp
+
+        if self.subtype:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=str(self.urn), aspect=SubTypesClass(typeNames=[self.subtype])
+            )
+            yield mcp
+
+        if self.container_urn:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=str(self.urn),
+                aspect=ContainerClass(container=self.container_urn),
+            )
+            yield mcp

(Codecov: the added lines above were flagged as not covered by tests.)
yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)

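With these additions, a DataProcessInstance that carries the new fields emits the corresponding aspects from generate_mcp. A minimal sketch (the run id, subtype label, and container urn are illustrative):

import time

from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

dpi = DataProcessInstance(
    id="nightly-run-42",
    orchestrator="airflow",
    data_platform_instance="prod_instance",
    subtype="ML Training Run",
    container_urn="urn:li:container:0123456789abcdef",
)
aspects = {
    type(mcp.aspect).__name__
    for mcp in dpi.generate_mcp(int(time.time() * 1000), materialize_iolets=False)
}
# Expect DataPlatformInstanceClass, SubTypesClass, and ContainerClass alongside
# the pre-existing properties/relationships aspects.
print(aspects)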
@staticmethod
@@ -309,13 +362,20 @@
clone_outlets: bool = False,
) -> "DataProcessInstance":
"""
-        Generates DataProcessInstance from a DataJob
+        Generates a DataProcessInstance from a given DataJob.

-        :param datajob: (DataJob) the datajob from generate the DataProcessInstance
-        :param id: (str) the id for the DataProcessInstance
-        :param clone_inlets: (bool) whether to clone datajob's inlets
-        :param clone_outlets: (bool) whether to clone datajob's outlets
-        :return: DataProcessInstance
+        This method creates a DataProcessInstance object using the provided DataJob
+        and assigns it a unique identifier. Optionally, it can clone the inlets and
+        outlets from the DataJob to the DataProcessInstance.
+
+        Args:
+            datajob (DataJob): The DataJob instance from which to generate the DataProcessInstance.
+            id (str): The unique identifier for the DataProcessInstance.
+            clone_inlets (bool, optional): If True, clones the inlets from the DataJob to the DataProcessInstance. Defaults to False.
+            clone_outlets (bool, optional): If True, clones the outlets from the DataJob to the DataProcessInstance. Defaults to False.
+
+        Returns:
+            DataProcessInstance: The generated DataProcessInstance object.
"""
dpi: DataProcessInstance = DataProcessInstance(
orchestrator=datajob.flow_urn.orchestrator,
@@ -332,14 +392,47 @@
return dpi

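For reference, a short usage sketch of from_datajob (the flow, job, and run ids are illustrative):

from datahub.api.entities.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
from datahub.metadata.urns import DataFlowUrn

flow_urn = DataFlowUrn.create_from_ids(
    orchestrator="airflow", flow_id="daily_etl", env="PROD"
)
job = DataJob(id="transform_step", flow_urn=flow_urn)
dpi = DataProcessInstance.from_datajob(
    datajob=job, id="daily_etl_2024_01_01", clone_inlets=True, clone_outlets=True
)
# The instance points back at the job it was generated from.
assert dpi.template_urn == job.urn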
@staticmethod
-    def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
+    def from_container(
+        container_key: ContainerKey,
+        id: str,
+    ) -> "DataProcessInstance":
"""
-        Generates DataProcessInstance from a DataFlow
+        Create a DataProcessInstance that is located within a Container.
+        Use this method when you need to represent a DataProcessInstance that
+        is not an instance of a DataJob or a DataFlow,
+        e.g. when recording an ad-hoc training run that is associated only with an Experiment.

-        :param dataflow: (DataFlow) the DataFlow from generate the DataProcessInstance
+        :param container_key: (ContainerKey) the container key from which to generate the DataProcessInstance
:param id: (str) the id for the DataProcessInstance
:return: DataProcessInstance
"""
+        dpi: DataProcessInstance = DataProcessInstance(
+            id=id,
+            orchestrator=DataPlatformUrn.from_string(
+                container_key.platform
+            ).platform_name,
+            template_urn=None,
+            container_urn=container_key.as_urn(),
+        )
+
+        return dpi

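A short usage sketch mirroring the tests below (the platform, experiment, and run names are illustrative):

from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
from datahub.emitter.mcp_builder import ContainerKey

experiment_key = ContainerKey(
    platform="urn:li:dataPlatform:mlflow", name="my_experiment", env="PROD"
)
dpi = DataProcessInstance.from_container(
    container_key=experiment_key, id="adhoc_training_run_1"
)
# The orchestrator is derived from the container's platform, and the
# instance is parented to the container rather than a DataJob/DataFlow.
assert dpi.orchestrator == "mlflow"
assert dpi.container_urn == experiment_key.as_urn()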
+    @staticmethod
+    def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
+        """
+        Creates a DataProcessInstance from a given DataFlow.
+
+        This method generates a DataProcessInstance object using the provided DataFlow
+        and a specified id. The DataProcessInstance will inherit properties from the
+        DataFlow such as orchestrator, environment, and template URN.
+
+        Args:
+            dataflow (DataFlow): The DataFlow object from which to generate the DataProcessInstance.
+            id (str): The unique identifier for the DataProcessInstance.
+
+        Returns:
+            DataProcessInstance: The newly created DataProcessInstance object.
+        """
dpi = DataProcessInstance(
id=id,
orchestrator=dataflow.orchestrator,
New test file (path not shown in this view):
@@ -0,0 +1,214 @@
import time
import unittest
from unittest.mock import Mock

import datahub.metadata.schema_classes as models
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
DataProcessInstance,
InstanceRunResult,
)
from datahub.emitter.mcp_builder import ContainerKey
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import RunResultType
from datahub.metadata.schema_classes import (
DataProcessRunStatusClass,
DataProcessTypeClass,
)
from datahub.metadata.urns import DataFlowUrn, DataJobUrn, DataPlatformUrn, DatasetUrn


class TestDataProcessInstance(unittest.TestCase):
def setUp(self):
# Common test data
self.test_id = "test_process_123"
self.test_orchestrator = "airflow"
self.test_cluster = "prod"

# Create mock ContainerKey
self.mock_container_key = ContainerKey(
platform="urn:li:dataPlatform:mlflow", name="test_experiment", env="PROD"
)

# Create mock DataJob
self.mock_flow_urn = DataFlowUrn.create_from_ids(
orchestrator="airflow", flow_id="test_flow", env="prod"
)
self.mock_job_urn = DataJobUrn.create_from_ids(
job_id="test_job", data_flow_urn=str(self.mock_flow_urn)
)
self.mock_datajob = DataJob(
id="test_job",
flow_urn=self.mock_flow_urn,
inlets=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:hive,test_input,PROD)"
)
],
outlets=[
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:hive,test_output,PROD)"
)
],
)

# Create mock DataFlow
self.mock_dataflow = DataFlow(
orchestrator="airflow", id="test_flow", env="prod"
)

def test_basic_initialization(self):
"""Test basic initialization of DataProcessInstance"""

instance = DataProcessInstance(
id=self.test_id,
orchestrator=self.test_orchestrator,
cluster=self.test_cluster,
)

self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, self.test_orchestrator)
self.assertEqual(instance.cluster, self.test_cluster)
self.assertEqual(instance.type, DataProcessTypeClass.BATCH_SCHEDULED)

def test_from_datajob_creation(self):
"""Test creation of DataProcessInstance from DataJob"""

instance = DataProcessInstance.from_datajob(
datajob=self.mock_datajob,
id=self.test_id,
clone_inlets=True,
clone_outlets=True,
)

self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, "airflow")
self.assertEqual(instance.template_urn, self.mock_datajob.urn)
self.assertEqual(len(instance.inlets), 1)
self.assertEqual(len(instance.outlets), 1)

def test_from_dataflow_creation(self):
"""Test creation of DataProcessInstance from DataFlow"""

instance = DataProcessInstance.from_dataflow(
dataflow=self.mock_dataflow, id=self.test_id
)

self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, "airflow")
self.assertEqual(instance.template_urn, self.mock_dataflow.urn)

def test_from_container_creation(self):
"""Test creation of DataProcessInstance from ContainerKey"""

instance = DataProcessInstance.from_container(
container_key=self.mock_container_key, id=self.test_id
)

self.assertEqual(instance.id, self.test_id)
self.assertEqual(instance.orchestrator, "mlflow") # Platform name from URN
self.assertIsNone(
instance.template_urn
) # Should be None for container-based instances
self.assertEqual(instance.container_urn, self.mock_container_key.as_urn())

# Verify the platform is set correctly
expected_platform = str(
DataPlatformUrn.from_string(self.mock_container_key.platform)
)
self.assertEqual(instance._platform, expected_platform)

def test_start_event_generation(self):
"""Test generation of start event MCPs"""

instance = DataProcessInstance(
id=self.test_id, orchestrator=self.test_orchestrator
)

start_time = int(time.time() * 1000)
mcps = list(instance.start_event_mcp(start_time, attempt=1))

self.assertEqual(len(mcps), 1)
start_event = mcps[0]
assert isinstance(start_event.aspect, models.DataProcessInstanceRunEventClass)
self.assertEqual(start_event.aspect.status, DataProcessRunStatusClass.STARTED)
self.assertEqual(start_event.aspect.timestampMillis, start_time)
self.assertEqual(start_event.aspect.attempt, 1)

def test_end_event_generation(self):
"""Test generation of end event MCPs"""

instance = DataProcessInstance(
id=self.test_id, orchestrator=self.test_orchestrator
)

end_time = int(time.time() * 1000)
mcps = list(
instance.end_event_mcp(
end_time, result=InstanceRunResult.SUCCESS, attempt=1
)
)

self.assertEqual(len(mcps), 1)
end_event = mcps[0]
assert isinstance(end_event.aspect, models.DataProcessInstanceRunEventClass)
self.assertEqual(end_event.aspect.status, DataProcessRunStatusClass.COMPLETE)
self.assertEqual(end_event.aspect.timestampMillis, end_time)
assert end_event.aspect.result is not None
self.assertEqual(end_event.aspect.result.type, RunResultType.SUCCESS)

def test_emit_process_with_emitter(self):
"""Test emitting process events with mock emitter"""

mock_emitter = Mock()
instance = DataProcessInstance(
id=self.test_id, orchestrator=self.test_orchestrator
)

# Test emit method
instance.emit(mock_emitter)
self.assertTrue(mock_emitter.emit.called)

# Test emit_process_start
start_time = int(time.time() * 1000)
instance.emit_process_start(mock_emitter, start_time)
self.assertTrue(mock_emitter.emit.called)

# Test emit_process_end
end_time = int(time.time() * 1000)
instance.emit_process_end(
mock_emitter, end_time, result=InstanceRunResult.SUCCESS
)
self.assertTrue(mock_emitter.emit.called)

def test_generate_mcp(self):
"""Test generation of MCPs"""

instance = DataProcessInstance(
id=self.test_id,
orchestrator=self.test_orchestrator,
properties={"env": "prod"},
url="http://test.url",
)

created_time = int(time.time() * 1000)
mcps = list(instance.generate_mcp(created_time, materialize_iolets=True))

# Check if we have the basic MCPs generated
self.assertGreaterEqual(
len(mcps), 2
) # Should at least have properties and relationships

# Verify the properties MCP
properties_mcp = next(
mcp for mcp in mcps if hasattr(mcp.aspect, "customProperties")
)
assert isinstance(
properties_mcp.aspect, models.DataProcessInstancePropertiesClass
)
self.assertEqual(properties_mcp.aspect.name, self.test_id)
self.assertEqual(properties_mcp.aspect.customProperties["env"], "prod")
self.assertEqual(properties_mcp.aspect.externalUrl, "http://test.url")


if __name__ == "__main__":
unittest.main()
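For comparison with the Mock emitter used above, a hedged sketch of emitting run events against a live DataHub instance (the GMS endpoint and run id are assumptions):

import time

from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")  # assumed local GMS

dpi = DataProcessInstance(id="adhoc_run_1", orchestrator="airflow")
dpi.emit_process_start(emitter, int(time.time() * 1000))
# ... do the actual work here ...
dpi.emit_process_end(
    emitter, int(time.time() * 1000), result=InstanceRunResult.SUCCESS
)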