Skip to content

Commit

Permalink
try improved query
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Feb 11, 2025
1 parent fa0ec89 commit ff02d57
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
16 changes: 16 additions & 0 deletions src/prefect/server/services/cancellation_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,24 @@ async def clean_up_cancelled_flow_run_task_runs(
async def clean_up_cancelled_subflow_runs(self, db: PrefectDBInterface) -> None:
high_water_mark = UUID(int=0)
while True:
# Performance optimization: Load only required columns while maintaining ORM functionality
# Required columns:
# - id: for high water mark tracking
# - state_type: for state validation
# - parent_task_run_id: for parent task run checks
# - deployment_id: for determining cancellation state type
# Note: state is a relationship and must be loaded separately
subflow_query = (
sa.select(db.FlowRun)
.options(
sa.orm.load_only(
db.FlowRun.id,
db.FlowRun.state_type,
db.FlowRun.parent_task_run_id,
db.FlowRun.deployment_id,
),
sa.orm.selectinload(db.FlowRun.state),
)
.where(
or_(
db.FlowRun.state_type == states.StateType.PENDING,
Expand Down
130 changes: 99 additions & 31 deletions tests/server/services/test_cancellation_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
import pendulum
from typing import Any, Callable

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.server import models, schemas
from prefect.server.schemas import states
from prefect.server.schemas.core import Deployment, Flow, FlowRun
from prefect.server.services.cancellation_cleanup import CancellationCleanup
from prefect.types._datetime import DateTime, Duration

NON_TERMINAL_STATE_CONSTRUCTORS = {
NON_TERMINAL_STATE_CONSTRUCTORS: dict[states.StateType, Any] = {
states.StateType.SCHEDULED: states.Scheduled,
states.StateType.PENDING: states.Pending,
states.StateType.RUNNING: states.Running,
states.StateType.PAUSED: states.Paused,
states.StateType.CANCELLING: states.Cancelling,
}

TERMINAL_STATE_CONSTRUCTORS = {
TERMINAL_STATE_CONSTRUCTORS: dict[states.StateType, Any] = {
states.StateType.COMPLETED: states.Completed,
states.StateType.FAILED: states.Failed,
states.StateType.CRASHED: states.Crashed,
states.StateType.CANCELLED: states.Cancelled,
}

THE_PAST = pendulum.now("UTC") - pendulum.Duration(hours=5)
THE_ANCIENT_PAST = pendulum.now("UTC") - pendulum.Duration(days=100)
THE_PAST = DateTime.now("UTC") - Duration(hours=5)
THE_ANCIENT_PAST = DateTime.now("UTC") - Duration(days=100)


@pytest.fixture
async def cancelled_flow_run(session, flow):
async def cancelled_flow_run(session: AsyncSession, flow: Flow):
async with session.begin():
return await models.flow_runs.create_flow_run(
session=session,
Expand All @@ -36,7 +40,7 @@ async def cancelled_flow_run(session, flow):


@pytest.fixture
async def old_cancelled_flow_run(session, flow):
async def old_cancelled_flow_run(session: AsyncSession, flow: Flow):
async with session.begin():
return await models.flow_runs.create_flow_run(
session=session,
Expand All @@ -47,8 +51,8 @@ async def old_cancelled_flow_run(session, flow):


@pytest.fixture
async def orphaned_task_run_maker(session):
async def task_run_maker(flow_run, state_constructor):
async def orphaned_task_run_maker(session: AsyncSession):
async def task_run_maker(flow_run: FlowRun, state_constructor: Any):
async with session.begin():
return await models.task_runs.create_task_run(
session=session,
Expand All @@ -64,8 +68,8 @@ async def task_run_maker(flow_run, state_constructor):


@pytest.fixture
async def orphaned_subflow_run_maker(session, flow):
async def subflow_run_maker(flow_run, state_constructor):
async def orphaned_subflow_run_maker(session: AsyncSession, flow: Flow):
async def subflow_run_maker(flow_run: FlowRun, state_constructor: Any):
async with session.begin():
virtual_task = await models.task_runs.create_task_run(
session=session,
Expand All @@ -91,8 +95,10 @@ async def subflow_run_maker(flow_run, state_constructor):


@pytest.fixture
async def orphaned_subflow_run_from_deployment_maker(session, flow, deployment):
async def subflow_run_maker(flow_run, state_constructor):
async def orphaned_subflow_run_from_deployment_maker(
session: AsyncSession, flow: Flow, deployment: Deployment
):
async def subflow_run_maker(flow_run: FlowRun, state_constructor: Any):
async with session.begin():
virtual_task = await models.task_runs.create_task_run(
session=session,
Expand Down Expand Up @@ -127,12 +133,12 @@ async def test_all_state_types_are_tested():

@pytest.mark.parametrize("state_constructor", NON_TERMINAL_STATE_CONSTRUCTORS.items())
async def test_service_cleans_up_nonterminal_runs(
session,
cancelled_flow_run,
orphaned_task_run_maker,
orphaned_subflow_run_maker,
orphaned_subflow_run_from_deployment_maker,
state_constructor,
session: AsyncSession,
cancelled_flow_run: FlowRun,
orphaned_task_run_maker: Callable[..., Any],
orphaned_subflow_run_maker: Callable[..., Any],
orphaned_subflow_run_from_deployment_maker: Callable[..., Any],
state_constructor: tuple[states.StateType, Any],
):
orphaned_task_run = await orphaned_task_run_maker(
cancelled_flow_run, state_constructor[1]
Expand Down Expand Up @@ -163,12 +169,12 @@ async def test_service_cleans_up_nonterminal_runs(

@pytest.mark.parametrize("state_constructor", NON_TERMINAL_STATE_CONSTRUCTORS.items())
async def test_service_ignores_old_cancellations(
session,
old_cancelled_flow_run,
orphaned_task_run_maker,
orphaned_subflow_run_maker,
orphaned_subflow_run_from_deployment_maker,
state_constructor,
session: AsyncSession,
old_cancelled_flow_run: FlowRun,
orphaned_task_run_maker: Callable[..., Any],
orphaned_subflow_run_maker: Callable[..., Any],
orphaned_subflow_run_from_deployment_maker: Callable[..., Any],
state_constructor: tuple[states.StateType, Any],
):
orphaned_task_run = await orphaned_task_run_maker(
old_cancelled_flow_run, state_constructor[1]
Expand Down Expand Up @@ -200,12 +206,12 @@ async def test_service_ignores_old_cancellations(

@pytest.mark.parametrize("state_constructor", TERMINAL_STATE_CONSTRUCTORS.items())
async def test_service_leaves_terminal_runs_alone(
session,
cancelled_flow_run,
orphaned_task_run_maker,
orphaned_subflow_run_maker,
orphaned_subflow_run_from_deployment_maker,
state_constructor,
session: AsyncSession,
cancelled_flow_run: FlowRun,
orphaned_task_run_maker: Callable[..., Any],
orphaned_subflow_run_maker: Callable[..., Any],
orphaned_subflow_run_from_deployment_maker: Callable[..., Any],
state_constructor: tuple[states.StateType, Any],
):
orphaned_task_run = await orphaned_task_run_maker(
cancelled_flow_run, state_constructor[1]
Expand Down Expand Up @@ -233,3 +239,65 @@ async def test_service_leaves_terminal_runs_alone(
assert orphaned_task_run.state.type == state_constructor[0]
assert orphaned_subflow_run.state.type == state_constructor[0]
assert orphaned_subflow_run_from_deployment.state.type == state_constructor[0]


async def test_service_works_with_partial_flow_run_objects(
session: AsyncSession,
flow: Flow,
deployment: Deployment,
):
"""
Test that the cancellation cleanup service works correctly with partial FlowRun objects.
This verifies that selecting only specific columns in the query doesn't break functionality.
"""
# Create a parent flow run and task run
async with session.begin():
parent_flow = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
state=states.Cancelled(),
end_time=THE_PAST,
),
)
virtual_task = await models.task_runs.create_task_run(
session=session,
task_run=schemas.core.TaskRun(
flow_run_id=parent_flow.id,
task_key="virtual task",
dynamic_key="dynamic key",
state=states.Running(),
),
)

# Create subflows in different states
async with session.begin():
subflow_with_deployment = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
parent_task_run_id=virtual_task.id,
state=states.Running(),
deployment_id=deployment.id,
),
)
subflow_without_deployment = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
parent_task_run_id=virtual_task.id,
state=states.Running(),
),
)

# Run the cleanup service
await CancellationCleanup().start(loops=1)

# Refresh states
async with session.begin():
await session.refresh(subflow_with_deployment)
await session.refresh(subflow_without_deployment)

# Verify correct state transitions with partial objects
assert subflow_with_deployment.state.type == states.StateType.CANCELLING
assert subflow_without_deployment.state.type == states.StateType.CANCELLED

0 comments on commit ff02d57

Please sign in to comment.