Skip to content

Commit

Permalink
Remove coerce_datetime usage from GCSTimeSpanFileTransformOperator (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Łukasz Wyszomirski authored Mar 24, 2022
1 parent 51d61df commit d231e9b
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions airflow/providers/google/cloud/operators/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

import pendulum

if TYPE_CHECKING:
from airflow.utils.context import Context

Expand Down Expand Up @@ -740,16 +742,20 @@ def execute(self, context: "Context") -> List[str]:
try:
timespan_start = context["data_interval_start"]
timespan_end = context["data_interval_end"]
except KeyError: # Data interval context variables are only available in Airflow 2.2+
timespan_start = timezone.coerce_datetime(context["execution_date"])
timespan_end = timezone.coerce_datetime(context["dag"].following_schedule(timespan_start))
except KeyError:
timespan_start = pendulum.instance(context["execution_date"])
following_execution_date = context["dag"].following_schedule(context["execution_date"])
if following_execution_date is None:
timespan_end = None
else:
timespan_end = pendulum.instance(following_execution_date)

if timespan_end is None: # Only possible in Airflow before 2.2.
self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end)
timespan_end = timezone.coerce_datetime(DateTime.max)
timespan_end = DateTime.max
elif timespan_start >= timespan_end: # Airflow 2.2 sets start == end for non-perodic schedules.
self.log.warning("DAG schedule not periodic, setting timespan end to max %s", timespan_end)
timespan_end = timezone.coerce_datetime(DateTime.max)
timespan_end = DateTime.max

timespan_start = timespan_start.in_timezone(timezone.utc)
timespan_end = timespan_end.in_timezone(timezone.utc)
Expand Down

0 comments on commit d231e9b

Please sign in to comment.