This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Prevent creation of duplicate jobs in Databricks #76

Merged
7 changes: 2 additions & 5 deletions src/astro_databricks/operators/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ class DatabricksMetaData:


def _get_job_by_name(job_name: str, jobs_api: JobsApi) -> dict | None:
jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION).get("jobs", [])
for job in jobs:
if job.get("settings", {}).get("name") == job_name:
return job
return None
jobs = jobs_api.list_jobs(version=DATABRICKS_JOBS_API_VERSION, name=job_name).get("jobs", [])
return jobs[0] if jobs else None


def flatten_node(
Expand Down
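
The rewritten helper pushes the name filter to the server: the Databricks Jobs API 2.1 list endpoint accepts a name parameter, so the client no longer pages through every job and compares settings locally. The call site is not part of this diff, but a minimal sketch of how such a helper typically prevents duplicate jobs (the reset_job/create_job wiring, job_name, and job_spec below are illustrative assumptions, not lines from this PR):

    # Hypothetical call site: reuse the job if one with this name already exists.
    existing_job = _get_job_by_name(job_name, jobs_api)
    if existing_job:
        # Update the existing job in place instead of creating a duplicate.
        jobs_api.reset_job(
            json={"job_id": existing_job["job_id"], "new_settings": job_spec},
            version=DATABRICKS_JOBS_API_VERSION,
        )
        job_id = existing_job["job_id"]
    else:
        job_id = jobs_api.create_job(
            json=job_spec, version=DATABRICKS_JOBS_API_VERSION
        )["job_id"]

Since the API's name filter matches the exact job name, taking jobs[0] assumes job names are unique within the workspace; Databricks itself does not enforce that uniqueness.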

tests/databricks/test_workflow.py (6 additions, 0 deletions)

@@ -115,6 +115,10 @@ def test_create_workflow_from_notebooks_with_create(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    # The mocked list_jobs returns a truthy MagicMock by default, so the operator
+    # "updates" an existing workflow and create_job is never called; return [] instead.
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []
+
     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
@@ -391,6 +395,8 @@ def test_create_workflow_from_notebooks_with_different_clusters(
     mock_run_api, mock_jobs_api, mock_api, mock_hook, dag
 ):
     mock_jobs_api.return_value.create_job.return_value = {"job_id": 1}
+    mock_jobs_api.return_value.list_jobs.return_value.get.return_value = []
+
     with dag:
         task_group = DatabricksWorkflowTaskGroup(
             group_id="test_workflow",
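
The list_jobs override in both tests works around MagicMock's attribute auto-creation: every call on a mock returns another, truthy, MagicMock, so _get_job_by_name would appear to find an existing job. A standalone illustration using only unittest.mock:

    from unittest.mock import MagicMock

    jobs_api = MagicMock()

    # Without an override, .get("jobs", []) returns a truthy MagicMock, so the
    # "job already exists" branch is taken and create_job is never called.
    jobs = jobs_api.list_jobs(version="2.1", name="my_job").get("jobs", [])
    print(bool(jobs))  # True

    # Forcing .get(...) to return [] makes _get_job_by_name return None,
    # steering the operator onto the create_job path the tests assert on.
    jobs_api.list_jobs.return_value.get.return_value = []
    jobs = jobs_api.list_jobs(version="2.1", name="my_job").get("jobs", [])
    print(bool(jobs))  # False

The tests reach the mock through mock_jobs_api.return_value, which suggests the JobsApi class itself is patched; its return_value is the instance the operator constructs.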