Skip to content

Commit

Permalink
move triggers to standard provider
Browse files Browse the repository at this point in the history
  • Loading branch information
gopidesupavan committed Nov 2, 2024
1 parent c1bd9c5 commit fb25d92
Show file tree
Hide file tree
Showing 28 changed files with 105 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -647,12 +647,12 @@ labelPRBasedOnFilePath:
- airflow/cli/commands/triggerer_command.py
- airflow/jobs/triggerer_job_runner.py
- airflow/models/trigger.py
- airflow/triggers/**/*
- providers/src/airflow/providers/standard/triggers/*
- tests/cli/commands/test_triggerer_command.py
- tests/jobs/test_triggerer_job.py
- tests/models/test_trigger.py
- tests/jobs/test_triggerer_job_logging.py
- tests/triggers/**/*
- providers/tests/standard/triggers/*

area:Serialization:
- airflow/serialization/**/*
Expand Down
2 changes: 1 addition & 1 deletion airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.xcom import XCom
from airflow.triggers.external_task import DagStateTrigger
from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.utils import timezone
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.session import provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow/sensors/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from airflow.models.dagbag import DagBag
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.external_task import WorkflowTrigger
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
Expand Down
4 changes: 2 additions & 2 deletions airflow/sensors/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.standard.hooks.filesystem import FSHook
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.triggers.file import FileTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -63,7 +63,7 @@ class FileSensor(BaseSensorOperator):
template_fields: Sequence[str] = ("filepath",)
ui_color = "#91818a"
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.file.FileTrigger",
trigger_cls="airflow.providers.standard.triggers.file.FileTrigger",
trigger_kwargs={},
next_method="execute_complete",
next_kwargs=None,
Expand Down
12 changes: 6 additions & 6 deletions docs/apache-airflow/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ When writing a deferrable operators these are the main points to consider:
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
Expand Down Expand Up @@ -122,7 +122,7 @@ This example shows the structure of a basic trigger, a very simplified version o
self.moment = moment
def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
return ("airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
Expand Down Expand Up @@ -177,7 +177,7 @@ Here's a basic example of how a sensor might trigger deferral:
from typing import TYPE_CHECKING, Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -237,7 +237,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``
class WaitOneHourSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
Expand Down Expand Up @@ -268,7 +268,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``
class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
Expand Down Expand Up @@ -307,7 +307,7 @@ After the trigger has finished executing, the task may be sent back to the worke
class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
Expand Down
1 change: 1 addition & 0 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@
"plugins": [],
"cross-providers-deps": [
"amazon",
"common.compat",
"google",
"oracle",
"sftp"
Expand Down
31 changes: 31 additions & 0 deletions providers/src/airflow/providers/common/compat/standard/triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

# Compatibility shim for TimeDeltaTrigger, which this commit moves from core
# Airflow (airflow.triggers.temporal) into the standard provider.  Consumers
# (e.g. the Microsoft Azure provider) import it from here so they work on
# either layout.
if TYPE_CHECKING:
    # For type checkers, always resolve against the new provider location.
    from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
else:
    try:
        # Preferred: the standard provider's copy (new location).
        from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
    except ModuleNotFoundError:
        # Fallback: presumably an older Airflow where the standard provider's
        # ``triggers`` package does not exist yet and the trigger still lives
        # in core — matches the rename performed by this commit.
        from airflow.triggers.temporal import TimeDeltaTrigger


# Re-export so ``from ...common.compat.standard.triggers import TimeDeltaTrigger``
# is the single stable import path regardless of Airflow version.
__all__ = ["TimeDeltaTrigger"]
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
from typing import TYPE_CHECKING, Any, Callable, Sequence

from airflow.exceptions import AirflowException
from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.providers.microsoft.azure.triggers.msgraph import MSGraphTrigger, ResponseSerializer
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger

if TYPE_CHECKING:
from datetime import timedelta
Expand Down
7 changes: 7 additions & 0 deletions providers/src/airflow/providers/standard/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,10 @@ hooks:
- airflow.providers.standard.hooks.filesystem
- airflow.providers.standard.hooks.package_index
- airflow.providers.standard.hooks.subprocess

triggers:
- integration-name: Standard
python-modules:
- airflow.providers.standard.triggers.external_task
- airflow.providers.standard.triggers.file
- airflow.providers.standard.triggers.temporal
4 changes: 2 additions & 2 deletions providers/src/airflow/providers/standard/sensors/date_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import datetime
from typing import TYPE_CHECKING, Any, NoReturn, Sequence

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

if TYPE_CHECKING:
Expand Down Expand Up @@ -93,7 +93,7 @@ class DateTimeSensorAsync(DateTimeSensor):
"""

start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.DateTimeTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
trigger_kwargs={"moment": "", "end_from_trigger": False},
next_method="execute_complete",
next_kwargs=None,
Expand Down
4 changes: 2 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import datetime
from typing import TYPE_CHECKING, Any, NoReturn

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

if TYPE_CHECKING:
Expand Down Expand Up @@ -68,7 +68,7 @@ class TimeSensorAsync(BaseSensorOperator):
"""

start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.DateTimeTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
trigger_kwargs={"moment": "", "end_from_trigger": False},
next_method="execute_complete",
next_kwargs=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowSkipException
from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone

if TYPE_CHECKING:
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize the trigger param and module path."""
return (
"airflow.triggers.external_task.WorkflowTrigger",
"airflow.providers.standard.triggers.external_task.WorkflowTrigger",
{
"external_dag_id": self.external_dag_id,
"external_task_ids": self.external_task_ids,
Expand Down Expand Up @@ -159,7 +159,7 @@ def __init__(
def serialize(self) -> tuple[str, dict[str, typing.Any]]:
"""Serialize DagStateTrigger arguments and classpath."""
return (
"airflow.triggers.external_task.DagStateTrigger",
"airflow.providers.standard.triggers.external_task.DagStateTrigger",
{
"dag_id": self.dag_id,
"states": self.states,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import datetime
import os
import typing
import warnings
from glob import glob
from typing import Any

Expand Down Expand Up @@ -48,21 +47,12 @@ def __init__(
super().__init__()
self.filepath = filepath
self.recursive = recursive
if kwargs.get("poll_interval") is not None:
warnings.warn(
"`poll_interval` has been deprecated and will be removed in future."
"Please use `poke_interval` instead.",
DeprecationWarning,
stacklevel=2,
)
self.poke_interval: float = kwargs["poll_interval"]
else:
self.poke_interval = poke_interval
self.poke_interval = poke_interval

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize FileTrigger arguments and classpath."""
return (
"airflow.triggers.file.FileTrigger",
"airflow.providers.standard.triggers.file.FileTrigger",
{
"filepath": self.filepath,
"recursive": self.recursive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False)

def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"airflow.triggers.temporal.DateTimeTrigger",
"airflow.providers.standard.triggers.temporal.DateTimeTrigger",
{"moment": self.moment, "end_from_trigger": self.end_from_trigger},
)

Expand Down
2 changes: 1 addition & 1 deletion providers/tests/standard/sensors/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
from airflow.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
Expand Down
16 changes: 16 additions & 0 deletions providers/tests/standard/triggers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.providers.standard.triggers.external_task import DagStateTrigger, WorkflowTrigger
from airflow.triggers.base import TriggerEvent
from airflow.triggers.external_task import DagStateTrigger, WorkflowTrigger
from airflow.utils import timezone
from airflow.utils.state import DagRunState

Expand All @@ -37,7 +37,7 @@ class TestWorkflowTrigger:
STATES = ["success", "fail"]

@pytest.mark.flaky(reruns=5)
@mock.patch("airflow.triggers.external_task._get_count")
@mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_success(self, mock_get_count):
"""check the db count get called correctly."""
Expand Down Expand Up @@ -70,7 +70,7 @@ async def test_task_workflow_trigger_success(self, mock_get_count):
await gen.__anext__()

@pytest.mark.flaky(reruns=5)
@mock.patch("airflow.triggers.external_task._get_count")
@mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_failed(self, mock_get_count):
mock_get_count.side_effect = mocked_get_count
Expand Down Expand Up @@ -102,7 +102,7 @@ async def test_task_workflow_trigger_failed(self, mock_get_count):
with pytest.raises(StopAsyncIteration):
await gen.__anext__()

@mock.patch("airflow.triggers.external_task._get_count")
@mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
mock_get_count.return_value = 0
Expand Down Expand Up @@ -133,7 +133,7 @@ async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
await gen.__anext__()

@pytest.mark.flaky(reruns=5)
@mock.patch("airflow.triggers.external_task._get_count")
@mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_skipped(self, mock_get_count):
mock_get_count.side_effect = mocked_get_count
Expand Down Expand Up @@ -162,7 +162,7 @@ async def test_task_workflow_trigger_skipped(self, mock_get_count):
states=["success", "fail"],
)

@mock.patch("airflow.triggers.external_task._get_count")
@mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@mock.patch("asyncio.sleep")
@pytest.mark.asyncio
async def test_task_workflow_trigger_sleep_success(self, mock_sleep, mock_get_count):
Expand Down Expand Up @@ -203,7 +203,7 @@ def test_serialization(self):
poke_interval=5,
)
classpath, kwargs = trigger.serialize()
assert classpath == "airflow.triggers.external_task.WorkflowTrigger"
assert classpath == "airflow.providers.standard.triggers.external_task.WorkflowTrigger"
assert kwargs == {
"external_dag_id": self.DAG_ID,
"execution_dates": [timezone.datetime(2022, 1, 1)],
Expand Down Expand Up @@ -271,7 +271,7 @@ def test_serialization(self):
poll_interval=5,
)
classpath, kwargs = trigger.serialize()
assert classpath == "airflow.triggers.external_task.DagStateTrigger"
assert classpath == "airflow.providers.standard.triggers.external_task.DagStateTrigger"
assert kwargs == {
"dag_id": self.DAG_ID,
"states": self.STATES,
Expand Down
Loading

0 comments on commit fb25d92

Please sign in to comment.