diff --git a/src/astro_databricks/operators/workflow.py b/src/astro_databricks/operators/workflow.py
index c659783..625c4c3 100644
--- a/src/astro_databricks/operators/workflow.py
+++ b/src/astro_databricks/operators/workflow.py
@@ -41,11 +41,8 @@ class DatabricksMetaData:
 
 
 def _get_job_by_name(job_name: str, jobs_api: JobsApi) -> dict | None:
-    jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION).get("jobs", [])
-    for job in jobs:
-        if job.get("settings", {}).get("name") == job_name:
-            return job
-    return None
+    jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION, name=job_name).get("jobs", [])
+    return jobs[0] if jobs else None
 
 
 def flatten_node(
diff --git a/tests/databricks/test_workflow.py b/tests/databricks/test_workflow.py
index d60f9a5..176c880 100644
--- a/tests/databricks/test_workflow.py
+++ b/tests/databricks/test_workflow.py
@@ -115,6 +115,10 @@ def test_create_workflow_from_notebooks_with_create(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    # The mocked list_jobs(...).get(...) returns a MagicMock by default, so the workflow would be
+    # treated as already existing and updated, failing the create_job assertion; return [] instead.
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []
+
     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
@@ -391,6 +395,8 @@ def test_create_workflow_from_notebooks_with_different_clusters(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []
+
     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",