From e9f2d383949bf3e355306ffc400463cfd3a702fb Mon Sep 17 00:00:00 2001
From: Hang Zhang Cao <94388760+Hang1225@users.noreply.github.com>
Date: Fri, 12 Apr 2024 13:36:30 -0400
Subject: [PATCH] Prevent creation of duplicate jobs in Databricks (#76)

Previously, Airflow DAG executions could inadvertently create duplicate
jobs in the Databricks workspace, even when a job with the same name
already existed.

The root cause is that `_get_job_by_name` in `workflow.py` checked
whether a job exists by calling the `list_jobs()` method of the
Databricks REST API. Because that endpoint is paginated, it returns
only a single page of jobs, so the check was incomplete: whenever the
job name did not appear on the first page of results, a duplicate job
was created.

To address this, this PR passes the job name to `list_jobs()`, using
the Databricks REST API's built-in name filter. The API then returns
only jobs with the given name, regardless of pagination, which prevents
the creation of duplicate jobs in the Databricks workspace.

closes: #75
---
 src/astro_databricks/operators/workflow.py | 7 ++-----
 tests/databricks/test_workflow.py          | 6 ++++++
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/astro_databricks/operators/workflow.py b/src/astro_databricks/operators/workflow.py
index c659783..625c4c3 100644
--- a/src/astro_databricks/operators/workflow.py
+++ b/src/astro_databricks/operators/workflow.py
@@ -41,11 +41,8 @@ class DatabricksMetaData:
 
 
 def _get_job_by_name(job_name: str, jobs_api: JobsApi) -> dict | None:
-    jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION).get("jobs", [])
-    for job in jobs:
-        if job.get("settings", {}).get("name") == job_name:
-            return job
-    return None
+    jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION, name=job_name).get("jobs", [])
+    return jobs[0] if jobs else None
 
 
 def flatten_node(
diff --git a/tests/databricks/test_workflow.py b/tests/databricks/test_workflow.py
index d60f9a5..176c880 100644
--- a/tests/databricks/test_workflow.py
+++ b/tests/databricks/test_workflow.py
@@ -115,6 +115,10 @@ def test_create_workflow_from_notebooks_with_create(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    # By default the mocked `list_jobs` returns a truthy MagicMock, so the operator "finds"
+    # an existing job and updates it, failing the `create_job` assertion; return [] instead.
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []
+
     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
@@ -391,6 +395,8 @@ def test_create_workflow_from_notebooks_with_different_clusters(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []
+
     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
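
For context, the behavioral difference can be sketched directly against the
Jobs API 2.1 endpoint that `list_jobs()` wraps. This is an illustrative
sketch only, not code from this patch: the host, token, and helper names
are placeholders, and it relies on the documented behavior of
`GET /api/2.1/jobs/list` (a default page size of 20 and an exact,
case-insensitive `name` filter).

```python
# Illustrative sketch: why an unfiltered, single-page listing can miss jobs,
# and how the server-side name filter avoids the problem. The host, token,
# and function names below are placeholders, not part of this patch.
import requests

DATABRICKS_HOST = "https://example.cloud.databricks.com"  # placeholder
HEADERS = {"Authorization": "Bearer <personal-access-token>"}  # placeholder


def job_exists_first_page_only(job_name: str) -> bool:
    """Old behavior: one unfiltered call returns at most one page (20 jobs
    by default), so a job beyond the first page appears not to exist."""
    resp = requests.get(f"{DATABRICKS_HOST}/api/2.1/jobs/list", headers=HEADERS)
    jobs = resp.json().get("jobs", [])
    return any(j.get("settings", {}).get("name") == job_name for j in jobs)


def job_exists_filtered(job_name: str) -> bool:
    """New behavior: the server filters by exact (case-insensitive) name,
    so the answer no longer depends on pagination."""
    resp = requests.get(
        f"{DATABRICKS_HOST}/api/2.1/jobs/list",
        headers=HEADERS,
        params={"name": job_name},
    )
    return bool(resp.json().get("jobs", []))
```

Filtering on the server also keeps the existence check a single request,
independent of how many jobs the workspace contains.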