From 11f143b182b7e5ed1da8a10ac38624a45190787d Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 19 Mar 2024 17:27:56 +0530 Subject: [PATCH 1/6] Pass JOBS_API_VERSION to databricks sdk calls --- src/astro_databricks/operators/notebook.py | 2 +- src/astro_databricks/operators/workflow.py | 22 ++++++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/astro_databricks/operators/notebook.py b/src/astro_databricks/operators/notebook.py index 4808b40..87767bf 100644 --- a/src/astro_databricks/operators/notebook.py +++ b/src/astro_databricks/operators/notebook.py @@ -300,7 +300,7 @@ def launch_notebook_job(self): else: raise ValueError("Must specify either existing_cluster_id or new_cluster") runs_api = RunsApi(api_client) - run = runs_api.submit_run(run_json) + run = runs_api.submit_run(run_json, version=JOBS_API_VERSION) self.databricks_run_id = run["run_id"] return run diff --git a/src/astro_databricks/operators/workflow.py b/src/astro_databricks/operators/workflow.py index 8da15a8..d81762b 100644 --- a/src/astro_databricks/operators/workflow.py +++ b/src/astro_databricks/operators/workflow.py @@ -4,6 +4,8 @@ from logging import Logger from typing import TYPE_CHECKING +from constants import JOBS_API_VERSION + if TYPE_CHECKING: pass import json @@ -40,7 +42,7 @@ class DatabricksMetaData: def _get_job_by_name(job_name: str, jobs_api: JobsApi) -> dict | None: - jobs = jobs_api.list_jobs().get("jobs", []) + jobs = jobs_api.list_jobs(version=JOBS_API_VERSION).get("jobs", []) for job in jobs: if job.get("settings", {}).get("name") == job_name: return job @@ -175,13 +177,16 @@ def execute(self, context: Context) -> Any: ) jobs_api.reset_job( - json={"job_id": job_id, "new_settings": current_job_spec} + json={"job_id": job_id, "new_settings": current_job_spec}, + version=JOBS_API_VERSION, ) else: self.log.info( "Creating new job with spec %s", json.dumps(current_job_spec, indent=4) ) - job_id = jobs_api.create_job(json=current_job_spec)["job_id"] + job_id = jobs_api.create_job( + json=current_job_spec, version=JOBS_API_VERSION + )["job_id"] run_id = jobs_api.run_now( job_id=job_id, @@ -189,13 +194,16 @@ def execute(self, context: Context) -> Any: notebook_params=self.notebook_params, python_params=self.task_group.python_params, spark_submit_params=self.task_group.spark_submit_params, + version=JOBS_API_VERSION, )["run_id"] self.databricks_run_id = run_id runs_api = RunsApi(api_client) - url = runs_api.get_run(run_id).get("run_page_url") + url = runs_api.get_run(run_id, version=JOBS_API_VERSION).get("run_page_url") self.log.info(f"Check the job run in Databricks: {url}") - state = runs_api.get_run(run_id)["state"]["life_cycle_state"] + state = runs_api.get_run(run_id, version=JOBS_API_VERSION)["state"][ + "life_cycle_state" + ] self.log.info(f"Job state: {state}") if state not in ("PENDING", "BLOCKED", "RUNNING"): @@ -206,7 +214,9 @@ def execute(self, context: Context) -> Any: while state in ("PENDING", "BLOCKED"): self.log.info(f"Job {state}") time.sleep(5) - state = runs_api.get_run(run_id)["state"]["life_cycle_state"] + state = runs_api.get_run(run_id, version=JOBS_API_VERSION)["state"][ + "life_cycle_state" + ] return { "databricks_conn_id": self.databricks_conn_id, From ed147a68c9c3b9e3ca435e4ebcbb481ad926b218 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 19 Mar 2024 17:28:39 +0530 Subject: [PATCH 2/6] Pass JOBS_API_VERSION to databricks sdk calls --- src/astro_databricks/operators/workflow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff 
--git a/src/astro_databricks/operators/workflow.py b/src/astro_databricks/operators/workflow.py index d81762b..ede9a14 100644 --- a/src/astro_databricks/operators/workflow.py +++ b/src/astro_databricks/operators/workflow.py @@ -4,8 +4,6 @@ from logging import Logger from typing import TYPE_CHECKING -from constants import JOBS_API_VERSION - if TYPE_CHECKING: pass import json @@ -28,6 +26,7 @@ from databricks_cli.sdk.api_client import ApiClient from mergedeep import merge +from astro_databricks.constants import JOBS_API_VERSION from astro_databricks.plugins.plugin import ( DatabricksJobRepairAllFailedLink, DatabricksJobRunLink, From 9af597fb6b4a2f129a6790d9687520b138a49ec9 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 20 Mar 2024 15:26:29 +0530 Subject: [PATCH 3/6] Add expected version param in mock call asserts --- tests/databricks/test_notebook.py | 6 ++- tests/databricks/test_workflow.py | 89 +++++-------------------------- 2 files changed, 16 insertions(+), 79 deletions(-) diff --git a/tests/databricks/test_notebook.py b/tests/databricks/test_notebook.py index 2cf0208..c619acc 100644 --- a/tests/databricks/test_notebook.py +++ b/tests/databricks/test_notebook.py @@ -151,7 +151,8 @@ def test_databricks_notebook_operator_without_taskgroup_new_cluster( "libraries": [{"nb_index": {"package": "nb_package"}}], "timeout_seconds": 0, "email_notifications": {}, - } + }, + version="2.1", ) mock_monitor.assert_called_once() @@ -197,7 +198,8 @@ def test_databricks_notebook_operator_without_taskgroup_existing_cluster( "libraries": [{"nb_index": {"package": "nb_package"}}], "timeout_seconds": 0, "email_notifications": {}, - } + }, + version="2.1", ) mock_monitor.assert_called_once() diff --git a/tests/databricks/test_workflow.py b/tests/databricks/test_workflow.py index 4f5d827..4d21996 100644 --- a/tests/databricks/test_workflow.py +++ b/tests/databricks/test_workflow.py @@ -142,6 +142,7 @@ def test_create_workflow_from_notebooks_with_create( task_group.children["test_workflow.launch"].execute(context={}) mock_jobs_api.return_value.create_job.assert_called_once_with( json=expected_workflow_json, + version="2.1", ) mock_jobs_api.return_value.run_now.assert_called_once_with( job_id=1, @@ -149,6 +150,7 @@ def test_create_workflow_from_notebooks_with_create( notebook_params={"notebook_path": "/foo/bar"}, python_params=[], spark_submit_params=[], + version="2.1", ) @@ -289,79 +291,6 @@ def test_create_workflow_with_arbitrary_extra_job_params( ) -@mock.patch("astro_databricks.operators.workflow.DatabricksHook") -@mock.patch("astro_databricks.operators.workflow.ApiClient") -@mock.patch("astro_databricks.operators.workflow.JobsApi") -@mock.patch("astro_databricks.operators.workflow._get_job_by_name") -@mock.patch( - "astro_databricks.operators.workflow.RunsApi.get_run", - return_value={"state": {"life_cycle_state": "RUNNING"}}, -) -def test_create_workflow_with_arbitrary_extra_job_params( - mock_run_api, mock_get_jobs, mock_jobs_api, mock_api, mock_hook, dag -): - mock_get_jobs.return_value = {"job_id": 862519602273592} - - extra_job_params = { - "timeout_seconds": 10, # default: 0 - "webhook_notifications": { - "on_failure": [{"id": "b0aea8ab-ea8c-4a45-a2e9-9a26753fd702"}], - }, - "email_notifications": { - "no_alert_for_skipped_runs": True, # default: False - "on_start": ["user.name@databricks.com"], - }, - "git_source": { # no default value - "git_url": "https://github.com/astronomer/astro-provider-databricks", - "git_provider": "gitHub", - "git_branch": "main", - }, - } - with dag: - task_group = 
DatabricksWorkflowTaskGroup( - group_id="test_workflow", - databricks_conn_id="foo", - job_clusters=[{"job_cluster_key": "foo"}], - notebook_params={"notebook_path": "/foo/bar"}, - extra_job_params=extra_job_params, - ) - with task_group: - notebook_with_extra = DatabricksNotebookOperator( - task_id="notebook_with_extra", - databricks_conn_id="foo", - notebook_path="/foo/bar", - source="WORKSPACE", - job_cluster_key="foo", - ) - notebook_with_extra - - assert len(task_group.children) == 2 - - task_group.children["test_workflow.launch"].create_workflow_json() - task_group.children["test_workflow.launch"].execute(context={}) - - mock_jobs_api.return_value.reset_job.assert_called_once() - kwargs = mock_jobs_api.return_value.reset_job.call_args_list[0].kwargs["json"] - - assert kwargs["job_id"] == 862519602273592 - assert ( - kwargs["new_settings"]["email_notifications"] - == extra_job_params["email_notifications"] - ) - assert ( - kwargs["new_settings"]["timeout_seconds"] == extra_job_params["timeout_seconds"] - ) - assert kwargs["new_settings"]["git_source"] == extra_job_params["git_source"] - assert ( - kwargs["new_settings"]["webhook_notifications"] - == extra_job_params["webhook_notifications"] - ) - assert ( - kwargs["new_settings"]["email_notifications"] - == extra_job_params["email_notifications"] - ) - - @mock.patch("astro_databricks.operators.workflow.DatabricksHook") @mock.patch("astro_databricks.operators.workflow.ApiClient") @mock.patch("astro_databricks.operators.workflow.JobsApi") @@ -399,7 +328,7 @@ def test_create_workflow_with_nested_task_groups( extra_job_params=extra_job_params, notebook_packages=[ {"pypi": {"package": "mlflow==2.4.0"}}, - ] + ], ) with outer_task_group: direct_notebook = DatabricksNotebookOperator( @@ -433,8 +362,14 @@ def test_create_workflow_with_nested_task_groups( inner_notebook_json = kwargs["new_settings"]["tasks"][0] outer_notebook_json = kwargs["new_settings"]["tasks"][1] - assert inner_notebook_json["task_key"] == "unit_test_dag__test_workflow__direct_notebook" + assert ( + inner_notebook_json["task_key"] + == "unit_test_dag__test_workflow__direct_notebook" + ) assert inner_notebook_json["libraries"] == [{"pypi": {"package": "mlflow==2.4.0"}}] - assert outer_notebook_json["task_key"] == "unit_test_dag__test_workflow__middle_task_group__inner_task_group__inner_notebook" - assert outer_notebook_json["libraries"] == [{"pypi": {"package": "mlflow==2.4.0"}}] \ No newline at end of file + assert ( + outer_notebook_json["task_key"] + == "unit_test_dag__test_workflow__middle_task_group__inner_task_group__inner_notebook" + ) + assert outer_notebook_json["libraries"] == [{"pypi": {"package": "mlflow==2.4.0"}}] From 4803d4a8b0d8a483140cc4e590983d31106f6301 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 20 Mar 2024 16:24:51 +0530 Subject: [PATCH 4/6] Drop support for Airflow 2.2.4 and set min Airflow version to 2.3 --- .github/workflows/ci.yml | 39 ++++++++++++++++++++------------------- dev/.gitignore | 1 + dev/Dockerfile | 2 +- noxfile.py | 16 ++++++---------- pyproject.toml | 2 +- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7f4b8c8..8def6e5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,16 +1,17 @@ +--- name: Build and test astro databricks provider -on: +on: # yamllint disable-line rule:truthy push: - branches: [ main] + branches: [main] pull_request: - branches: [ main, 'release-**' ] + branches: [main, 'release-**'] # Run on PRs from forks 
pull_request_target: - branches: [ 'main' ] + branches: ['main'] types: ['labeled'] release: - types: [ 'created' ] + types: ['created'] # This allows a subsequently queued workflow run to interrupt and cancel previous runs concurrency: @@ -31,7 +32,7 @@ jobs: with: config-file: '.github/workflows/mlc_config.json' -# TODO: Fix Type-check failures ticket: https://github.com/astronomer/astro-provider-databricks/issues/5 +# TODO: Fix Type-check failures ticket: https://github.com/astronomer/astro-provider-databricks/issues/5 # Type-Check: # if: github.event.action != 'labeled' # runs-on: ubuntu-latest @@ -72,12 +73,12 @@ jobs: - run: pip3 install nox packaging - run: nox -s build_docs - Run-Unit-Tests: + Run-Unit-Tests: # yamllint disable-line strategy: fail-fast: false matrix: - python: [ '3.8', '3.9', '3.10' ] - airflow: [ 2.5 ] + python: ['3.8', '3.9', '3.10'] + airflow: [2.8] if: >- github.event_name == 'push' || ( @@ -123,8 +124,8 @@ jobs: strategy: fail-fast: false matrix: - python: [ '3.8', '3.9', '3.10' ] - airflow: [ '2.2.4', '2.3', '2.4', '2.5' ] + python: ['3.8', '3.9', '3.10'] + airflow: ['2.3', '2.4', '2.5', '2.6', '2.7', '2.8'] if: >- github.event_name == 'push' || @@ -215,14 +216,14 @@ jobs: - Code-Coverage runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - with: - python-version: '3.8' - architecture: 'x64' - - run: pip3 install hatch - - run: hatch build - - run: hatch publish + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: '3.8' + architecture: 'x64' + - run: pip3 install hatch + - run: hatch build + - run: hatch publish env: HATCH_INDEX_USER: __token__ HATCH_INDEX_AUTH: ${{ secrets.PYPI_TOKEN }} diff --git a/dev/.gitignore b/dev/.gitignore index db9d36a..940171e 100644 --- a/dev/.gitignore +++ b/dev/.gitignore @@ -4,3 +4,4 @@ airflow_settings.yaml __pycache__/ astro +logs/ diff --git a/dev/Dockerfile b/dev/Dockerfile index 409e598..0cba4d4 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -1,4 +1,4 @@ -FROM quay.io/astronomer/astro-runtime:7.3.0-base +FROM quay.io/astronomer/astro-runtime:10.3.0-base USER root diff --git a/noxfile.py b/noxfile.py index 73ad71a..6bb9d08 100644 --- a/noxfile.py +++ b/noxfile.py @@ -3,7 +3,6 @@ from pathlib import Path import nox -from packaging import version nox.options.sessions = ["dev"] nox.options.error_on_external_run = False @@ -30,7 +29,7 @@ def _expand_env_vars(file_path: Path): @nox.session(python=["3.8", "3.9", "3.10"]) -@nox.parametrize("airflow", ["2.2.4", "2.3", "2.4", "2.5"]) +@nox.parametrize("airflow", ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8"]) def test(session: nox.Session, airflow) -> None: """Run both unit and integration tests.""" env = { @@ -38,16 +37,13 @@ def test(session: nox.Session, airflow) -> None: "AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES": "airflow\\.* astro\\.* astro_databricks\\.*", } - if version.parse(airflow) == version.parse("2.2.4"): - # constraints file raised a few exceptions for Airflow 2.2.4 - session.install("apache-airflow-providers-databricks<4.2") - session.install(f"apache-airflow=={airflow}") - session.run("pip", "uninstall", "apache-airflow-providers-common-io", "-y") - else: - session.install(f"apache-airflow[databricks]=={airflow}", "--constraint", f"https://raw.githubusercontent.com/apache/airflow/constraints-{airflow}.0/constraints-{session.python}.txt") + session.install( + f"apache-airflow[databricks]=={airflow}", + "--constraint", + 
f"https://raw.githubusercontent.com/apache/airflow/constraints-{airflow}.0/constraints-{session.python}.txt", + ) session.install("-e", ".[tests]") - # Log all the installed dependencies session.log("Installed Dependencies:") session.run("pip3", "freeze") diff --git a/pyproject.toml b/pyproject.toml index a9fcb0c..280439e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", ] dependencies = [ - "apache-airflow>=2.2.4", + "apache-airflow>=2.3", "databricks-sql-connector>=2.0.4;python_version>='3.10'", "databricks-cli>=0.17.7", "apache-airflow-providers-databricks>=2.2.0", From 2c9ed165bfd3fa51e15f5cf63dac055d6f7721b8 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 20 Mar 2024 16:27:01 +0530 Subject: [PATCH 5/6] Remove log file tracking from git --- .github/workflows/ci.yml | 2 +- .gitignore | 2 ++ dev/logs/scheduler/latest | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) delete mode 120000 dev/logs/scheduler/latest diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8def6e5..612ec2d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,7 +78,7 @@ jobs: fail-fast: false matrix: python: ['3.8', '3.9', '3.10'] - airflow: [2.8] + airflow: [2.5] if: >- github.event_name == 'push' || ( diff --git a/.gitignore b/.gitignore index 01da1a6..ccc6b7b 100644 --- a/.gitignore +++ b/.gitignore @@ -153,3 +153,5 @@ webserver_config.py # VIM *.sw[a-z] + +dev/logs/ diff --git a/dev/logs/scheduler/latest b/dev/logs/scheduler/latest deleted file mode 120000 index fd211d2..0000000 --- a/dev/logs/scheduler/latest +++ /dev/null @@ -1 +0,0 @@ -/usr/local/airflow/logs/scheduler/2023-03-02 \ No newline at end of file From 769744c5c56aa020d60b4d21dfa27b668fb33942 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 20 Mar 2024 17:27:29 +0530 Subject: [PATCH 6/6] Reference JOBS_API_VERSION instead of hardcoding value in asserts --- tests/databricks/test_notebook.py | 13 +++++++------ tests/databricks/test_workflow.py | 5 +++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/databricks/test_notebook.py b/tests/databricks/test_notebook.py index c619acc..02f4f4a 100644 --- a/tests/databricks/test_notebook.py +++ b/tests/databricks/test_notebook.py @@ -3,6 +3,7 @@ import pytest from airflow.exceptions import AirflowException +from astro_databricks.constants import JOBS_API_VERSION from astro_databricks.operators.notebook import DatabricksNotebookOperator from astro_databricks.operators.workflow import ( DatabricksWorkflowTaskGroup, @@ -152,7 +153,7 @@ def test_databricks_notebook_operator_without_taskgroup_new_cluster( "timeout_seconds": 0, "email_notifications": {}, }, - version="2.1", + version=JOBS_API_VERSION, ) mock_monitor.assert_called_once() @@ -199,7 +200,7 @@ def test_databricks_notebook_operator_without_taskgroup_existing_cluster( "timeout_seconds": 0, "email_notifications": {}, }, - version="2.1", + version=JOBS_API_VERSION, ) mock_monitor.assert_called_once() @@ -296,7 +297,7 @@ def test_wait_for_pending_task(mock_sleep, mock_runs_api, databricks_notebook_op {"state": {"life_cycle_state": "RUNNING"}}, ] databricks_notebook_operator._wait_for_pending_task(current_task, mock_runs_api) - mock_runs_api.get_run.assert_called_with("123", version="2.1") + mock_runs_api.get_run.assert_called_with("123", version=JOBS_API_VERSION) assert mock_runs_api.get_run.call_count == 2 mock_runs_api.reset_mock() @@ -313,7 +314,7 @@ def test_wait_for_terminating_task( {"state": 
{"life_cycle_state": "TERMINATED"}}, ] databricks_notebook_operator._wait_for_terminating_task(current_task, mock_runs_api) - mock_runs_api.get_run.assert_called_with("123", version="2.1") + mock_runs_api.get_run.assert_called_with("123", version=JOBS_API_VERSION) assert mock_runs_api.get_run.call_count == 3 mock_runs_api.reset_mock() @@ -328,7 +329,7 @@ def test_wait_for_running_task(mock_sleep, mock_runs_api, databricks_notebook_op {"state": {"life_cycle_state": "TERMINATED"}}, ] databricks_notebook_operator._wait_for_running_task(current_task, mock_runs_api) - mock_runs_api.get_run.assert_called_with("123", version="2.1") + mock_runs_api.get_run.assert_called_with("123", version=JOBS_API_VERSION) assert mock_runs_api.get_run.call_count == 3 mock_runs_api.reset_mock() @@ -382,7 +383,7 @@ def test_monitor_databricks_job_success( databricks_notebook_operator.databricks_run_id = "1" databricks_notebook_operator.monitor_databricks_job() mock_runs_api.return_value.get_run.assert_called_with( - databricks_notebook_operator.databricks_run_id, version="2.1" + databricks_notebook_operator.databricks_run_id, version=JOBS_API_VERSION ) assert ( "Check the job run in Databricks: https://databricks-instance-xyz.cloud.databricks.com/#job/1234/run/1" diff --git a/tests/databricks/test_workflow.py b/tests/databricks/test_workflow.py index 4d21996..cca566b 100644 --- a/tests/databricks/test_workflow.py +++ b/tests/databricks/test_workflow.py @@ -6,6 +6,7 @@ import pytest from airflow.exceptions import AirflowException from airflow.utils.task_group import TaskGroup +from astro_databricks.constants import JOBS_API_VERSION from astro_databricks.operators.notebook import DatabricksNotebookOperator from astro_databricks.operators.workflow import DatabricksWorkflowTaskGroup @@ -142,7 +143,7 @@ def test_create_workflow_from_notebooks_with_create( task_group.children["test_workflow.launch"].execute(context={}) mock_jobs_api.return_value.create_job.assert_called_once_with( json=expected_workflow_json, - version="2.1", + version=JOBS_API_VERSION, ) mock_jobs_api.return_value.run_now.assert_called_once_with( job_id=1, @@ -150,7 +151,7 @@ def test_create_workflow_from_notebooks_with_create( notebook_params={"notebook_path": "/foo/bar"}, python_params=[], spark_submit_params=[], - version="2.1", + version=JOBS_API_VERSION, )