get_current_context not present in user_defined_macros #33217

Closed
1 of 2 tasks
raphaelauv opened this issue Aug 8, 2023 · 6 comments · Fixed by #33645
Assignees
Labels
affected_version:2.6 Issues Reported for 2.6 area:core good first issue kind:bug This is a clearly a bug

Comments

@raphaelauv
Contributor

raphaelauv commented Aug 8, 2023

Apache Airflow version

2.6.3

What happened

get_current_context() fails when called inside a function registered in user_defined_macros.

Rendering the template gives:

{abstractoperator.py:594} ERROR - Exception rendering Jinja template for task 'toot', field 'op_kwargs'. Template: {'arg': '{{ macro_run_id() }}'}
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/abstractoperator.py", line 586, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 168, in render_template
    return {k: self.render_template(v, context, jinja_env, oids) for k, v in value.items()}
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 168, in <dictcomp>
    return {k: self.render_template(v, context, jinja_env, oids) for k, v in value.items()}
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 156, in render_template
    return self._render(template, context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/abstractoperator.py", line 540, in _render
    return super()._render(template, context, dag=dag)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 113, in _render
    return render_template_to_string(template, context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/helpers.py", line 288, in render_template_to_string
    return render_template(template, cast(MutableMapping[str, Any], context), native=False)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/helpers.py", line 283, in render_template
    return "".join(nodes)
  File "<template>", line 12, in root
  File "/home/airflow/.local/lib/python3.8/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/jinja2/runtime.py", line 298, in call
    return __obj(*args, **kwargs)
  File "/opt/airflow/dags/dags/exporter/finance_closing.py", line 7, in macro_run_id
    schedule_interval = get_current_context()["dag"].schedule_interval.replace("@", "")
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 784, in get_current_context
    raise AirflowException(
airflow.exceptions.AirflowException: Current context was requested but no context was found! Are you running within an airflow task?

What you think should happen instead

User-defined macros should be able to access the current context.

airflow.exceptions.AirflowException: Current context was requested but no context was found! Are you running within an airflow task?

How to reproduce

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def macro_run_id():
    from airflow.operators.python import get_current_context
    a = get_current_context()["dag"].schedule_interval.replace("@", "")
    if a == "None":
        a = "manual"
    return a


with DAG(dag_id="example2",
         start_date=days_ago(61),
         user_defined_macros={"macro_run_id": macro_run_id},
         schedule_interval="@monthly"):
    def toto(arg):
        print(arg)


    PythonOperator(task_id="toot", python_callable=toto,
                   op_kwargs={"arg": "{{ macro_run_id() }}"})

Operating System

ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@raphaelauv raphaelauv added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Aug 8, 2023
@potiuk potiuk added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Aug 8, 2023
@ivan-afonichkin
Contributor

Hello @potiuk @raphaelauv, I would like to take this issue. Will submit PR in a few days. I'm not sure how I can assign it to myself though.

@eladkal eladkal added the affected_version:2.6 Issues Reported for 2.6 label Aug 10, 2023
@eladkal
Contributor

eladkal commented Aug 10, 2023

> Hello @potiuk @raphaelauv, I would like to take this issue. Will submit PR in a few days. I'm not sure how I can assign it to myself though.

assigned

@josh-fell
Contributor

josh-fell commented Aug 15, 2023

FWIW, this can be used as a workaround (i.e. pass the dag object, or other context objects, directly to the macro).

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def macro_run_id(dag):
    a = dag.schedule_interval.replace("@", "")
    if a == "None":
        a = "manual"
    return a


with DAG(
    dag_id="example2",
    start_date=days_ago(61),
    user_defined_macros={"macro_run_id": macro_run_id},
    schedule_interval="@monthly",
):

    def toto(arg):
        print(arg)

    PythonOperator(task_id="toot", python_callable=toto, op_kwargs={"arg": "{{ macro_run_id(dag) }}"})

Task log:

[2023-08-15, 17:19:53 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example2.toot scheduled__2023-07-01T00:00:00+00:00 [queued]>
[2023-08-15, 17:19:53 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example2.toot scheduled__2023-07-01T00:00:00+00:00 [queued]>
[2023-08-15, 17:19:53 UTC] {taskinstance.py:1308} INFO - Starting attempt 3 of 3
[2023-08-15, 17:19:53 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): toot> on 2023-07-01 00:00:00+00:00
[2023-08-15, 17:19:53 UTC] {standard_task_runner.py:57} INFO - Started process 524 to run task
[2023-08-15, 17:19:53 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'example2', 'toot', 'scheduled__2023-07-01T00:00:00+00:00', '--job-id', '11', '--raw', '--subdir', 'DAGS_FOLDER/user_defined_macros_context.py', '--cfg-path', '/tmp/tmpkb1juxn6']
[2023-08-15, 17:19:53 UTC] {standard_task_runner.py:85} INFO - Job 11: Subtask toot
[2023-08-15, 17:19:53 UTC] {task_command.py:410} INFO - Running <TaskInstance: example2.toot scheduled__2023-07-01T00:00:00+00:00 [running]> on host bca190b6c021
[2023-08-15, 17:19:53 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example2' AIRFLOW_CTX_TASK_ID='toot' AIRFLOW_CTX_EXECUTION_DATE='2023-07-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-01T00:00:00+00:00'
[2023-08-15, 17:19:53 UTC] {logging_mixin.py:150} INFO - monthly
[2023-08-15, 17:19:53 UTC] {python.py:183} INFO - Done. Returned value was: None
[2023-08-15, 17:19:53 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=example2, task_id=toot, execution_date=20230701T000000, start_date=20230815T171953, end_date=20230815T171953
[2023-08-15, 17:19:53 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-15, 17:19:54 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
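
Because the workaround macro only reads `dag.schedule_interval`, it can be checked outside a running Airflow task. The following is a minimal sketch, not part of the original comment: `FakeDag` is a hypothetical stand-in class (not an Airflow API), and the `str()` call is an added assumption so that a `None` schedule is handled instead of raising `AttributeError`.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeDag:
    """Hypothetical stand-in for a DAG; holds only the attribute the macro reads."""

    schedule_interval: Any


def macro_run_id(dag) -> str:
    # str() is an addition to the workaround above: it makes the "None"
    # comparison work when schedule_interval is the None object.
    label = str(dag.schedule_interval).replace("@", "")
    return "manual" if label == "None" else label


print(macro_run_id(FakeDag("@monthly")))  # monthly
print(macro_run_id(FakeDag(None)))  # manual
```

In the DAG file, the template still passes the object explicitly, as in `{{ macro_run_id(dag) }}`.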

@raphaelauv
Contributor Author

Thanks @josh-fell , it's a great hack for my (specific) use case 👍👍👍

@messi-gao

messi-gao commented Dec 29, 2023

@ivan-afonichkin I found that this issue was fixed in version 2.7.1. We may not be able to upgrade Airflow right now, so is there a way to fix this issue on versions below 2.7.1?

@potiuk
Member

potiuk commented Dec 29, 2023

> @ivan-afonichkin I found that this issue was fixed in version 2.7.1. We may not be able to upgrade Airflow right now, so is there a way to fix this issue on versions below 2.7.1?

You can attempt to patch it manually: the Airflow source is fully available, so you can patch your own copy. But I heartily recommend just upgrading Airflow. Every single Airflow release contains tens of fixes for known problems in previous versions, and many of them include fixes for security issues. Almost by definition, when you do not run the latest version, you run a version that contains known bugs and security issues. So "upgrade Airflow to the latest version" is absolutely the best way to proceed.

You have to remember that even if it costs some time to upgrade, it will likely cost you (and your organisation) a lot more NOT to upgrade, because of known bugs that have since been fixed and the potential risk from security fixes you have not applied. In the future, when acts like the CRA (Cyber Resilience Act) in the EU and similar ones in the US are approved and become law (it will be passed in the EU in a month or two), your organisation will have to upgrade to the latest security-fixed version of software anyway, and there will be penalties if it doesn't. So I suggest getting used to upgrading software as soon as it is released.
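
For installations that cannot upgrade yet, the explicit-argument workaround from earlier in this thread can be combined with the context lookup so one macro serves both cases. This is a minimal sketch under stated assumptions: `make_macro_run_id` is a hypothetical helper, and `get_context` is an injected callable. In a real DAG file it would be `airflow.operators.python.get_current_context`; a stub is used here so the example runs without Airflow.

```python
from types import SimpleNamespace
from typing import Any, Callable, Mapping, Optional


def make_macro_run_id(get_context: Callable[[], Mapping[str, Any]]):
    """Build a macro that prefers an explicitly passed dag (hypothetical helper)."""

    def macro_run_id(dag: Optional[Any] = None) -> str:
        if dag is None:
            # Only reached for {{ macro_run_id() }}. On affected versions this
            # lookup raises during rendering, so pass dag explicitly there.
            dag = get_context()["dag"]
        label = str(dag.schedule_interval).replace("@", "")
        return "manual" if label == "None" else label

    return macro_run_id


# Stub standing in for get_current_context (assumption for this demo only).
stub_dag = SimpleNamespace(schedule_interval="@monthly")
macro = make_macro_run_id(lambda: {"dag": stub_dag})

print(macro())  # monthly (context lookup)
print(macro(SimpleNamespace(schedule_interval="@daily")))  # daily (explicit argument)
```

With this shape, `{{ macro_run_id(dag) }}` takes the explicit path and does not depend on the fix, while `{{ macro_run_id() }}` relies on the context being available during rendering.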

6 participants