Adding smoke test for batch ingestion throwing exception #12453

Merged
merged 5 commits on Jan 24, 2025
2 changes: 1 addition & 1 deletion smoke-test/tests/restli/restli_test.py
@@ -51,7 +51,7 @@ def make_mcp(self) -> MetadataChangeProposalClass:
        return mcp


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
    yield
    delete_urns(graph_client, generated_urns)
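For context, and not part of this diff: autouse=True makes pytest apply the module-scoped cleanup fixture to every test in the module even when no test lists it as an argument, so the delete_urns teardown runs once after the module's tests finish. A minimal sketch of the pattern, with hypothetical names:

# Sketch only (not part of this PR); fixture and test names are hypothetical.
import pytest

teardown_log = []

@pytest.fixture(scope="module", autouse=True)
def module_cleanup():
    yield  # every test in the module runs before this teardown
    teardown_log.append("cleanup ran once, after the last test in the module")

def test_does_not_request_the_fixture():
    # This test never lists module_cleanup as an argument,
    # yet the autouse fixture is still applied to it.
    assert True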
54 changes: 53 additions & 1 deletion smoke-test/tests/restli/test_restli_batch_ingestion.py
@@ -3,6 +3,7 @@

import pytest

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_dashboard_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.serialization_helper import pre_json_transform
@@ -12,14 +13,15 @@
    ChangeAuditStampsClass,
    DashboardInfoClass,
)
from datahub.metadata.urns import MlModelUrn
from tests.consistency_utils import wait_for_writes_to_sync
from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper
from tests.utils import delete_urns

generated_urns: List[str] = []


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
    yield
    delete_urns(graph_client, generated_urns)
@@ -84,6 +86,29 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass:
    return mcp_invalid.make_mcp()


def _create_invalid_dataset_mcps() -> List[MetadataChangeProposalWrapper]:
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)"
    model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn()
    bad_mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=dataset_urn,
            aspect=models.StatusClass(removed=False),
        ),
        MetadataChangeProposalWrapper(
            entityUrn=dataset_urn,
            aspect=models.UpstreamLineageClass(
                upstreams=[
                    models.UpstreamClass(
                        dataset=model_urn,
                        type=models.DatasetLineageTypeClass.TRANSFORMED,
                    )
                ]
            ),
        ),
    ]
    return bad_mcps


def test_restli_batch_ingestion_sync(graph_client):
    # Positive Test (all valid MetadataChangeProposal)
    mcps = _create_valid_dashboard_mcps()
@@ -133,3 +158,30 @@ def test_restli_batch_ingestion_async(graph_client):
    assert aspect.title == "Dummy Title For Testing"
    assert aspect.description == "Dummy Description For Testing"
    assert aspect.lastModified is not None


def test_restli_batch_ingestion_exception_sync(graph_client):
    """
    Test batch ingestion when an exception occurs in sync mode
    """
    bad_mcps = _create_invalid_dataset_mcps()
    generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])

    try:
        graph_client.emit_mcps(bad_mcps, async_flag=False)
        raise AssertionError("should have thrown an exception")
    except Exception as e:
        if isinstance(e, AssertionError):
            raise e
        print(f"Error emitting MCPs due to {e}")


def test_restli_batch_ingestion_exception_async(graph_client):
    """
    Test batch ingestion when an exception occurs in async mode
    """
    bad_mcps = _create_invalid_dataset_mcps()
    generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])
    # TODO: the expectation is that this throws an exception, but it doesn't currently.
    #  This test case needs to change after the fix.
    ret = graph_client.emit_mcps(bad_mcps, async_flag=True)
    assert ret >= 0
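
Side note, not part of the merged change: the sync exception test above could also be expressed with pytest.raises, which fails the test automatically when no exception is raised. A rough sketch under that assumption, reusing the graph_client fixture and the helpers defined in the diff above; the test name here is hypothetical:

# Sketch only; an alternative assertion style, not the code merged in this PR.
import pytest

def test_restli_batch_ingestion_exception_sync_alt(graph_client):
    bad_mcps = _create_invalid_dataset_mcps()
    generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])
    # pytest.raises fails the test if emit_mcps does not raise in sync mode.
    with pytest.raises(Exception):
        graph_client.emit_mcps(bad_mcps, async_flag=False)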