Skip to content

Commit

Permalink
move sensor helper inside standard provider
Browse files Browse the repository at this point in the history
  • Loading branch information
gopidesupavan committed Nov 24, 2024
1 parent 3c4a884 commit 5864f75
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 4 deletions.
2 changes: 1 addition & 1 deletion airflow/sensors/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from sqlalchemy import func

from airflow.models import DagRun
from airflow.providers.standard.utils.sensor_helper import _get_count
from airflow.providers.standard.utils.version_references import AIRFLOW_V_3_0_PLUS
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils.sensor_helper import _get_count
from airflow.utils.session import NEW_SESSION, provide_session

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -118,7 +118,7 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
return
allowed_count = await self._get_count(self.allowed_states)
_dates = self.logical_dates if AIRFLOW_V_3_0_PLUS else self.execution_dates
if allowed_count == len(_dates):
if allowed_count == len(_dates): # type: ignore[arg-type]
yield TriggerEvent({"status": "success"})
return
self.log.info("Sleeping for %s seconds", self.poke_interval)
Expand Down Expand Up @@ -190,7 +190,7 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
# mypy confuses typing here
num_dags = await self.count_dags() # type: ignore[call-arg]
_dates = self.logical_dates if AIRFLOW_V_3_0_PLUS else self.execution_dates
if num_dags == len(_dates):
if num_dags == len(_dates): # type: ignore[arg-type]
yield TriggerEvent(self.serialize())
return
await asyncio.sleep(self.poll_interval)
Expand Down
File renamed without changes.

0 comments on commit 5864f75

Please sign in to comment.