-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add KPO pod-template-file jinja template support. #15942
Add KPO pod-template-file jinja template support. #15942
Conversation
This PR adds jinja template support for KubernetesPodOperator pod-template-file.
Just pointing out this was a breaking change - if you previously were using values in templated fields which ended in any of these strings, but were not meant to be references to template files, the scheduler now throws an error when it tries to run the task as it tries to treat the string as a local file and load it. |
@theodickson This PR was released in cncf provider 2.0.0 which is a major release (previous was 1.2.0) there can be breaking changes when upgrading a major version. We also mention it in the docs: Though I don't think this PR is something that can be considered as a breaking change |
Sorry I should have been clearer, I realise it was mentioned in the release notes but as a feature not a breaking change. It's not exactly a huge deal and we found the issue and fixed quickly, but it's definitely a breaking change - code we were running successfully before resulted in an error after upgrading! |
I agree that this is a breaking change, it does feel like something that could be made clearer in the release notes. I think the code that uses this param to check the extensions also is just checking In other operators the leading This means this change is probably overly inclusive of parameters being considered files. |
This commit effectively revers apache#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache#17186
This commit effectively revers #15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes #17186
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
This PR adds jinja template support for KubernetesPodOperator
pod-template-file
.fixes #15892
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.