From 7ac0dc65e15a9ef7c8280bce63f344e799346ca3 Mon Sep 17 00:00:00 2001 From: ryota-cloud Date: Thu, 23 Jan 2025 17:58:06 -0800 Subject: [PATCH] Adding smoke test for batch ingestion throwing exception (#12453) --- smoke-test/tests/restli/restli_test.py | 2 +- .../restli/test_restli_batch_ingestion.py | 54 ++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/smoke-test/tests/restli/restli_test.py b/smoke-test/tests/restli/restli_test.py index a0c9a26750c0b0..c10ae3fe584f08 100644 --- a/smoke-test/tests/restli/restli_test.py +++ b/smoke-test/tests/restli/restli_test.py @@ -51,7 +51,7 @@ def make_mcp(self) -> MetadataChangeProposalClass: return mcp -@pytest.fixture(scope="module") +@pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(auth_session, graph_client, request): yield delete_urns(graph_client, generated_urns) diff --git a/smoke-test/tests/restli/test_restli_batch_ingestion.py b/smoke-test/tests/restli/test_restli_batch_ingestion.py index 0e92988ed64703..ab33a2b26605db 100644 --- a/smoke-test/tests/restli/test_restli_batch_ingestion.py +++ b/smoke-test/tests/restli/test_restli_batch_ingestion.py @@ -3,6 +3,7 @@ import pytest +import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import make_dashboard_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.serialization_helper import pre_json_transform @@ -12,6 +13,7 @@ ChangeAuditStampsClass, DashboardInfoClass, ) +from datahub.metadata.urns import MlModelUrn from tests.consistency_utils import wait_for_writes_to_sync from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper from tests.utils import delete_urns @@ -19,7 +21,7 @@ generated_urns: List[str] = [] -@pytest.fixture(scope="module") +@pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(auth_session, graph_client, request): yield delete_urns(graph_client, generated_urns) @@ -84,6 +86,29 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass: return mcp_invalid.make_mcp() +def _create_invalid_dataset_mcps() -> List[MetadataChangeProposalWrapper]: + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)" + model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn() + bad_mcps = [ + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=models.StatusClass(removed=False), + ), + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=models.UpstreamLineageClass( + upstreams=[ + models.UpstreamClass( + dataset=model_urn, + type=models.DatasetLineageTypeClass.TRANSFORMED, + ) + ] + ), + ), + ] + return bad_mcps + + def test_restli_batch_ingestion_sync(graph_client): # Positive Test (all valid MetadataChangeProposal) mcps = _create_valid_dashboard_mcps() @@ -133,3 +158,30 @@ def test_restli_batch_ingestion_async(graph_client): assert aspect.title == "Dummy Title For Testing" assert aspect.description == "Dummy Description For Testing" assert aspect.lastModified is not None + + +def test_restli_batch_ingestion_exception_sync(graph_client): + """ + Test Batch ingestion when an exception occurs in sync mode + """ + bad_mcps = _create_invalid_dataset_mcps() + generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn]) + + try: + graph_client.emit_mcps(bad_mcps, async_flag=False) + raise AssertionError("should have thrown an exception") + except Exception as e: + if isinstance(e, AssertionError): + raise e + print(f"Error emitting MCPs due to {e}") + + +def test_restli_batch_ingestion_exception_async(graph_client): + """ + Test Batch ingestion when an exception occurs in async mode + """ + bad_mcps = _create_invalid_dataset_mcps() + generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn]) + # TODO expectation is that it throws exception, but it doesn't currently.this test case need to change after fix. + ret = graph_client.emit_mcps(bad_mcps, async_flag=True) + assert ret >= 0