Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update deployments PATCH endpoint to make more targeted updates with slugs #17027

Merged
merged 3 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -16502,7 +16502,7 @@
}
],
"title": "Slug",
"description": "A unique slug for the schedule."
"description": "A unique identifier for the schedule."
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -16572,7 +16572,7 @@
}
],
"title": "Slug",
"description": "A unique slug for the schedule."
"description": "A unique identifier for the schedule."
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -16631,7 +16631,7 @@
},
"schedules": {
"items": {
"$ref": "#/components/schemas/DeploymentScheduleCreate"
"$ref": "#/components/schemas/DeploymentScheduleUpdate"
},
"type": "array",
"title": "Schedules",
Expand Down
57 changes: 57 additions & 0 deletions src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,41 @@ async def update_deployment(
status.HTTP_404_NOT_FOUND, detail="Deployment not found."
)

# Checking how we should handle schedule updates
# If not all existing schedules have slugs then we'll fall back to the existing logic where are schedules are recreated to match the request.
# If the existing schedules have slugs, but not all provided schedules have slugs, then we'll return a 422 to avoid accidentally blowing away schedules.
# Otherwise, we'll use the existing slugs and the provided slugs to make targeted updates to the deployment's schedules.
schedules_to_patch: list[schemas.actions.DeploymentScheduleUpdate] = []
schedules_to_create: list[schemas.actions.DeploymentScheduleUpdate] = []
all_provided_have_slugs = deployment.schedules and all(
schedule.slug is not None for schedule in deployment.schedules
)
all_existing_have_slugs = existing_deployment.schedules and all(
schedule.slug is not None for schedule in existing_deployment.schedules
)
if all_provided_have_slugs and all_existing_have_slugs:
current_slugs = [
schedule.slug for schedule in existing_deployment.schedules
]

for schedule in deployment.schedules:
if schedule.slug in current_slugs:
schedules_to_patch.append(schedule)
elif schedule.schedule:
schedules_to_create.append(schedule)
else:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Unable to create new deployment schedules without a schedule configuration.",
)
# Clear schedules to handle their update/creation separately
deployment.schedules = None
elif not all_provided_have_slugs and all_existing_have_slugs:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Please provide a slug for each schedule in your request to ensure schedules are updated correctly.",
)

if deployment.work_pool_name:
# Make sure that deployment is valid before beginning creation process
work_pool = await models.workers.read_work_pool_by_name(
Expand Down Expand Up @@ -261,6 +296,28 @@ async def update_deployment(
result = await models.deployments.update_deployment(
session=session, deployment_id=deployment_id, deployment=deployment
)

for schedule in schedules_to_patch:
await models.deployments.update_deployment_schedule(
session=session,
deployment_id=deployment_id,
schedule=schedule,
deployment_schedule_slug=schedule.slug,
)
if schedules_to_create:
await models.deployments.create_deployment_schedules(
session=session,
deployment_id=deployment_id,
schedules=[
schemas.actions.DeploymentScheduleCreate(
schedule=schedule.schedule, # type: ignore We will raise above if schedule is not provided
active=schedule.active if schedule.active is not None else True,
slug=schedule.slug,
parameters=schedule.parameters,
)
for schedule in schedules_to_create
],
)
if not result:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.")

Expand Down
41 changes: 31 additions & 10 deletions src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
Intended for internal use by the Prefect REST API.
"""

from __future__ import annotations

import datetime
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast
Expand Down Expand Up @@ -155,6 +157,7 @@ async def create_deployment(
schedule=schedule.schedule,
active=schedule.active, # type: ignore[call-arg]
parameters=schedule.parameters,
slug=schedule.slug,
)
for schedule in schedules
],
Expand Down Expand Up @@ -272,6 +275,8 @@ async def update_deployment(
schemas.actions.DeploymentScheduleCreate(
schedule=schedule.schedule,
active=schedule.active, # type: ignore[call-arg]
parameters=schedule.parameters,
slug=schedule.slug,
)
for schedule in schedules
],
Expand Down Expand Up @@ -952,8 +957,9 @@ async def update_deployment_schedule(
db: PrefectDBInterface,
session: AsyncSession,
deployment_id: UUID,
deployment_schedule_id: UUID,
schedule: schemas.actions.DeploymentScheduleUpdate,
deployment_schedule_id: UUID | None = None,
deployment_schedule_slug: str | None = None,
) -> bool:
"""
Updates a deployment's schedules.
Expand All @@ -963,17 +969,32 @@ async def update_deployment_schedule(
deployment_schedule_id: a deployment schedule id
schedule: a deployment schedule update action
"""

result = await session.execute(
sa.update(db.DeploymentSchedule)
.where(
sa.and_(
db.DeploymentSchedule.id == deployment_schedule_id,
db.DeploymentSchedule.deployment_id == deployment_id,
if deployment_schedule_id:
result = await session.execute(
sa.update(db.DeploymentSchedule)
.where(
sa.and_(
db.DeploymentSchedule.id == deployment_schedule_id,
db.DeploymentSchedule.deployment_id == deployment_id,
)
)
.values(**schedule.model_dump(exclude_none=True))
)
elif deployment_schedule_slug:
result = await session.execute(
sa.update(db.DeploymentSchedule)
.where(
sa.and_(
db.DeploymentSchedule.slug == deployment_schedule_slug,
db.DeploymentSchedule.deployment_id == deployment_id,
)
)
.values(**schedule.model_dump(exclude_none=True))
)
else:
raise ValueError(
"Either deployment_schedule_id or deployment_schedule_slug must be provided"
)
.values(**schedule.model_dump(exclude_none=True))
)

return result.rowcount > 0

Expand Down
6 changes: 3 additions & 3 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class DeploymentScheduleCreate(ActionBaseModel):
)
slug: Optional[str] = Field(
default=None,
description="A unique slug for the schedule.",
description="A unique identifier for the schedule.",
)

@field_validator("max_scheduled_runs")
Expand Down Expand Up @@ -146,7 +146,7 @@ class DeploymentScheduleUpdate(ActionBaseModel):
)
slug: Optional[str] = Field(
default=None,
description="A unique slug for the schedule.",
description="A unique identifier for the schedule.",
)

@field_validator("max_scheduled_runs")
Expand Down Expand Up @@ -281,7 +281,7 @@ def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
paused: bool = Field(
default=False, description="Whether or not the deployment is paused."
)
schedules: List[DeploymentScheduleCreate] = Field(
schedules: List[DeploymentScheduleUpdate] = Field(
default_factory=list,
description="A list of schedules for the deployment.",
)
Expand Down
Loading
Loading