Skip to content

Commit

Permalink
Migrate some pendulum usage to internal shim (#17074)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Feb 11, 2025
1 parent b19a7cb commit 955378c
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 58 deletions.
40 changes: 20 additions & 20 deletions src/prefect/server/database/query_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
)
from uuid import UUID

import pendulum
import sqlalchemy as sa
from cachetools import Cache, TTLCache
from jinja2 import Environment, PackageLoader, select_autoescape
Expand All @@ -35,6 +34,7 @@
from prefect.server.schemas.states import StateType
from prefect.server.utilities.database import UUID as UUIDTypeDecorator
from prefect.server.utilities.database import Timestamp, bindparams_from_clause
from prefect.types._datetime import DateTime

T = TypeVar("T", infer_variance=True)

Expand All @@ -51,7 +51,7 @@ class FlowRunNotificationsFromQueue(NamedTuple):
flow_run_parameters: dict[str, Any]
flow_run_state_type: StateType
flow_run_state_name: str
flow_run_state_timestamp: pendulum.DateTime
flow_run_state_timestamp: DateTime
flow_run_state_message: Optional[str]


Expand All @@ -60,8 +60,8 @@ class FlowRunGraphV2Node(NamedTuple):
id: UUID
label: str
state_type: StateType
start_time: pendulum.DateTime
end_time: Optional[pendulum.DateTime]
start_time: DateTime
end_time: Optional[DateTime]
parent_ids: Optional[list[UUID]]
child_ids: Optional[list[UUID]]
encapsulating_ids: Optional[list[UUID]]
Expand Down Expand Up @@ -126,10 +126,10 @@ def json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[An
@abstractmethod
def make_timestamp_intervals(
self,
start_time: pendulum.DateTime,
end_time: pendulum.DateTime,
start_time: DateTime,
end_time: DateTime,
interval: datetime.timedelta,
) -> sa.Select[tuple[pendulum.DateTime, pendulum.DateTime]]: ...
) -> sa.Select[tuple[DateTime, DateTime]]: ...

@abstractmethod
def set_state_id_on_inserted_flow_runs_statement(
Expand Down Expand Up @@ -204,7 +204,7 @@ def get_scheduled_flow_runs_from_work_queues(
db: PrefectDBInterface,
limit_per_queue: Optional[int] = None,
work_queue_ids: Optional[list[UUID]] = None,
scheduled_before: Optional[pendulum.DateTime] = None,
scheduled_before: Optional[DateTime] = None,
) -> sa.Select[tuple[orm_models.FlowRun, UUID]]:
"""
Returns all scheduled runs in work queues, subject to provided parameters.
Expand Down Expand Up @@ -283,7 +283,7 @@ def _get_scheduled_flow_runs_join(
db: PrefectDBInterface,
work_queue_query: sa.CTE,
limit_per_queue: Optional[int],
scheduled_before: Optional[pendulum.DateTime],
scheduled_before: Optional[DateTime],
) -> tuple[sa.FromClause, sa.ColumnExpressionArgument[bool]]:
"""Used by self.get_scheduled_flow_runs_from_work_queue, allowing just
this function to be changed on a per-dialect basis"""
Expand Down Expand Up @@ -340,8 +340,8 @@ async def get_scheduled_flow_runs_from_work_pool(
queue_limit: Optional[int] = None,
work_pool_ids: Optional[list[UUID]] = None,
work_queue_ids: Optional[list[UUID]] = None,
scheduled_before: Optional[pendulum.DateTime] = None,
scheduled_after: Optional[pendulum.DateTime] = None,
scheduled_before: Optional[DateTime] = None,
scheduled_after: Optional[DateTime] = None,
respect_queue_priorities: bool = False,
) -> list[schemas.responses.WorkerFlowRunResponse]:
template = jinja_env.get_template(
Expand Down Expand Up @@ -473,7 +473,7 @@ def _build_flow_run_graph_v2_query(self) -> sa.Select[FlowRunGraphV2Node]:
The query must accept the following bind parameters:
flow_run_id: UUID
since: pendulum.DateTime
since: DateTime
max_nodes: int
"""
Expand All @@ -484,7 +484,7 @@ async def flow_run_graph_v2(
db: PrefectDBInterface,
session: AsyncSession,
flow_run_id: UUID,
since: pendulum.DateTime,
since: DateTime,
max_nodes: int,
max_artifacts: int,
) -> Graph:
Expand Down Expand Up @@ -643,10 +643,10 @@ def json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[An

def make_timestamp_intervals(
self,
start_time: pendulum.DateTime,
end_time: pendulum.DateTime,
start_time: DateTime,
end_time: DateTime,
interval: datetime.timedelta,
) -> sa.Select[tuple[pendulum.DateTime, pendulum.DateTime]]:
) -> sa.Select[tuple[DateTime, DateTime]]:
dt = sa.func.generate_series(
start_time, end_time, interval, type_=Timestamp()
).column_valued("dt")
Expand Down Expand Up @@ -959,10 +959,10 @@ def json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[An

def make_timestamp_intervals(
self,
start_time: pendulum.DateTime,
end_time: pendulum.DateTime,
start_time: DateTime,
end_time: DateTime,
interval: datetime.timedelta,
) -> sa.Select[tuple[pendulum.DateTime, pendulum.DateTime]]:
) -> sa.Select[tuple[DateTime, DateTime]]:
start = sa.bindparam("start_time", start_time, Timestamp)
# subtract interval because recursive where clauses are effectively evaluated on a t-1 lag
stop = sa.bindparam("end_time", end_time - interval, Timestamp)
Expand Down Expand Up @@ -1091,7 +1091,7 @@ def _get_scheduled_flow_runs_join(
db: PrefectDBInterface,
work_queue_query: sa.CTE,
limit_per_queue: Optional[int],
scheduled_before: Optional[pendulum.DateTime],
scheduled_before: Optional[DateTime],
) -> tuple[sa.FromClause, sa.ColumnExpressionArgument[bool]]:
# precompute for readability
FlowRun = db.FlowRun
Expand Down
19 changes: 10 additions & 9 deletions src/prefect/server/utilities/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
overload,
)

