Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update idempotency key for scheduled runs to disambiguate schedules #17123

Merged
merged 4 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/prefect/_result_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Optional,
TypeVar,
Union,
cast,
)
from uuid import UUID

Expand Down Expand Up @@ -146,6 +147,8 @@ def serialize_result(self) -> bytes:
@classmethod
def coerce_old_format(cls, value: dict[str, Any] | Any) -> dict[str, Any]:
if isinstance(value, dict):
if TYPE_CHECKING: # TODO: isinstance doesn't accept generic parameters
value = cast(dict[str, Any], value)
if "data" in value:
value["result"] = value.pop("data")
if "metadata" not in value:
Expand Down Expand Up @@ -232,4 +235,6 @@ def deserialize_from_result_and_metadata(
def __eq__(self, other: Any | "ResultRecord[Any]") -> bool:
if not isinstance(other, ResultRecord):
return False
return self.metadata == other.metadata and self.result == other.result
return self.model_dump(include={"metadata", "result"}) == other.model_dump(
include={"metadata", "result"}
)
2 changes: 1 addition & 1 deletion src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@


@overload
def get_client(
def get_client( # type: ignore # TODO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fixed when we merge the pyproject migration.

*,
httpx_settings: Optional[dict[str, Any]] = ...,
sync_client: Literal[False] = False,
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ async def _generate_scheduled_flow_runs(
"work_queue_id": deployment.work_queue_id,
"parameters": parameters,
"infrastructure_document_id": deployment.infrastructure_document_id,
"idempotency_key": f"scheduled {deployment.id} {date}",
"idempotency_key": f"scheduled {deployment.id} {deployment_schedule.id} {date}",
"tags": tags,
"auto_scheduled": auto_scheduled,
"state": schemas.states.Scheduled(
Expand Down
136 changes: 122 additions & 14 deletions tests/server/services/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
import pendulum
import pytest
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.server import models, schemas
from prefect.server.services.scheduler import RecentDeploymentsScheduler, Scheduler
from prefect.settings import (
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE,
PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS,
)
from prefect.types._datetime import DateTime, now
from prefect.utilities.callables import parameter_schema


@pytest.fixture
async def deployment_without_schedules(flow, session):
async def deployment_without_schedules(flow: schemas.core.Flow, session: AsyncSession):
deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
Expand All @@ -27,7 +29,9 @@ async def deployment_without_schedules(flow, session):


@pytest.fixture
async def deployment_with_inactive_schedules(flow, session):
async def deployment_with_inactive_schedules(
flow: schemas.core.Flow, session: AsyncSession
):
deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
Expand Down Expand Up @@ -60,7 +64,9 @@ async def deployment_with_inactive_schedules(flow, session):


@pytest.fixture
async def deployment_with_active_schedules(flow, session):
async def deployment_with_active_schedules(
flow: schemas.core.Flow, session: AsyncSession
):
deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
Expand Down Expand Up @@ -93,7 +99,8 @@ async def deployment_with_active_schedules(flow, session):


async def test_create_schedules_from_deployment(
session, deployment_with_active_schedules
session: AsyncSession,
deployment_with_active_schedules: schemas.core.Deployment,
):
active_schedules = [
s.schedule
Expand All @@ -115,7 +122,7 @@ async def test_create_schedules_from_deployment(
runs = await models.flow_runs.read_flow_runs(session)
assert len(runs) == service.min_runs * num_active_schedules

expected_dates = set()
expected_dates: set[DateTime] = set()
for schedule in active_schedules:
expected_dates.update(await schedule.get_dates(service.min_runs))
assert set(expected_dates) == {r.state.state_details.scheduled_time for r in runs}
Expand All @@ -126,12 +133,11 @@ async def test_create_schedules_from_deployment(


async def test_create_parametrized_schedules_from_deployment(
flow,
session,
flow: schemas.core.Flow, session: AsyncSession
):
schedule = schemas.schedules.IntervalSchedule(
interval=datetime.timedelta(days=30),
anchor_date=pendulum.now("UTC"),
anchor_date=now("UTC"),
)

def func_for_params(name: str, x: int = 42):
Expand All @@ -152,6 +158,11 @@ def func_for_params(name: str, x: int = 42):
parameters={"name": "whoami"},
active=True,
),
schemas.core.DeploymentSchedule(
schedule=schedule, # Same schedule/timing
parameters={"name": "whoami2"}, # Different parameters
active=True,
),
],
),
)
Expand All @@ -163,20 +174,111 @@ def func_for_params(name: str, x: int = 42):
service = Scheduler()
await service.start(loops=1)
runs = await models.flow_runs.read_flow_runs(session)
assert len(runs) == service.min_runs

# We expect min_runs * 2 because we have two schedules.
# With the old idempotency key (scheduled {deployment.id} {date}) both
# schedules produced identical keys, so only min_runs runs were created.
# The key now also includes the deployment schedule ID, which keeps them apart.
assert len(runs) == service.min_runs * 2 # Should create runs for both schedules

expected_dates = await schedule.get_dates(service.min_runs)
# Each expected date should have two runs with different parameters
assert set(expected_dates) == {r.state.state_details.scheduled_time for r in runs}

# Check that we have runs with both sets of parameters
run_params = {(r.parameters["name"], r.parameters["x"]) for r in runs}
assert run_params == {("whoami", 11), ("whoami2", 11)}

assert all([r.state_name == "Scheduled" for r in runs]), (
"Scheduler sets flow_run.state_name"
)
assert all([r.parameters == dict(name="whoami", x=11) for r in runs])


async def test_create_schedule_respects_max_future_time(flow, session):
async def test_create_parametrized_schedules_with_slugs(
    flow: schemas.core.Flow, session: AsyncSession
):
    """Two schedules with identical timing must yield distinct idempotency keys.

    One schedule carries a slug and the other does not. In both cases the
    idempotency key is expected to embed the deployment schedule's ID (the
    slug itself is not part of the key), so runs generated for the same date
    by different schedules do not collide.
    """
    schedule = schemas.schedules.IntervalSchedule(
        interval=datetime.timedelta(days=30),
        anchor_date=now("UTC"),
    )

    deployment = await models.deployments.create_deployment(
        session=session,
        deployment=schemas.core.Deployment(
            name="test",
            flow_id=flow.id,
            parameters={"name": "deployment-test", "x": 11},
            schedules=[
                schemas.core.DeploymentSchedule(
                    schedule=schedule,
                    parameters={"name": "whoami"},
                    slug="my-schedule",
                    active=True,
                ),
                schemas.core.DeploymentSchedule(
                    schedule=schedule,  # Same schedule/timing
                    parameters={"name": "whoami2"},  # Different parameters
                    active=True,  # No slug on this one
                ),
            ],
        ),
    )
    await session.commit()

    # Nothing has been scheduled before the service runs.
    n_runs = await models.flow_runs.count_flow_runs(session)
    assert n_runs == 0

    service = Scheduler()
    await service.start(loops=1)
    runs = await models.flow_runs.read_flow_runs(session)

    # Each of the two schedules contributes min_runs runs.
    assert len(runs) == service.min_runs * 2

    # Both schedules' parameter overrides are represented; "x" comes from the
    # deployment-level parameters.
    run_params = {(r.parameters["name"], r.parameters["x"]) for r in runs}
    assert run_params == {("whoami", 11), ("whoami2", 11)}

    # Look the persisted schedules up again so their IDs can be compared
    # against the generated idempotency keys.
    schedules = await models.deployments.read_deployment_schedules(
        session=session,
        deployment_id=deployment.id,
    )
    schedule_with_slug = next(s for s in schedules if s.slug == "my-schedule")
    schedule_without_slug = next(s for s in schedules if not s.slug)

    # Sanity-check that the slugged schedule round-tripped unchanged.
    assert schedule_with_slug.slug == "my-schedule"
    assert schedule_with_slug.parameters == {"name": "whoami"}
    assert schedule_with_slug.active is True

    # For every scheduled date there must be exactly one run per schedule,
    # each keyed by that schedule's ID -- whether or not the schedule has a slug.
    expected_dates = await schedule.get_dates(service.min_runs)
    for date in expected_dates:
        date_runs = [r for r in runs if r.state.state_details.scheduled_time == date]
        assert len(date_runs) == 2  # Should have two runs per date

        assert {r.idempotency_key for r in date_runs} == {
            f"scheduled {deployment.id} {schedule_with_slug.id} {date}",
            f"scheduled {deployment.id} {schedule_without_slug.id} {date}",
        }


async def test_create_schedule_respects_max_future_time(
flow: schemas.core.Flow, session: AsyncSession
):
schedule = schemas.schedules.IntervalSchedule(
interval=datetime.timedelta(days=30), anchor_date=pendulum.now("UTC")
interval=datetime.timedelta(days=30), anchor_date=now("UTC")
)

await models.deployments.create_deployment(
Expand Down Expand Up @@ -207,7 +309,9 @@ async def test_create_schedule_respects_max_future_time(flow, session):
assert set(expected_dates) == {r.state.state_details.scheduled_time for r in runs}


async def test_create_schedules_from_multiple_deployments(flow, session):
async def test_create_schedules_from_multiple_deployments(
flow: schemas.core.Flow, session: AsyncSession
):
schedule1 = schemas.schedules.IntervalSchedule(interval=datetime.timedelta(hours=1))
schedule2 = schemas.schedules.IntervalSchedule(interval=datetime.timedelta(days=10))
schedule3 = schemas.schedules.IntervalSchedule(interval=datetime.timedelta(days=5))
Expand Down Expand Up @@ -528,7 +632,11 @@ class TestScheduleRulesWaterfall:
],
)
async def test_create_schedule_respects_max_future_time(
self, flow, session, interval, n
self,
flow: schemas.core.Flow,
session: AsyncSession,
interval: datetime.timedelta,
n: int,
):
await models.deployments.create_deployment(
session=session,
Expand Down
Loading