diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index 77fd6a4f76fac..a39f5fd713be3 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -24,6 +24,8 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union +import pendulum + if TYPE_CHECKING: from airflow.utils.context import Context @@ -740,16 +742,20 @@ def execute(self, context: "Context") -> List[str]: try: timespan_start = context["data_interval_start"] timespan_end = context["data_interval_end"] - except KeyError: # Data interval context variables are only available in Airflow 2.2+ - timespan_start = timezone.coerce_datetime(context["execution_date"]) - timespan_end = timezone.coerce_datetime(context["dag"].following_schedule(timespan_start)) + except KeyError: + timespan_start = pendulum.instance(context["execution_date"]) + following_execution_date = context["dag"].following_schedule(context["execution_date"]) + if following_execution_date is None: + timespan_end = None + else: + timespan_end = pendulum.instance(following_execution_date) if timespan_end is None: # Only possible in Airflow before 2.2. self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end) - timespan_end = timezone.coerce_datetime(DateTime.max) + timespan_end = DateTime.max elif timespan_start >= timespan_end: # Airflow 2.2 sets start == end for non-perodic schedules. self.log.warning("DAG schedule not periodic, setting timespan end to max %s", timespan_end) - timespan_end = timezone.coerce_datetime(DateTime.max) + timespan_end = DateTime.max timespan_start = timespan_start.in_timezone(timezone.utc) timespan_end = timespan_end.in_timezone(timezone.utc)