import pendulum
import pydantic
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql, sqlite
Expand All @@ -48,6 +47,8 @@
TypeVar,
)

from prefect.types._datetime import DateTime

P = ParamSpec("P")
R = TypeVar("R", infer_variance=True)
T = TypeVar("T", infer_variance=True)
Expand Down Expand Up @@ -128,7 +129,7 @@ def generate_uuid_sqlite(
"""


class Timestamp(TypeDecorator[pendulum.DateTime]):
class Timestamp(TypeDecorator[DateTime]):
"""TypeDecorator that ensures that timestamps have a timezone.
For SQLite, all timestamps are converted to UTC (since they are stored
Expand All @@ -152,27 +153,27 @@ def load_dialect_impl(self, dialect: sa.Dialect) -> TypeEngine[Any]:

def process_bind_param(
self,
value: Optional[pendulum.DateTime],
value: Optional[DateTime],
dialect: sa.Dialect,
) -> Optional[pendulum.DateTime]:
) -> Optional[DateTime]:
if value is None:
return None
else:
if value.tzinfo is None:
raise ValueError("Timestamps must have a timezone.")
elif dialect.name == "sqlite":
return pendulum.instance(value).in_timezone("UTC")
return DateTime.instance(value).in_timezone("UTC")
else:
return value

def process_result_value(
self,
value: Optional[Union[datetime.datetime, pendulum.DateTime]],
value: Optional[Union[datetime.datetime, DateTime]],
dialect: sa.Dialect,
) -> Optional[pendulum.DateTime]:
) -> Optional[DateTime]:
# retrieve timestamps in their native timezone (or UTC)
if value is not None:
return pendulum.instance(value).in_timezone("UTC")
return DateTime.instance(value).in_timezone("UTC")


