diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index d598742bbef46..838f658caafef 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -1376,7 +1376,7 @@ We are using certain prefixes for email subjects for different purposes. Start y Voting is governed by the rules described in `Voting `_ We are all devoting our time for community as individuals who except for being active in Apache Airflow have -families, daily jobs, right for vacation. Sometimes we are in different time zones or simply are +families, daily jobs, right for vacation. Sometimes we are in different timezones or simply are busy with day-to-day duties that our response time might be delayed. For us it's crucial to remember to respect each other in the project with no formal structure. There are no managers, departments, most of us is autonomous in our opinions, decisions. diff --git a/UPDATING.md b/UPDATING.md index 2ed4aace4f6eb..0273d5a26669e 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -184,7 +184,7 @@ Similarly, `DAG.concurrency` has been renamed to `DAG.max_active_tasks`. ```python dag = DAG( dag_id="example_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, concurrency=3, ) @@ -195,7 +195,7 @@ dag = DAG( ```python dag = DAG( dag_id="example_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, max_active_tasks=3, ) @@ -3216,7 +3216,7 @@ Type "help", "copyright", "credits" or "license" for more information. >>> from airflow.models.dag import DAG >>> from airflow.operators.dummy import DummyOperator >>> ->>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1)) +>>> dag = DAG('simple_dag', start_date=pendulum.datetime(2017, 9, 1, tz="UTC")) >>> >>> task = DummyOperator(task_id='task_1', dag=dag) >>> diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index f679f8d87532f..8204592220350 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -18,7 +18,9 @@ """Example DAG demonstrating the usage of the BashOperator.""" -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow import DAG from airflow.operators.bash import BashOperator @@ -27,9 +29,9 @@ with DAG( dag_id='example_bash_operator', schedule_interval='0 0 * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), tags=['example', 'example2'], params={"example_key": "example_value"}, ) as dag: diff --git a/airflow/example_dags/example_branch_datetime_operator.py b/airflow/example_dags/example_branch_datetime_operator.py index bdc50ca43686b..76b109fb688e7 100644 --- a/airflow/example_dags/example_branch_datetime_operator.py +++ b/airflow/example_dags/example_branch_datetime_operator.py @@ -20,7 +20,7 @@ Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as targets. """ -import datetime +import pendulum from airflow import DAG from airflow.operators.datetime import BranchDateTimeOperator @@ -28,7 +28,7 @@ dag = DAG( dag_id="example_branch_datetime_operator", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", @@ -42,8 +42,8 @@ task_id='datetime_branch', follow_task_ids_if_true=['date_in_range'], follow_task_ids_if_false=['date_outside_range'], - target_upper=datetime.datetime(2020, 10, 10, 15, 0, 0), - target_lower=datetime.datetime(2020, 10, 10, 14, 0, 0), + target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0), + target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0), dag=dag, ) @@ -54,7 +54,7 @@ dag = DAG( dag_id="example_branch_datetime_operator_2", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", @@ -67,8 +67,8 @@ task_id='datetime_branch', follow_task_ids_if_true=['date_in_range'], follow_task_ids_if_false=['date_outside_range'], - target_upper=datetime.time(0, 0, 0), - target_lower=datetime.time(15, 0, 0), + target_upper=pendulum.time(0, 0, 0), + target_lower=pendulum.time(15, 0, 0), dag=dag, ) diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py index 6d1a33117cfb5..dae303a9035fb 100644 --- a/airflow/example_dags/example_branch_day_of_week_operator.py +++ b/airflow/example_dags/example_branch_day_of_week_operator.py @@ -19,7 +19,7 @@ """ Example DAG demonstrating the usage of BranchDayOfWeekOperator. """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -27,7 +27,7 @@ with DAG( dag_id="example_weekday_branch_operator", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", diff --git a/airflow/example_dags/example_branch_labels.py b/airflow/example_dags/example_branch_labels.py index bd6ce09819885..2215bcfe19c41 100644 --- a/airflow/example_dags/example_branch_labels.py +++ b/airflow/example_dags/example_branch_labels.py @@ -19,14 +19,17 @@ """ Example DAG demonstrating the usage of labels with different branches. """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.edgemodifier import Label with DAG( - "example_branch_labels", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False + "example_branch_labels", + schedule_interval="@daily", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, ) as dag: ingest = DummyOperator(task_id="ingest") analyse = DummyOperator(task_id="analyze") diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 69f939e9df20e..eaa1532eeef81 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -19,7 +19,8 @@ """Example DAG demonstrating the usage of the BranchPythonOperator.""" import random -from datetime import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -29,7 +30,7 @@ with DAG( dag_id='example_branch_operator', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", tags=['example', 'example2'], diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index 09d96bea7edb0..d85eda140aedc 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -20,7 +20,7 @@ Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run or skipped on alternating runs. """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -49,7 +49,7 @@ def should_run(**kwargs): with DAG( dag_id='example_branch_dop_operator_v3', schedule_interval='*/1 * * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, default_args={'depends_on_past': True}, tags=['example'], diff --git a/airflow/example_dags/example_complex.py b/airflow/example_dags/example_complex.py index a141236cd20b6..22e1906c042dd 100644 --- a/airflow/example_dags/example_complex.py +++ b/airflow/example_dags/example_complex.py @@ -19,7 +19,7 @@ """ Example Airflow DAG that shows the complex DAG structure. """ -from datetime import datetime +import pendulum from airflow import models from airflow.models.baseoperator import chain @@ -28,7 +28,7 @@ with models.DAG( dag_id="example_complex", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example', 'example2', 'example3'], ) as dag: diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index 66b0fa4ab0bd5..af1438cddbbee 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -15,10 +15,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from datetime import datetime from typing import Any, Dict import httpx +import pendulum from airflow.decorators import dag, task from airflow.models.baseoperator import BaseOperator @@ -37,7 +37,12 @@ def execute(self, context): # [START dag_decorator_usage] -@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) +@dag( + schedule_interval=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) def example_dag_decorator(email: str = 'example@example.com'): """ DAG to send server IP to email. diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py index 851a7ad71ca54..eed2f727fc317 100644 --- a/airflow/example_dags/example_external_task_marker_dag.py +++ b/airflow/example_dags/example_external_task_marker_dag.py @@ -37,13 +37,13 @@ exception """ -import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor -start_date = datetime.datetime(2015, 1, 1) +start_date = pendulum.datetime(2021, 1, 1, tz="UTC") with DAG( dag_id="example_external_task_marker_parent", diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index f984909cfccb1..6318d51af1fe9 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -20,7 +20,8 @@ """ import logging import os -from datetime import datetime + +import pendulum from airflow import DAG from airflow.configuration import conf @@ -45,7 +46,7 @@ with DAG( dag_id='example_kubernetes_executor', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example3'], ) as dag: diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 76b5f630c7d9d..67f004aef38f7 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -20,7 +20,9 @@ """ # [START example] -import datetime as dt +import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -29,8 +31,8 @@ with DAG( dag_id='latest_only_with_trigger', - schedule_interval=dt.timedelta(hours=4), - start_date=dt.datetime(2021, 1, 1), + schedule_interval=datetime.timedelta(hours=4), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example3'], ) as dag: diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py index add81a9fd692d..27e71054d1176 100644 --- a/airflow/example_dags/example_nested_branch_dag.py +++ b/airflow/example_dags/example_nested_branch_dag.py @@ -21,7 +21,7 @@ ``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding ``BranchPythonOperator`` are skipped. """ -from datetime import datetime +import pendulum from airflow.models import DAG from airflow.operators.dummy import DummyOperator @@ -30,7 +30,7 @@ with DAG( dag_id="example_nested_branch_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", tags=["example"], diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index d4781afab8d98..f97f941db0d3b 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -18,10 +18,12 @@ """Example DAG demonstrating the usage of the params arguments in templated arguments.""" +import datetime import os -from datetime import datetime, timedelta from textwrap import dedent +import pendulum + from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator @@ -61,9 +63,9 @@ def print_env_vars(test_mode=None): with DAG( "example_passing_params_via_test_command", schedule_interval='*/1 * * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=4), + dagrun_timeout=datetime.timedelta(minutes=4), tags=['example'], ) as dag: run_this = my_py_command(params={"miff": "agg"}) diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index d533d84506af1..0f9a7fc476acb 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -23,9 +23,10 @@ import logging import shutil import time -from datetime import datetime from pprint import pprint +import pendulum + from airflow import DAG from airflow.decorators import task @@ -34,7 +35,7 @@ with DAG( dag_id='example_python_operator', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index d349685eaea99..4c1187aee3465 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of the ShortCircuitOperator.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.models.baseoperator import chain @@ -26,7 +26,7 @@ with DAG( dag_id='example_short_circuit_operator', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index cb664e7e1f195..0e67ed1dc91f7 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -18,7 +18,7 @@ """Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.exceptions import AirflowSkipException @@ -54,6 +54,11 @@ def create_test_pipeline(suffix, trigger_rule): join >> final -with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag: +with DAG( + dag_id='example_skip_dag', + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) as dag: create_test_pipeline('1', TriggerRule.ALL_SUCCESS) create_test_pipeline('2', TriggerRule.ONE_SUCCESS) diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py index 7a46bc4ec118e..0db6bc1ba7fcc 100644 --- a/airflow/example_dags/example_sla_dag.py +++ b/airflow/example_dags/example_sla_dag.py @@ -15,8 +15,10 @@ # specific language governing permissions and limitations # under the License. +import datetime import time -from datetime import datetime, timedelta + +import pendulum from airflow.decorators import dag, task @@ -39,13 +41,13 @@ def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis): @dag( schedule_interval="*/2 * * * *", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, sla_miss_callback=sla_callback, default_args={'email': "email@example.com"}, ) def example_sla_dag(): - @task(sla=timedelta(seconds=10)) + @task(sla=datetime.timedelta(seconds=10)) def sleep_20(): """Sleep for 20 seconds""" time.sleep(20) diff --git a/airflow/example_dags/example_task_group.py b/airflow/example_dags/example_task_group.py index d81bf007bab58..46f709eaf873b 100644 --- a/airflow/example_dags/example_task_group.py +++ b/airflow/example_dags/example_task_group.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of the TaskGroup.""" -from datetime import datetime +import pendulum from airflow.models.dag import DAG from airflow.operators.bash import BashOperator @@ -26,7 +26,10 @@ # [START howto_task_group] with DAG( - dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"] + dag_id="example_task_group", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], ) as dag: start = DummyOperator(task_id="start") diff --git a/airflow/example_dags/example_task_group_decorator.py b/airflow/example_dags/example_task_group_decorator.py index 0e53a98ea4376..30f9d6f1ab2ca 100644 --- a/airflow/example_dags/example_task_group_decorator.py +++ b/airflow/example_dags/example_task_group_decorator.py @@ -18,7 +18,7 @@ """Example DAG demonstrating the usage of the @taskgroup decorator.""" -from datetime import datetime +import pendulum from airflow.decorators import task, task_group from airflow.models.dag import DAG @@ -65,7 +65,10 @@ def task_group_function(value): # Executing Tasks and TaskGroups with DAG( - dag_id="example_task_group_decorator", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"] + dag_id="example_task_group_decorator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], ) as dag: start_task = task_start() end_task = task_end() diff --git a/airflow/example_dags/example_time_delta_sensor_async.py b/airflow/example_dags/example_time_delta_sensor_async.py index ce8cab005e64a..1a7126a22627a 100644 --- a/airflow/example_dags/example_time_delta_sensor_async.py +++ b/airflow/example_dags/example_time_delta_sensor_async.py @@ -21,7 +21,9 @@ defers and doesn't occupy a worker slot while it waits """ -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -30,10 +32,10 @@ with DAG( dag_id="example_time_delta_sensor_async", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: - wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=10)) + wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10)) finish = DummyOperator(task_id="finish") wait >> finish diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 27df3d2651007..a017c9a5b4176 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -21,14 +21,14 @@ 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator with DAG( dag_id="example_trigger_controller_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@once", tags=['example'], diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 41aecf1a1b613..64ccb59e0348d 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -21,7 +21,7 @@ 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.decorators import task @@ -41,7 +41,7 @@ def run_this_func(dag_run=None): with DAG( dag_id="example_trigger_target_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 405d5c527d1e4..b55d4e5d667cd 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of XComs.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.decorators import task @@ -64,7 +64,7 @@ def pull_value_from_bash_push(ti=None): with DAG( 'example_xcom', schedule_interval="@once", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_xcomargs.py b/airflow/example_dags/example_xcomargs.py index 7e0cdd901cedb..00af4725c240f 100644 --- a/airflow/example_dags/example_xcomargs.py +++ b/airflow/example_dags/example_xcomargs.py @@ -18,7 +18,8 @@ """Example DAG demonstrating the usage of the XComArgs.""" import logging -from datetime import datetime + +import pendulum from airflow import DAG from airflow.decorators import task @@ -41,7 +42,7 @@ def print_value(value, ts=None): with DAG( dag_id='example_xcom_args', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], @@ -50,7 +51,7 @@ def print_value(value, ts=None): with DAG( "example_xcom_args_with_operators", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py index d337a03679c48..7c913099b3107 100644 --- a/airflow/example_dags/subdags/subdag.py +++ b/airflow/example_dags/subdags/subdag.py @@ -19,7 +19,7 @@ """Helper function to generate a DAG and operators given some arguments.""" # [START subdag] -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -38,7 +38,7 @@ def subdag(parent_dag_name, child_dag_name, args): dag_subdag = DAG( dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", ) diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py index dd18449786973..d039a73488c18 100644 --- a/airflow/example_dags/tutorial_etl_dag.py +++ b/airflow/example_dags/tutorial_etl_dag.py @@ -24,9 +24,10 @@ # [START tutorial] # [START import_module] import json -from datetime import datetime from textwrap import dedent +import pendulum + # The DAG object; we'll need this to instantiate a DAG from airflow import DAG @@ -45,7 +46,7 @@ # [END default_args] description='ETL DAG tutorial', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py index 3b0ba51a28c87..f6af78f0a5a2c 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl.py +++ b/airflow/example_dags/tutorial_taskflow_api_etl.py @@ -20,7 +20,8 @@ # [START tutorial] # [START import_module] import json -from datetime import datetime + +import pendulum from airflow.decorators import dag, task @@ -28,7 +29,12 @@ # [START instantiate_dag] -@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) +@dag( + schedule_interval=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) def tutorial_taskflow_api_etl(): """ ### TaskFlow API Tutorial Documentation diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 477e597b49129..150220cbc4142 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -187,6 +187,9 @@ class DAG(LoggingMixin): DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG. + Note that if you plan to use time zones all the dates provided should be pendulum + dates. See :ref:`timezone_aware_dags`. + :param dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) :type dag_id: str diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index bc8361dcb7ca8..ebe20b03b03be 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -42,6 +42,7 @@ from airflow.settings import json from airflow.timetables.base import Timetable from airflow.utils.code_utils import get_python_source +from airflow.utils.docs import get_docs_url from airflow.utils.module_loading import as_importable_string, import_string from airflow.utils.task_group import TaskGroup @@ -113,7 +114,10 @@ def encode_timezone(var: Timezone) -> Union[str, int]: return var.offset if isinstance(var, Timezone): return var.name - raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}") + raise ValueError( + f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}. " + f"See {get_docs_url('timezone.html#time-zone-aware-dags')}" + ) def decode_timezone(var: Union[str, int]) -> Timezone: diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 951e6b4290779..3b01b3ece98a0 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -121,7 +121,7 @@ Bad example: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.python import PythonOperator @@ -131,7 +131,7 @@ Bad example: with DAG( dag_id="example_python_operator", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: @@ -151,7 +151,7 @@ Good example: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.python import PythonOperator @@ -159,7 +159,7 @@ Good example: with DAG( dag_id="example_python_operator", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: @@ -237,12 +237,13 @@ Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like tha .. code-block:: python + import pendulum from my_company_utils.common import ALL_TASKS with DAG( dag_id="my_dag", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) as dag: for task in ALL_TASKS: @@ -486,13 +487,14 @@ This is an example test want to verify the structure of a code-generated DAG aga .. code-block:: python import datetime + import pendulum import pytest from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType - DATA_INTERVAL_START = datetime.datetime(2021, 9, 13) + DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC") DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) TEST_DAG_ID = "my_custom_operator_dag" diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index e339abeda65bc..32d21cea19788 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -38,19 +38,22 @@ There are three ways to declare a DAG - either you can use a context manager, which will add the DAG to anything inside it implicitly:: with DAG( - "my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False + "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False ) as dag: op = DummyOperator(task_id="task") Or, you can use a standard constructor, passing the dag into any operators you use:: - my_dag = DAG("my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) + my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False) op = DummyOperator(task_id="task", dag=my_dag) Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG generator `:: - @dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) + @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False) def generate_dag(): op = DummyOperator(task_id="task") @@ -214,10 +217,11 @@ Default Arguments Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it:: + import pendulum with DAG( dag_id='my_dag', - start_date=datetime(2016, 1, 1), + start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval='@daily', catchup=False, default_args={'retries': 2}, @@ -390,7 +394,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality .. code-block:: python # dags/branch_without_trigger.py - import datetime as dt + import pendulum from airflow.models import DAG from airflow.operators.dummy import DummyOperator @@ -399,7 +403,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality dag = DAG( dag_id="branch_without_trigger", schedule_interval="@once", - start_date=dt.datetime(2019, 2, 28), + start_date=pendulum.datetime(2019, 2, 28, tz="UTC"), ) run_this_first = DummyOperator(task_id="run_this_first", dag=dag) @@ -483,9 +487,11 @@ Dependency relationships can be applied across all tasks in a TaskGroup with the TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``default_args`` in DAG level:: + import pendulum + with DAG( dag_id='dag1', - start_date=datetime(2016, 1, 1), + start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False, default_args={'retries': 1}, @@ -563,9 +569,13 @@ This is especially useful if your tasks are built dynamically from configuration """ ### My great DAG """ + import pendulum dag = DAG( - "my_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False + "my_dag", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", + catchup=False, ) dag.doc_md = __doc__ diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst index 13020f1be4ca1..30ec5ceff8b3f 100644 --- a/docs/apache-airflow/concepts/operators.rst +++ b/docs/apache-airflow/concepts/operators.rst @@ -175,7 +175,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows: dag = DAG( dag_id="example_template_as_python_object", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, render_template_as_native_obj=True, ) diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 62555b10ed3ab..1a2bbe3d6ecc4 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -113,17 +113,18 @@ in the configuration file. When turned off, the scheduler creates a DAG run only """ from airflow.models.dag import DAG from airflow.operators.bash import BashOperator - from datetime import datetime, timedelta + import datetime + import pendulum dag = DAG( "tutorial", default_args={ "depends_on_past": True, "retries": 1, - "retry_delay": timedelta(minutes=3), + "retry_delay": datetime.timedelta(minutes=3), }, - start_date=datetime(2015, 12, 1), + start_date=pendulum.datetime(2015, 12, 1, tz="UTC"), description="A simple tutorial DAG", schedule_interval="@daily", catchup=False, @@ -225,7 +226,7 @@ Example of a parameterized DAG: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.bash import BashOperator @@ -233,7 +234,7 @@ Example of a parameterized DAG: dag = DAG( "example_parameterized_dag", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst index 341f9fea0c03c..496e3e7eadb2d 100644 --- a/docs/apache-airflow/executor/kubernetes.rst +++ b/docs/apache-airflow/executor/kubernetes.rst @@ -154,7 +154,8 @@ Here is an example of a task with both features: .. code-block:: python import os - from datetime import datetime + + import pendulum from airflow import DAG from airflow.decorators import task @@ -166,7 +167,7 @@ Here is an example of a task with both features: with DAG( dag_id="example_pod_template_file", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example3"], ) as dag: diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 7f72a0d162f5f..6f2e778f2bfb4 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -149,7 +149,8 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo from airflow import DAG from airflow.operators.python_operator import PythonOperator - from datetime import datetime + + import pendulum def create_dag(dag_id, schedule, dag_number, default_args): @@ -157,7 +158,12 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo print("Hello World") print("This is DAG: {}".format(str(dag_number))) - dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args) + dag = DAG( + dag_id, + schedule_interval=schedule, + default_args=default_args, + pendulum.datetime(2021, 9, 13, tz="UTC"), + ) with dag: t1 = PythonOperator(task_id="hello_world", python_callable=hello_world_py) @@ -213,6 +219,14 @@ backfill CLI command, gets overridden by the backfill's ``start_date`` commands. This allows for a backfill on tasks that have ``depends_on_past=True`` to actually start. If this were not the case, the backfill just would not start. +Using time zones +---------------- + +Creating a time zone aware datetime (e.g. DAG's ``start_date``) is quite simple. Just make sure to supply +a time zone aware dates using ``pendulum``. Don't try to use standard library +`timezone `_ as they are known to +have limitations and we deliberately disallow using them in DAGs. + .. _faq:what-does-execution-date-mean: @@ -360,12 +374,12 @@ upstream task. .. code-block:: python + import pendulum + from airflow.decorators import dag, task from airflow.exceptions import AirflowException from airflow.utils.trigger_rule import TriggerRule - from datetime import datetime - @task def a_func(): @@ -379,7 +393,7 @@ upstream task. pass - @dag(schedule_interval="@once", start_date=datetime(2021, 1, 1)) + @dag(schedule_interval="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC")) def my_dag(): a = a_func() b = b_func() diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 1fd71028071cf..ed902fcffed19 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -69,7 +69,7 @@ file: .. code-block:: python - import datetime + import pendulum from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable @@ -77,7 +77,7 @@ file: with DAG( dag_id="example_after_workday_timetable_dag", - start_date=datetime.datetime(2021, 3, 10), + start_date=pendulum.datetime(2021, 3, 10, tz="UTC"), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: @@ -190,7 +190,7 @@ For reference, here's our plugin and DAG files in their entirety: .. code-block:: python - import datetime + import pendulum from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable @@ -199,7 +199,7 @@ For reference, here's our plugin and DAG files in their entirety: with DAG( dag_id="example_workday_timetable", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index 9b8bb71a16385..3a4db94dc5615 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -30,7 +30,8 @@ works. .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum from airflow.lineage import AUTO from airflow.lineage.entities import File @@ -42,10 +43,10 @@ works. dag = DAG( dag_id="example_lineage", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="0 0 * * *", catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), ) f_final = File(url="/tmp/final") diff --git a/docs/apache-airflow/logging-monitoring/callbacks.rst b/docs/apache-airflow/logging-monitoring/callbacks.rst index 77ac594aa37b4..15bbacbabe849 100644 --- a/docs/apache-airflow/logging-monitoring/callbacks.rst +++ b/docs/apache-airflow/logging-monitoring/callbacks.rst @@ -51,7 +51,9 @@ In the following example, failures in any task call the ``task_failure_alert`` f .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum + from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -67,8 +69,8 @@ In the following example, failures in any task call the ``task_failure_alert`` f with DAG( dag_id="example_callback", schedule_interval=None, - start_date=datetime(2021, 1, 1), - dagrun_timeout=timedelta(minutes=60), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + dagrun_timeout=datetime.timedelta(minutes=60), catchup=False, on_success_callback=None, on_failure_callback=task_failure_alert, diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst index 32e52239f453b..bcbbd11293000 100644 --- a/docs/apache-airflow/timezone.rst +++ b/docs/apache-airflow/timezone.rst @@ -40,6 +40,7 @@ The time zone is set in ``airflow.cfg``. By default it is set to utc, but you ch an arbitrary IANA time zone, e.g. ``Europe/Amsterdam``. It is dependent on ``pendulum``, which is more accurate than ``pytz``. Pendulum is installed when you install Airflow. + Web UI ------ @@ -90,7 +91,11 @@ words if you have a default time zone setting of ``Europe/Amsterdam`` and create .. code-block:: python - dag = DAG("my_dag", start_date=datetime(2017, 1, 1), default_args={"retries": 3}) + dag = DAG( + "my_dag", + start_date=pendulum.datetime(2017, 1, 1, tz="UTC"), + default_args={"retries": 3}, + ) op = BashOperator(task_id="dummy", bash_command="Hello World!", dag=dag) print(op.retries) # 3 @@ -120,19 +125,21 @@ it is therefore important to make sure this setting is equal on all Airflow node .. note:: For more information on setting the configuration, see :doc:`howto/set-config` +.. _timezone_aware_dags: + Time zone aware DAGs -------------------- Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware ``start_date`` -using ``pendulum``. +using ``pendulum``. Don't try to use standard library +`timezone `_ as they are known to +have limitations and we deliberately disallow using them in DAGs. .. code-block:: python import pendulum - local_tz = pendulum.timezone("Europe/Amsterdam") - - dag = DAG("my_tz_dag", start_date=datetime(2016, 1, 1, tzinfo=local_tz)) + dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam")) op = DummyOperator(task_id="dummy", dag=dag) print(dag.timezone) # @@ -170,6 +177,6 @@ Time deltas Time zone aware DAGs that use ``timedelta`` or ``relativedelta`` schedules respect daylight savings time for the start date but do not adjust for daylight savings time when scheduling subsequent runs. For example, a -DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="US/Eastern")`` +DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="UTC")`` and a schedule interval of ``timedelta(days=1)`` will run daily at 05:00 UTC regardless of daylight savings time. diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst index a8f76bc8689dd..085be42b7bd01 100644 --- a/docs/apache-airflow/tutorial.rst +++ b/docs/apache-airflow/tutorial.rst @@ -230,6 +230,14 @@ Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once. +Using time zones +---------------- + +Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware dates +using ``pendulum``. Don't try to use standard library +`timezone `_ as they are known to +have limitations and we deliberately disallow using them in DAGs. + Recap ----- Alright, so we have a pretty basic DAG. At this point your code should look @@ -474,7 +482,8 @@ Lets look at our DAG: .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum import requests from airflow.decorators import dag, task @@ -483,9 +492,9 @@ Lets look at our DAG: @dag( schedule_interval="0 0 * * *", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), ) def Etl(): @task diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py index 25ceeba6d3e8e..a12f2f65d34ff 100644 --- a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py +++ b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py @@ -17,13 +17,15 @@ # under the License. # [START dag] """This dag only runs some simple tasks to test Airflow's task execution.""" -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow.models.dag import DAG from airflow.operators.dummy import DummyOperator -now = datetime.now() -now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) +now = pendulum.now(tz="UTC") +now_to_the_hour = (now - datetime.timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) START_DATE = now_to_the_hour DAG_NAME = 'test_dag_v1' @@ -31,7 +33,7 @@ DAG_NAME, schedule_interval='*/10 * * * *', default_args={'depends_on_past': True}, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, )