Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try improved cancelled_flow_query #17095

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/prefect/server/services/cancellation_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,22 @@ async def clean_up_cancelled_flow_run_task_runs(
async def clean_up_cancelled_subflow_runs(self, db: PrefectDBInterface) -> None:
high_water_mark = UUID(int=0)
while True:
# Performance optimization: Load only required columns while maintaining ORM functionality
# Required columns:
# - id: for high water mark tracking
# - state_type: for state validation
# - parent_task_run_id: for parent task run checks
# - deployment_id: for determining cancellation state type
subflow_query = (
sa.select(db.FlowRun)
.options(
sa.orm.load_only(
db.FlowRun.id,
db.FlowRun.state_type,
db.FlowRun.parent_task_run_id,
db.FlowRun.deployment_id,
),
)
.where(
or_(
db.FlowRun.state_type == states.StateType.PENDING,
Expand Down
130 changes: 99 additions & 31 deletions tests/server/services/test_cancellation_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
import pendulum
from typing import Any, Callable

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.server import models, schemas
from prefect.server.schemas import states
from prefect.server.schemas.core import Deployment, Flow, FlowRun
from prefect.server.services.cancellation_cleanup import CancellationCleanup
from prefect.types._datetime import DateTime, Duration

NON_TERMINAL_STATE_CONSTRUCTORS = {
NON_TERMINAL_STATE_CONSTRUCTORS: dict[states.StateType, Any] = {
states.StateType.SCHEDULED: states.Scheduled,
states.StateType.PENDING: states.Pending,
states.StateType.RUNNING: states.Running,
states.StateType.PAUSED: states.Paused,
states.StateType.CANCELLING: states.Cancelling,
}

TERMINAL_STATE_CONSTRUCTORS = {
TERMINAL_STATE_CONSTRUCTORS: dict[states.StateType, Any] = {
states.StateType.COMPLETED: states.Completed,
states.StateType.FAILED: states.Failed,
states.StateType.CRASHED: states.Crashed,
states.StateType.CANCELLED: states.Cancelled,
}

THE_PAST = pendulum.now("UTC") - pendulum.Duration(hours=5)
THE_ANCIENT_PAST = pendulum.now("UTC") - pendulum.Duration(days=100)
THE_PAST = DateTime.now("UTC") - Duration(hours=5)
THE_ANCIENT_PAST = DateTime.now("UTC") - Duration(days=100)


@pytest.fixture
async def cancelled_flow_run(session, flow):
async def cancelled_flow_run(session: AsyncSession, flow: Flow):
async with session.begin():
return await models.flow_runs.create_flow_run(
session=session,
Expand All @@ -36,7 +40,7 @@ async def cancelled_flow_run(session, flow):


@pytest.fixture
async def old_cancelled_flow_run(session, flow):
async def old_cancelled_flow_run(session: AsyncSession, flow: Flow):
async with session.begin():
return await models.flow_runs.create_flow_run(
session=session,
Expand All @@ -47,8 +51,8 @@ async def old_cancelled_flow_run(session, flow):


@pytest.fixture
async def orphaned_task_run_maker(session):
async def task_run_maker(flow_run, state_constructor):
async def orphaned_task_run_maker(session: AsyncSession):
async def task_run_maker(flow_run: FlowRun, state_constructor: Any):
async with session.begin():
return await models.task_runs.create_task_run(
session=session,
Expand All @@ -64,8 +68,8 @@ async def task_run_maker(flow_run, state_constructor):


@pytest.fixture
async def orphaned_subflow_run_maker(session, flow):
async def subflow_run_maker(flow_run, state_constructor):
async def orphaned_subflow_run_maker(session: AsyncSession, flow: Flow):
async def subflow_run_maker(flow_run: FlowRun, state_constructor: Any):
async with session.begin():
virtual_task = await models.task_runs.create_task_run(
session=session,
Expand All @@ -91,8 +95,10 @@ async def subflow_run_maker(flow_run, state_constructor):


@pytest.fixture
async def orphaned_subflow_run_from_deployment_maker(session, flow, deployment):
async def subflow_run_maker(flow_run, state_constructor):
async def orphaned_subflow_run_from_deployment_maker(
session: AsyncSession, flow: Flow, deployment: Deployment
):
async def subflow_run_maker(flow_run: FlowRun, state_constructor: Any):
async with session.begin():
virtual_task = await models.task_runs.create_task_run(
session=session,
Expand Down Expand Up @@ -127,12 +133,12 @@ async def test_all_state_types_are_tested():

@pytest.mark.parametrize("state_constructor", NON_TERMINAL_STATE_CONSTRUCTORS.items())
async def test_service_cleans_up_nonterminal_runs(
session,
cancelled_flow_run,
orphaned_task_run_maker,
orphaned_subflow_run_maker,
orphaned_subflow_run_from_deployment_maker,
state_constructor,
session: AsyncSession,
cancelled_flow_run: FlowRun,
orphaned_task_run_maker: Callable[..., Any],
orphaned_subflow_run_maker: Callable[..., Any],
orphaned_subflow_run_from_deployment_maker: Callable[..., Any],
state_constructor: tuple[states.StateType, Any],
):
orphaned_task_run = await orphaned_task_run_maker(
cancelled_flow_run, state_constructor[1]
Expand Down Expand Up @@ -163,12 +169,12 @@ async def test_service_cleans_up_nonterminal_runs(

@pytest.mark.parametrize("state_constructor", NON_TERMINAL_STATE_CONSTRUCTORS.items())
async def test_service_ignores_old_cancellations(
session,
old_cancelled_flow_run,
orphaned_task_run_maker,
orphaned_subflow_run_maker,
orphaned_subflow_run_from_deployment_maker,
state_constructor,
session: AsyncSession,
old_cancelled_flow_run: FlowRun,
orphaned_task_run_maker: Callable[..., Any],
orphaned_subflow_run_maker: Callable[..., Any],
orphaned_subflow_run_from_deployment_maker: Callable[..., Any],
state_constructor: tuple[states.StateType, Any],
):
orphaned_task_run = await orphaned_task_run_maker(
old_cancelled_flow_run, state_constructor[1]
Expand Down Expand Up @@ -200,12 +206,12 @@ async def test_service_ignores_old_cancellations(

@pytest.mark.parametrize("state_constructor", TERMINAL_STATE_CONSTRUCTORS.items())
async def test_service_leaves_terminal_runs_alone(
session,
cancelled_flow_run,
orphaned_task_run_maker,
orphaned_subflow_run_maker,
orphaned_subflow_run_from_deployment_maker,
state_constructor,
session: AsyncSession,
cancelled_flow_run: FlowRun,
orphaned_task_run_maker: Callable[..., Any],
orphaned_subflow_run_maker: Callable[..., Any],
orphaned_subflow_run_from_deployment_maker: Callable[..., Any],
state_constructor: tuple[states.StateType, Any],
):
orphaned_task_run = await orphaned_task_run_maker(
cancelled_flow_run, state_constructor[1]
Expand Down Expand Up @@ -233,3 +239,65 @@ async def test_service_leaves_terminal_runs_alone(
assert orphaned_task_run.state.type == state_constructor[0]
assert orphaned_subflow_run.state.type == state_constructor[0]
assert orphaned_subflow_run_from_deployment.state.type == state_constructor[0]


async def test_service_works_with_partial_flow_run_objects(
session: AsyncSession,
flow: Flow,
deployment: Deployment,
):
"""
Test that the cancellation cleanup service works correctly with partial FlowRun objects.
This verifies that selecting only specific columns in the query doesn't break functionality.
"""
# Create a parent flow run and task run
async with session.begin():
parent_flow = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
state=states.Cancelled(),
end_time=THE_PAST,
),
)
virtual_task = await models.task_runs.create_task_run(
session=session,
task_run=schemas.core.TaskRun(
flow_run_id=parent_flow.id,
task_key="virtual task",
dynamic_key="dynamic key",
state=states.Running(),
),
)

# Create subflows in different states
async with session.begin():
subflow_with_deployment = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
parent_task_run_id=virtual_task.id,
state=states.Running(),
deployment_id=deployment.id,
),
)
subflow_without_deployment = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
parent_task_run_id=virtual_task.id,
state=states.Running(),
),
)

# Run the cleanup service
await CancellationCleanup().start(loops=1)

# Refresh states
async with session.begin():
await session.refresh(subflow_with_deployment)
await session.refresh(subflow_without_deployment)

# Verify correct state transitions with partial objects
assert subflow_with_deployment.state.type == states.StateType.CANCELLING
assert subflow_without_deployment.state.type == states.StateType.CANCELLED