class UUID(TypeDecorator[uuid.UUID]):
Expand Down Expand Up @@ -341,7 +342,7 @@ def bindparams_from_clause(
# Platform-independent datetime and timedelta arithmetic functions


class date_add(functions.GenericFunction[pendulum.DateTime]):
class date_add(functions.GenericFunction[DateTime]):
"""Platform-independent way to add a timestamp and an interval"""

type: Timestamp = Timestamp()
Expand Down
7 changes: 3 additions & 4 deletions src/prefect/server/utilities/schemas/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
from typing import TYPE_CHECKING, Any, ClassVar, Optional, TypeVar
from uuid import UUID, uuid4

import pendulum
from pydantic import BaseModel, ConfigDict, Field
from pydantic.config import JsonDict
from typing_extensions import Self

from prefect.types import DateTime
from prefect.types._datetime import DateTime, human_friendly_diff

if TYPE_CHECKING:
from pydantic.main import IncEx
Expand Down Expand Up @@ -108,9 +107,9 @@ def __rich_repr__(self) -> "RichReprResult":
value = str(value)
elif isinstance(value, datetime.datetime):
value = (
pendulum.instance(value).isoformat()
value.isoformat()
if name == "timestamp"
else pendulum.instance(value).diff_for_humans()
else human_friendly_diff(value)
)

yield name, value, field.get_default()
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
UnfinishedRun,
)
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.types._datetime import DateTime, PendulumDuration
from prefect.types._datetime import DateTime, Duration, now
from prefect.utilities.annotations import BaseAnnotation
from prefect.utilities.asyncutils import in_async_main_thread, sync_compatible
from prefect.utilities.collections import ensure_iterable
Expand Down Expand Up @@ -660,7 +660,7 @@ def Scheduled(
"""
state_details = StateDetails.model_validate(kwargs.pop("state_details", {}))
if scheduled_time is None:
scheduled_time = DateTime.now("UTC")
scheduled_time = now()
elif state_details.scheduled_time:
raise ValueError("An extra scheduled_time was provided in state_details")
state_details.scheduled_time = scheduled_time
Expand Down Expand Up @@ -761,7 +761,7 @@ def Paused(
state_details.pause_timeout = (
DateTime.instance(pause_expiration_time)
if pause_expiration_time
else DateTime.now("UTC") + PendulumDuration(seconds=timeout_seconds or 0)
else now() + Duration(seconds=timeout_seconds or 0)
)

state_details.pause_reschedule = reschedule
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
)
from prefect.telemetry.run_telemetry import RunTelemetry
from prefect.transactions import IsolationLevel, Transaction, transaction
from prefect.types._datetime import DateTime, PendulumDuration
from prefect.types._datetime import DateTime, Duration
from prefect.utilities._engine import get_hook_name
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import run_coro_as_sync
Expand Down Expand Up @@ -437,7 +437,7 @@ def set_state(self, state: State[R], force: bool = False) -> State[R]:
if last_state.timestamp == new_state.timestamp:
# Ensure that the state timestamp is unique, or at least not equal to the last state.
# This might occur especially on Windows where the timestamp resolution is limited.
new_state.timestamp += PendulumDuration(microseconds=1)
new_state.timestamp += Duration(microseconds=1)

# Ensure that the state_details are populated with the current run IDs
new_state.state_details.task_run_id = self.task_run.id
Expand Down Expand Up @@ -970,7 +970,7 @@ async def set_state(self, state: State, force: bool = False) -> State:
if last_state.timestamp == new_state.timestamp:
# Ensure that the state timestamp is unique, or at least not equal to the last state.
# This might occur especially on Windows where the timestamp resolution is limited.
new_state.timestamp += PendulumDuration(microseconds=1)
new_state.timestamp += Duration(microseconds=1)

# Ensure that the state_details are populated with the current run IDs
new_state.state_details.task_run_id = self.task_run.id
Expand Down
8 changes: 4 additions & 4 deletions src/prefect/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import anyio
import httpx
import pendulum
import pytest
from starlette.status import WS_1008_POLICY_VIOLATION
from websockets.exceptions import ConnectionClosed
Expand All @@ -34,6 +33,7 @@
temporary_settings,
)
from prefect.testing.utilities import AsyncMock
from prefect.types._datetime import DateTime
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.processutils import open_process

Expand Down Expand Up @@ -192,7 +192,7 @@ def mock_anyio_sleep(
Provides "assert_sleeps_for" context manager which asserts a sleep time occurred
within the context while using the actual runtime of the context as a tolerance.
"""
original_now = pendulum.now
original_now = DateTime.now
original_sleep = anyio.sleep
time_shift = 0.0

Expand All @@ -202,15 +202,15 @@ async def callback(delay_in_seconds: float) -> None:
# Preserve yield effects of sleep
await original_sleep(0)

def latest_now(*args: Any) -> pendulum.DateTime:
def latest_now(*args: Any) -> DateTime:
# Fast-forwards the time by the total sleep time
return original_now(*args).add(
# Ensure we retain float precision
seconds=int(time_shift),
microseconds=int((time_shift - int(time_shift)) * 1000000),
)

monkeypatch.setattr("pendulum.now", latest_now)
monkeypatch.setattr("prefect.types._datetime.now", latest_now)

sleep = AsyncMock(side_effect=callback)
monkeypatch.setattr("anyio.sleep", sleep)
Expand Down
11 changes: 11 additions & 0 deletions src/prefect/types/_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,14 @@ def from_format(
locale: str | None = None,
) -> DateTime:
return DateTime.instance(pendulum.from_format(value, fmt, tz, locale))


def human_friendly_diff(dt: DateTime | datetime.datetime) -> str:
if isinstance(dt, DateTime):
return dt.diff_for_humans()
else:
return DateTime.instance(dt).diff_for_humans()


def now(tz: str | Timezone = pendulum.tz.UTC) -> DateTime:
return DateTime.now(tz)
4 changes: 2 additions & 2 deletions src/prefect/utilities/dockerutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from typing import TYPE_CHECKING, Any, Optional, TextIO, Union, cast
from urllib.parse import urlsplit

import pendulum
from packaging.version import Version
from typing_extensions import Self

import prefect
from prefect.types._datetime import now
from prefect.utilities.importtools import lazy_import
from prefect.utilities.slugify import slugify

Expand Down Expand Up @@ -428,7 +428,7 @@ def push_image(
"""

if not tag:
tag = slugify(pendulum.now("utc").isoformat())
tag = slugify(now("utc").isoformat())

_, registry, _, _, _ = urlsplit(registry_url)
repository = f"{registry}/{name}"
Expand Down
Loading

0 comments on commit 955378c

Please sign in to comment.