Skip to content

Commit

Permalink
Merge branch 'main' into fastapi-memory-optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jan 8, 2025
2 parents 41ff850 + d58ecba commit 13312c3
Show file tree
Hide file tree
Showing 15 changed files with 507 additions and 253 deletions.
2 changes: 1 addition & 1 deletion compat-tests
6 changes: 3 additions & 3 deletions src/prefect/blocks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ensure core blocks are registered

import prefect.blocks.notifications
import prefect.blocks.system
import prefect.blocks.webhook
import prefect.blocks.notifications as notifications
import prefect.blocks.system as system
import prefect.blocks.webhook as webhook

__all__ = ["notifications", "system", "webhook"]
83 changes: 53 additions & 30 deletions src/prefect/blocks/abstract.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from __future__ import annotations

import logging
import sys
from abc import ABC, abstractmethod
from contextlib import contextmanager
from logging import Logger, LoggerAdapter
from logging import Logger
from pathlib import Path
from typing import (
Any,
BinaryIO,
Dict,
Generator,
Generic,
List,
Optional,
Tuple,
TypeVar,
Union,
)
Expand All @@ -23,7 +23,12 @@

T = TypeVar("T")

LoggerOrAdapter: TypeAlias = Union[Logger, LoggerAdapter]
if sys.version_info >= (3, 12):
LoggingAdapter = logging.LoggerAdapter[logging.Logger]
else:
LoggingAdapter = logging.LoggerAdapter

LoggerOrAdapter: TypeAlias = Union[Logger, LoggingAdapter]


class CredentialsBlock(Block, ABC):
Expand Down Expand Up @@ -52,7 +57,7 @@ def logger(self) -> LoggerOrAdapter:
return get_logger(self.__class__.__name__)

@abstractmethod
def get_client(self, *args, **kwargs):
def get_client(self, *args: Any, **kwargs: Any) -> Any:
"""
Returns a client for interacting with the external system.
Expand Down Expand Up @@ -94,7 +99,7 @@ def logger(self) -> LoggerOrAdapter:
return get_logger(self.__class__.__name__)

@abstractmethod
async def notify(self, body: str, subject: Optional[str] = None) -> None:
async def notify(self, body: str, subject: str | None = None) -> None:
"""
Send a notification.
Expand Down Expand Up @@ -153,7 +158,7 @@ async def fetch_result(self) -> T:
"""


class JobBlock(Block, ABC):
class JobBlock(Block, ABC, Generic[T]):
"""
Block that represents an entity in an external service
that can trigger a long running execution.
Expand All @@ -176,7 +181,7 @@ def logger(self) -> LoggerOrAdapter:
return get_logger(self.__class__.__name__)

