This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Pass api version to jobs API calls across operator implementations #66

Merged: 6 commits, Mar 20, 2024
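The change threads an explicit Jobs API version through every Databricks REST call the operators make. The constant is imported from astro_databricks.constants, but its definition is not part of this diff; below is a minimal sketch of what the updated tests imply (they previously asserted version="2.1" and now assert version=JOBS_API_VERSION):

# src/astro_databricks/constants.py (sketch, not shown in this PR)
# Value inferred from the test assertions that replaced the hard-coded "2.1".
JOBS_API_VERSION = "2.1"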
39 changes: 20 additions & 19 deletions .github/workflows/ci.yml
@@ -1,16 +1,17 @@
---
name: Build and test astro databricks provider
on:
on: # yamllint disable-line rule:truthy
Collaborator: @pankajkoti Curious about this comment!

Collaborator (Author): The YAML linter was complaining that the value needs to be True or False. This inline disable comment is the approach we have been using in GitHub CI workflows across other projects to suppress this false-positive lint report.

push:
branches: [ main]
branches: [main]

pull_request:
branches: [ main, 'release-**' ]
branches: [main, 'release-**']
# Run on PRs from forks
pull_request_target:
branches: [ 'main' ]
branches: ['main']
types: ['labeled']
release:
types: [ 'created' ]
types: ['created']

# This allows a subsequently queued workflow run to interrupt and cancel previous runs
concurrency:
@@ -31,7 +32,7 @@ jobs:
with:
config-file: '.github/workflows/mlc_config.json'

# TODO: Fix Type-check failures ticket: https://github.com/astronomer/astro-provider-databricks/issues/5
# TODO: Fix Type-check failures ticket: https://github.com/astronomer/astro-provider-databricks/issues/5
# Type-Check:
# if: github.event.action != 'labeled'
# runs-on: ubuntu-latest
@@ -72,12 +73,12 @@ jobs:
- run: pip3 install nox packaging
- run: nox -s build_docs

Run-Unit-Tests:
Run-Unit-Tests: # yamllint disable-line
strategy:
fail-fast: false
matrix:
python: [ '3.8', '3.9', '3.10' ]
airflow: [ 2.5 ]
python: ['3.8', '3.9', '3.10']
airflow: [2.5]
if: >-
github.event_name == 'push' ||
(
@@ -123,8 +124,8 @@ jobs:
strategy:
fail-fast: false
matrix:
python: [ '3.8', '3.9', '3.10' ]
airflow: [ '2.2.4', '2.3', '2.4', '2.5' ]
python: ['3.8', '3.9', '3.10']
airflow: ['2.3', '2.4', '2.5', '2.6', '2.7', '2.8']

if: >-
github.event_name == 'push' ||
@@ -215,14 +216,14 @@ jobs:
- Code-Coverage
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: '3.8'
architecture: 'x64'
- run: pip3 install hatch
- run: hatch build
- run: hatch publish
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: '3.8'
architecture: 'x64'
- run: pip3 install hatch
- run: hatch build
- run: hatch publish
env:
HATCH_INDEX_USER: __token__
HATCH_INDEX_AUTH: ${{ secrets.PYPI_TOKEN }}
2 changes: 2 additions & 0 deletions .gitignore
@@ -153,3 +153,5 @@ webserver_config.py

# VIM
*.sw[a-z]

dev/logs/
1 change: 1 addition & 0 deletions dev/.gitignore
@@ -4,3 +4,4 @@
airflow_settings.yaml
__pycache__/
astro
logs/
2 changes: 1 addition & 1 deletion dev/Dockerfile
@@ -1,4 +1,4 @@
FROM quay.io/astronomer/astro-runtime:7.3.0-base
FROM quay.io/astronomer/astro-runtime:10.3.0-base

USER root

1 change: 0 additions & 1 deletion dev/logs/scheduler/latest

This file was deleted.

16 changes: 6 additions & 10 deletions noxfile.py
@@ -3,7 +3,6 @@
from pathlib import Path

import nox
from packaging import version

nox.options.sessions = ["dev"]
nox.options.error_on_external_run = False
@@ -30,24 +29,21 @@ def _expand_env_vars(file_path: Path):


@nox.session(python=["3.8", "3.9", "3.10"])
@nox.parametrize("airflow", ["2.2.4", "2.3", "2.4", "2.5"])
@nox.parametrize("airflow", ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8"])
def test(session: nox.Session, airflow) -> None:
"""Run both unit and integration tests."""
env = {
"AIRFLOW_HOME": f"~/airflow-{airflow}-python-{session.python}",
"AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES": "airflow\\.* astro\\.* astro_databricks\\.*",
}

if version.parse(airflow) == version.parse("2.2.4"):
# constraints file raised a few exceptions for Airflow 2.2.4
session.install("apache-airflow-providers-databricks<4.2")
session.install(f"apache-airflow=={airflow}")
session.run("pip", "uninstall", "apache-airflow-providers-common-io", "-y")
else:
session.install(f"apache-airflow[databricks]=={airflow}", "--constraint", f"https://raw.githubusercontent.com/apache/airflow/constraints-{airflow}.0/constraints-{session.python}.txt")
session.install(
f"apache-airflow[databricks]=={airflow}",
"--constraint",
f"https://raw.githubusercontent.com/apache/airflow/constraints-{airflow}.0/constraints-{session.python}.txt",
)
session.install("-e", ".[tests]")


# Log all the installed dependencies
session.log("Installed Dependencies:")
session.run("pip3", "freeze")
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -35,7 +35,7 @@ classifiers = [
"Programming Language :: Python :: 3.10",
]
dependencies = [
"apache-airflow>=2.2.4",
"apache-airflow>=2.3",
"databricks-sql-connector>=2.0.4;python_version>='3.10'",
"databricks-cli>=0.17.7",
"apache-airflow-providers-databricks>=2.2.0",
2 changes: 1 addition & 1 deletion src/astro_databricks/operators/notebook.py
@@ -300,7 +300,7 @@ def launch_notebook_job(self):
else:
raise ValueError("Must specify either existing_cluster_id or new_cluster")
runs_api = RunsApi(api_client)
run = runs_api.submit_run(run_json)
run = runs_api.submit_run(run_json, version=JOBS_API_VERSION)
self.databricks_run_id = run["run_id"]
return run

21 changes: 15 additions & 6 deletions src/astro_databricks/operators/workflow.py
@@ -26,6 +26,7 @@
from databricks_cli.sdk.api_client import ApiClient
from mergedeep import merge

from astro_databricks.constants import JOBS_API_VERSION
from astro_databricks.plugins.plugin import (
DatabricksJobRepairAllFailedLink,
DatabricksJobRunLink,
@@ -40,7 +41,7 @@ class DatabricksMetaData:


def _get_job_by_name(job_name: str, jobs_api: JobsApi) -> dict | None:
jobs = jobs_api.list_jobs().get("jobs", [])
jobs = jobs_api.list_jobs(version=JOBS_API_VERSION).get("jobs", [])
for job in jobs:
if job.get("settings", {}).get("name") == job_name:
return job
@@ -175,27 +176,33 @@ def execute(self, context: Context) -> Any:
)

jobs_api.reset_job(
json={"job_id": job_id, "new_settings": current_job_spec}
json={"job_id": job_id, "new_settings": current_job_spec},
version=JOBS_API_VERSION,
)
else:
self.log.info(
"Creating new job with spec %s", json.dumps(current_job_spec, indent=4)
)
job_id = jobs_api.create_job(json=current_job_spec)["job_id"]
job_id = jobs_api.create_job(
json=current_job_spec, version=JOBS_API_VERSION
)["job_id"]

run_id = jobs_api.run_now(
job_id=job_id,
jar_params=self.task_group.jar_params,
notebook_params=self.notebook_params,
python_params=self.task_group.python_params,
spark_submit_params=self.task_group.spark_submit_params,
version=JOBS_API_VERSION,
)["run_id"]
self.databricks_run_id = run_id

runs_api = RunsApi(api_client)
url = runs_api.get_run(run_id).get("run_page_url")
url = runs_api.get_run(run_id, version=JOBS_API_VERSION).get("run_page_url")
self.log.info(f"Check the job run in Databricks: {url}")
state = runs_api.get_run(run_id)["state"]["life_cycle_state"]
state = runs_api.get_run(run_id, version=JOBS_API_VERSION)["state"][
"life_cycle_state"
]
self.log.info(f"Job state: {state}")

if state not in ("PENDING", "BLOCKED", "RUNNING"):
@@ -206,7 +213,9 @@
while state in ("PENDING", "BLOCKED"):
self.log.info(f"Job {state}")
time.sleep(5)
state = runs_api.get_run(run_id)["state"]["life_cycle_state"]
state = runs_api.get_run(run_id, version=JOBS_API_VERSION)["state"][
"life_cycle_state"
]

return {
"databricks_conn_id": self.databricks_conn_id,
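With the workflow operator changes above, every JobsApi/RunsApi call now pins the API version instead of relying on the databricks-cli default. A minimal standalone sketch of the same call pattern (the host, token, and run id below are placeholders, not values from this PR):

# Sketch only: reproduces the call pattern the operators now use.
# The `version` argument selects the REST path used by databricks-cli,
# e.g. /api/2.1/jobs/runs/get instead of the library's default version.
from databricks_cli.runs.api import RunsApi
from databricks_cli.sdk.api_client import ApiClient

from astro_databricks.constants import JOBS_API_VERSION

api_client = ApiClient(host="https://<databricks-instance>", token="<token>")
runs_api = RunsApi(api_client)

run = runs_api.get_run("123", version=JOBS_API_VERSION)
print(run["state"]["life_cycle_state"])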
15 changes: 9 additions & 6 deletions tests/databricks/test_notebook.py
@@ -3,6 +3,7 @@

import pytest
from airflow.exceptions import AirflowException
from astro_databricks.constants import JOBS_API_VERSION
from astro_databricks.operators.notebook import DatabricksNotebookOperator
from astro_databricks.operators.workflow import (
DatabricksWorkflowTaskGroup,
@@ -151,7 +152,8 @@ def test_databricks_notebook_operator_without_taskgroup_new_cluster(
"libraries": [{"nb_index": {"package": "nb_package"}}],
"timeout_seconds": 0,
"email_notifications": {},
}
},
version=JOBS_API_VERSION,
)
mock_monitor.assert_called_once()

@@ -197,7 +199,8 @@ def test_databricks_notebook_operator_without_taskgroup_existing_cluster(
"libraries": [{"nb_index": {"package": "nb_package"}}],
"timeout_seconds": 0,
"email_notifications": {},
}
},
version=JOBS_API_VERSION,
)
mock_monitor.assert_called_once()

@@ -294,7 +297,7 @@ def test_wait_for_pending_task(mock_sleep, mock_runs_api, databricks_notebook_op
{"state": {"life_cycle_state": "RUNNING"}},
]
databricks_notebook_operator._wait_for_pending_task(current_task, mock_runs_api)
mock_runs_api.get_run.assert_called_with("123", version="2.1")
mock_runs_api.get_run.assert_called_with("123", version=JOBS_API_VERSION)
assert mock_runs_api.get_run.call_count == 2
mock_runs_api.reset_mock()

@@ -311,7 +314,7 @@ def test_wait_for_terminating_task(
{"state": {"life_cycle_state": "TERMINATED"}},
]
databricks_notebook_operator._wait_for_terminating_task(current_task, mock_runs_api)
mock_runs_api.get_run.assert_called_with("123", version="2.1")
mock_runs_api.get_run.assert_called_with("123", version=JOBS_API_VERSION)
assert mock_runs_api.get_run.call_count == 3
mock_runs_api.reset_mock()

@@ -326,7 +329,7 @@ def test_wait_for_running_task(mock_sleep, mock_runs_api, databricks_notebook_op
{"state": {"life_cycle_state": "TERMINATED"}},
]
databricks_notebook_operator._wait_for_running_task(current_task, mock_runs_api)
mock_runs_api.get_run.assert_called_with("123", version="2.1")
mock_runs_api.get_run.assert_called_with("123", version=JOBS_API_VERSION)
assert mock_runs_api.get_run.call_count == 3
mock_runs_api.reset_mock()

@@ -380,7 +383,7 @@ def test_monitor_databricks_job_success(
databricks_notebook_operator.databricks_run_id = "1"
databricks_notebook_operator.monitor_databricks_job()
mock_runs_api.return_value.get_run.assert_called_with(
databricks_notebook_operator.databricks_run_id, version="2.1"
databricks_notebook_operator.databricks_run_id, version=JOBS_API_VERSION
)
assert (
"Check the job run in Databricks: https://databricks-instance-xyz.cloud.databricks.com/#job/1234/run/1"