Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DatabricksJobRunLink to not create ad hoc TaskInstances #22571

Merged
merged 1 commit into from
Mar 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions airflow/providers/databricks/operators/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink, TaskInstance
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.providers.databricks.hooks.databricks import DatabricksHook

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstanceKey
from airflow.utils.context import Context

XCOM_RUN_ID_KEY = 'run_id'
Expand Down Expand Up @@ -107,9 +108,23 @@ class DatabricksJobRunLink(BaseOperatorLink):

name = "See Databricks Job Run"

def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
run_page_url = ti.xcom_pull(task_ids=operator.task_id, key=XCOM_RUN_PAGE_URL_KEY)
def get_link(
self,
operator,
dttm=None,
*,
ti_key: Optional["TaskInstanceKey"] = None,
) -> str:
if ti_key is not None:
run_page_url = XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
else:
assert dttm
run_page_url = XCom.get_one(
key=XCOM_RUN_PAGE_URL_KEY,
dag_id=operator.dag.dag_id,
task_id=operator.task_id,
execution_date=dttm,
)

return run_page_url

Expand Down