@abstractmethod
async def trigger(self) -> JobRun:
async def trigger(self) -> JobRun[T]:
"""
Triggers a job run in an external service and returns a JobRun object
to track the execution of the run.
Expand Down Expand Up @@ -221,8 +226,11 @@ def logger(self) -> LoggerOrAdapter:

@abstractmethod
async def fetch_one(
self, operation, parameters=None, **execution_kwargs
) -> Tuple[Any]:
self,
operation: str,
parameters: dict[str, Any] | None = None,
**execution_kwargs: Any,
) -> tuple[Any, ...]:
"""
Fetch a single result from the database.
Expand All @@ -238,8 +246,12 @@ async def fetch_one(

@abstractmethod
async def fetch_many(
self, operation, parameters=None, size=None, **execution_kwargs
) -> List[Tuple[Any]]:
self,
operation: str,
parameters: dict[str, Any] | None = None,
size: int | None = None,
**execution_kwargs: Any,
) -> list[tuple[Any, ...]]:
"""
Fetch a limited number of results from the database.
Expand All @@ -256,8 +268,11 @@ async def fetch_many(

@abstractmethod
async def fetch_all(
self, operation, parameters=None, **execution_kwargs
) -> List[Tuple[Any]]:
self,
operation: str,
parameters: dict[str, Any] | None = None,
**execution_kwargs: Any,
) -> list[tuple[Any, ...]]:
"""
Fetch all results from the database.
Expand All @@ -272,7 +287,12 @@ async def fetch_all(
"""

@abstractmethod
async def execute(self, operation, parameters=None, **execution_kwargs) -> None:
async def execute(
self,
operation: str,
parameters: dict[str, Any] | None = None,
**execution_kwargs: Any,
) -> None:
"""
Executes an operation on the database. This method is intended to be used
for operations that do not return data, such as INSERT, UPDATE, or DELETE.
Expand All @@ -285,7 +305,10 @@ async def execute(self, operation, parameters=None, **execution_kwargs) -> None:

@abstractmethod
async def execute_many(
self, operation, seq_of_parameters, **execution_kwargs
self,
operation: str,
seq_of_parameters: list[dict[str, Any]],
**execution_kwargs: Any,
) -> None:
"""
Executes multiple operations on the database. This method is intended to be used
Expand All @@ -307,7 +330,7 @@ async def __aenter__(self) -> Self:
f"{self.__class__.__name__} does not support async context management."
)

async def __aexit__(self, *args) -> None:
async def __aexit__(self, *args: Any) -> None:
"""
Context management method for async databases.
"""
Expand All @@ -323,7 +346,7 @@ def __enter__(self) -> Self:
f"{self.__class__.__name__} does not support context management."
)

def __exit__(self, *args) -> None:
def __exit__(self, *args: Any) -> None:
"""
Context management method for databases.
"""
Expand Down Expand Up @@ -358,8 +381,8 @@ def logger(self) -> LoggerOrAdapter:
async def download_object_to_path(
self,
from_path: str,
to_path: Union[str, Path],
**download_kwargs: Dict[str, Any],
to_path: str | Path,
**download_kwargs: Any,
) -> Path:
"""
Downloads an object from the object storage service to a path.
Expand All @@ -378,7 +401,7 @@ async def download_object_to_file_object(
self,
from_path: str,
to_file_object: BinaryIO,
**download_kwargs: Dict[str, Any],
**download_kwargs: Any,
) -> BinaryIO:
"""
Downloads an object from the object storage service to a file-like object,
Expand All @@ -397,8 +420,8 @@ async def download_object_to_file_object(
async def download_folder_to_path(
self,
from_folder: str,
to_folder: Union[str, Path],
**download_kwargs: Dict[str, Any],
to_folder: str | Path,
**download_kwargs: Any,
) -> Path:
"""
Downloads a folder from the object storage service to a path.
Expand All @@ -414,7 +437,7 @@ async def download_folder_to_path(

@abstractmethod
async def upload_from_path(
self, from_path: Union[str, Path], to_path: str, **upload_kwargs: Dict[str, Any]
self, from_path: str | Path, to_path: str, **upload_kwargs: Any
) -> str:
"""
Uploads an object from a path to the object storage service.
Expand All @@ -430,7 +453,7 @@ async def upload_from_path(

@abstractmethod
async def upload_from_file_object(
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any]
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Any
) -> str:
"""
Uploads an object to the object storage service from a file-like object,
Expand All @@ -448,9 +471,9 @@ async def upload_from_file_object(
@abstractmethod
async def upload_from_folder(
self,
from_folder: Union[str, Path],
from_folder: str | Path,
to_folder: str,
**upload_kwargs: Dict[str, Any],
**upload_kwargs: Any,
) -> str:
"""
Uploads a folder to the object storage service from a path.
Expand Down Expand Up @@ -496,7 +519,7 @@ async def read_secret(self) -> bytes:
"""

@abstractmethod
async def write_secret(self, secret_data) -> str:
async def write_secret(self, secret_data: bytes) -> str:
"""
Writes secret data to the configured secret in the secret storage service.
Expand Down
Loading

0 comments on commit 13312c3

Please sign in to comment.