This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Prevent creation of duplicate jobs in Databricks (#76)
Previously, Airflow DAG executions could inadvertently create duplicate jobs in the Databricks workspace even when a job with the same name already existed. The root cause was that `_get_job_by_name` in `workflow.py` checked for an existing job by calling the Databricks REST API's `list_jobs()` method, which returns paginated results. If the job name did not appear in the first page of results, the lookup reported no match and a duplicate job was created.

To address this, this PR uses the built-in job name filter of the Databricks REST API when calling `list_jobs()`. The API then returns only jobs matching the given name, so existing jobs are found reliably and duplicates are no longer created in the Databricks workspace.
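The failure mode can be illustrated with a small simulation. The `FakeJobsApi` below is a hypothetical stand-in, not the Databricks SDK: a lookup that only reads the first page of results misses jobs beyond it, while server-side name filtering does not.

```python
class FakeJobsApi:
    """Hypothetical stand-in simulating a paginated list_jobs endpoint."""

    def __init__(self, job_names, page_size=2):
        self._jobs = [{"settings": {"name": n}} for n in job_names]
        self._page_size = page_size

    def list_jobs(self, name=None):
        if name is not None:
            # Server-side filtering: return every job matching the name.
            return {"jobs": [j for j in self._jobs
                             if j["settings"]["name"] == name]}
        # No filter: only the first page is returned, mimicking a caller
        # that never follows pagination tokens.
        return {"jobs": self._jobs[: self._page_size]}


api = FakeJobsApi(["etl", "reporting", "ml_training"], page_size=2)

# First-page-only lookup (the old behavior): misses the third job,
# so a duplicate would be created.
first_page = api.list_jobs().get("jobs", [])
assert not any(j["settings"]["name"] == "ml_training" for j in first_page)

# Name-filtered lookup (the fixed behavior): finds the existing job.
filtered = api.list_jobs(name="ml_training").get("jobs", [])
assert filtered and filtered[0]["settings"]["name"] == "ml_training"
```

The same logic applies at workspace scale: with many jobs, any fixed page size will eventually hide a match from a first-page-only scan.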

closes: #75
Hang1225 authored Apr 12, 2024
1 parent 2a9b53e commit e9f2d38
Showing 2 changed files with 8 additions and 5 deletions.
7 changes: 2 additions & 5 deletions src/astro_databricks/operators/workflow.py
```diff
@@ -41,11 +41,8 @@ class DatabricksMetaData:


 def _get_job_by_name(job_name: str, jobs_api: JobsApi) -> dict | None:
-    jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION).get("jobs", [])
-    for job in jobs:
-        if job.get("settings", {}).get("name") == job_name:
-            return job
-    return None
+    jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION, name=job_name).get("jobs", [])
+    return jobs[0] if jobs else None


 def flatten_node(
```
6 changes: 6 additions & 0 deletions tests/databricks/test_workflow.py
```diff
@@ -115,6 +115,10 @@ def test_create_workflow_from_notebooks_with_create(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    # By default the mocked list_jobs returns a truthy MagicMock, so the operator
+    # updates an "existing" workflow instead of creating one and the create_job
+    # assertion fails. Override the return value with an empty list.
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []

     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
```
```diff
@@ -391,6 +395,8 @@ def test_create_workflow_from_notebooks_with_different_clusters(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []

     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
```
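The mock override used in these tests can be reproduced in isolation. The standalone sketch below uses an illustrative `get_job_by_name` helper, not the project's actual code, to show why a bare `MagicMock` return value masks the create path:

```python
from unittest import mock


def get_job_by_name(job_name, jobs_api):
    """Illustrative lookup mirroring the patched logic: return the first
    job matching the name, or None if the API reports no jobs."""
    jobs = jobs_api.list_jobs(name=job_name).get("jobs", [])
    return jobs[0] if jobs else None


jobs_api = mock.MagicMock()

# Without an override, .get(...) returns a truthy MagicMock, so the lookup
# appears to find an existing job and the create path is never taken.
assert get_job_by_name("workflow", jobs_api) is not None

# Overriding the chained return value with an empty list restores the
# "no existing job" behavior the create-path tests need.
jobs_api.list_jobs.return_value.get.return_value = []
assert get_job_by_name("workflow", jobs_api) is None
```

This is why the tests stub `list_jobs.return_value.get.return_value` rather than `list_jobs.return_value` alone: the production code chains `.get("jobs", [])` onto the response, and the stub must intercept that final call.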
