From 0f5ff62342368f5f4356dc3753b5b9f7e6490f95 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Mon, 12 Dec 2022 12:17:54 +0530 Subject: [PATCH] Fix output property missing for airflow version < 2.4.0 (#1385) # Description ## What is the current behavior? The output was implemented in 2.4.0 according to the release notes (see [here](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#id13)) Add output property to MappedOperator https://github.com/apache/airflow/pull/25604 closes: #1359 ## What is the new behavior? - Catch exception for airflow version < 2.4.0 and use `XComArg(...)` instead. ## Does this introduce a breaking change? No ### Checklist - [x] Created tests which fail without the change (if possible) - [x] Extended the README / documentation, if necessary Co-authored-by: Kaxil Naik Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- python-sdk/src/astro/sql/operators/cleanup.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python-sdk/src/astro/sql/operators/cleanup.py b/python-sdk/src/astro/sql/operators/cleanup.py index 31538a793..084289496 100644 --- a/python-sdk/src/astro/sql/operators/cleanup.py +++ b/python-sdk/src/astro/sql/operators/cleanup.py @@ -232,7 +232,14 @@ def resolve_tables_from_tasks( # noqa: C901 and isinstance(task, MappedOperator) and issubclass(task.operator_class, OPERATOR_CLASSES_WITH_TABLE_OUTPUT) ): - for t in task.output.resolve(context): + from airflow.models.xcom_arg import XComArg + + try: + task_output = task.output + except AttributeError: + task_output = XComArg(operator=task) + + for t in task_output.resolve(context): if isinstance(t, BaseTable): res.append(t) except AirflowException: # pragma: no cover