Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix using XCom with
KubernetesPodOperator
(#17760)
This commit effectively revers apache/airflow#15942 because of the issues it is causing. Until we find better solution we should revert this change Error: ``` [2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00 [2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task [2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l'] [2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom [2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710 [2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks self.render_templates(context=context) File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates self.task.render_template_fields(context) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields rendered_content = self.render_template(content, context, jinja_env, seen_oids) File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp> return [self.render_template(element, context, jinja_env) for element in content] File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template template = self.loader.load(self, name, globals) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load source, filename, uptodate = self.get_source(environment, name) File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json [2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354 [2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1 ``` Dag: ``` from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) from airflow.utils.dates import days_ago from airflow.configuration import conf namespace = conf.get("kubernetes", "NAMESPACE") # This will detect the default namespace locally and read the # environment namespace when deployed to Astronomer. if namespace == "default": config_file = "/usr/local/airflow/include/.kube/config" in_cluster = False else: in_cluster = True config_file = None default_args = { "owner": "airflow", } with DAG( dag_id="k8_pod_operator_xcom", default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["k8"], ) as dag: write_xcom = KubernetesPodOperator( namespace=namespace, in_cluster=in_cluster, config_file=config_file, image="ubuntu", cmds=[ "sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", ], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result ``` closes apache/airflow#17186 GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
- Loading branch information