From a2e1f2c5ee868dd836589000281b36a3264b209f Mon Sep 17 00:00:00 2001 From: Ivan Zubenko Date: Tue, 6 Aug 2024 08:58:39 +0300 Subject: [PATCH] ENG-264 add storage metrics api (#768) --- .github/workflows/ci.yaml | 2 - Makefile | 6 - platform_storage_api/api.py | 26 ++-- platform_storage_api/aws.py | 47 +++++++ platform_storage_api/config.py | 87 +++++++++--- platform_storage_api/metrics.py | 63 +++++++++ platform_storage_api/s3_storage.py | 109 +++++++++++++++ platform_storage_api/security.py | 6 +- platform_storage_api/storage_usage.py | 55 ++++++-- platform_storage_api/worker.py | 124 ++++++++++++++++++ setup.cfg | 12 +- tests/integration/auth.py | 109 +-------------- tests/integration/conftest.py | 100 ++++++++++++-- tests/integration/conftest_moto.py | 61 +++++++++ tests/integration/docker.py | 16 +-- tests/integration/docker/docker-compose.yml | 26 ++++ .../docker/platform-auth/Dockerfile | 3 + tests/integration/test_metrics.py | 105 +++++++++++++++ tests/integration/test_worker.py | 35 +++++ tests/unit/test_config.py | 42 +++--- tests/unit/test_storage_usage.py | 45 ++++++- 21 files changed, 881 insertions(+), 198 deletions(-) create mode 100644 platform_storage_api/aws.py create mode 100644 platform_storage_api/metrics.py create mode 100644 platform_storage_api/s3_storage.py create mode 100644 platform_storage_api/worker.py create mode 100644 tests/integration/conftest_moto.py create mode 100644 tests/integration/docker/docker-compose.yml create mode 100644 tests/integration/docker/platform-auth/Dockerfile create mode 100644 tests/integration/test_metrics.py create mode 100644 tests/integration/test_worker.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 497a8b11..a496e1fe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -43,8 +43,6 @@ jobs: make lint - name: Run unit tests run: make test_unit - - name: Pull test images - run: make docker_pull_test_images - name: Run integration tests run: make test_integration - name: Build Docker image diff --git a/Makefile b/Makefile index d3bd8f54..e60d0c96 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,6 @@ HELM_APP_VERSION ?= 1.0.0 export IMAGE_REPO_BASE -PLATFORMAUTHAPI_IMAGE = $(shell cat AUTH_SERVER_IMAGE_NAME) - setup: pip install -U pip pip install -e .[dev] @@ -55,7 +53,3 @@ docker_build: pip install -U build python -m build docker build -t $(IMAGE_NAME):latest . - -docker_pull_test_images: - docker pull $(PLATFORMAUTHAPI_IMAGE) - docker tag $(PLATFORMAUTHAPI_IMAGE) platformauthapi:latest diff --git a/platform_storage_api/api.py b/platform_storage_api/api.py index 7e866007..77e4b298 100644 --- a/platform_storage_api/api.py +++ b/platform_storage_api/api.py @@ -32,6 +32,7 @@ FileStatus, FileStatusPermission, FileStatusType, + FileSystem, LocalFileSystem, ) from .security import ( @@ -838,6 +839,16 @@ async def add_version_to_header(request: Request, response: web.StreamResponse) response.headers["X-Service-Version"] = f"platform-storage-api/{package_version}" +def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver: + if config.storage.mode == StorageMode.SINGLE: + return SingleStoragePathResolver(config.storage.fs_local_base_path) + return MultipleStoragePathResolver( + fs, + config.storage.fs_local_base_path, + config.storage.fs_local_base_path / config.platform.cluster_name, + ) + + async def create_app(config: Config) -> web.Application: app = web.Application( middlewares=[handle_exceptions], @@ -850,7 +861,7 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]: logger.info("Initializing Auth Client For Storage API") auth_client = await exit_stack.enter_async_context( - AuthClient(config.auth.server_endpoint_url, config.auth.service_token) + AuthClient(config.platform.auth_url, config.platform.token) ) await setup_security( @@ -861,7 +872,7 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]: logger.info( f"Auth Client for Storage API Initialized. " - f"URL={config.auth.server_endpoint_url}" + f"URL={config.platform.auth_url}" ) fs = await exit_stack.enter_async_context( @@ -869,16 +880,7 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]: executor_max_workers=config.storage.fs_local_thread_pool_size ) ) - if config.storage.mode == StorageMode.SINGLE: - path_resolver: StoragePathResolver = SingleStoragePathResolver( - config.storage.fs_local_base_path - ) - else: - path_resolver = MultipleStoragePathResolver( - fs, - config.storage.fs_local_base_path, - config.storage.fs_local_base_path / config.cluster_name, - ) + path_resolver = create_path_resolver(config, fs) storage = Storage(path_resolver, fs) app[API_V1_KEY][STORAGE_KEY] = storage diff --git a/platform_storage_api/aws.py b/platform_storage_api/aws.py new file mode 100644 index 00000000..60380c23 --- /dev/null +++ b/platform_storage_api/aws.py @@ -0,0 +1,47 @@ +from collections.abc import AsyncIterator, Iterator +from contextlib import asynccontextmanager, contextmanager +from typing import Any + +import aiobotocore +import aiobotocore.client +import aiobotocore.session +import botocore +import botocore.client +import botocore.session + +from .config import AWSConfig + + +@contextmanager +def create_s3_client( + session: botocore.session.Session, config: AWSConfig +) -> Iterator[botocore.client.BaseClient]: + client = session.create_client("s3", **_create_s3_client_kwargs(config)) + yield client + client.close() + + +@asynccontextmanager +async def create_async_s3_client( + session: aiobotocore.session.AioSession, config: AWSConfig +) -> AsyncIterator[aiobotocore.client.AioBaseClient]: + async with session.create_client( + "s3", **_create_s3_client_kwargs(config) + ) as client: + yield client + + +def _create_s3_client_kwargs(config: AWSConfig) -> dict[str, Any]: + kwargs: dict[str, Any] = { + "region_name": config.region, + "config": botocore.config.Config( + retries={"mode": "standard"}, # 3 retries by default + ), + } + if config.access_key_id: + kwargs["aws_access_key_id"] = config.access_key_id + if config.secret_access_key: + kwargs["aws_secret_access_key"] = config.secret_access_key + if config.s3_endpoint_url: + kwargs["endpoint_url"] = config.s3_endpoint_url + return kwargs diff --git a/platform_storage_api/config.py b/platform_storage_api/config.py index 88bdec57..355c6e5a 100644 --- a/platform_storage_api/config.py +++ b/platform_storage_api/config.py @@ -11,18 +11,26 @@ class ServerConfig: host: str = "0.0.0.0" port: int = 8080 + + +@dataclass(frozen=True) +class StorageServerConfig(ServerConfig): name: str = "Storage API" keep_alive_timeout_s: float = 75 @classmethod - def from_environ(cls, environ: Optional[dict[str, str]] = None) -> "ServerConfig": - return EnvironConfigFactory(environ).create_server() + def from_environ( + cls, environ: Optional[dict[str, str]] = None + ) -> "StorageServerConfig": + return EnvironConfigFactory(environ).create_storage_server() @dataclass(frozen=True) -class AuthConfig: - server_endpoint_url: Optional[URL] - service_token: str = field(repr=False) +class PlatformConfig: + auth_url: Optional[URL] + admin_url: Optional[URL] + token: str = field(repr=False) + cluster_name: str class StorageMode(str, enum.Enum): @@ -42,12 +50,21 @@ def from_environ(cls, environ: Optional[dict[str, str]] = None) -> "StorageConfi return EnvironConfigFactory(environ).create_storage() +@dataclass(frozen=True) +class AWSConfig: + region: str + metrics_s3_bucket_name: str + access_key_id: Optional[str] = field(repr=False, default=None) + secret_access_key: Optional[str] = field(repr=False, default=None) + s3_endpoint_url: Optional[str] = None + + @dataclass(frozen=True) class Config: - server: ServerConfig + server: StorageServerConfig storage: StorageConfig - auth: AuthConfig - cluster_name: str + platform: PlatformConfig + aws: AWSConfig permission_expiration_interval_s: float = 0 permission_forgetting_interval_s: float = 0 @@ -56,6 +73,12 @@ def from_environ(cls, environ: Optional[dict[str, str]] = None) -> "Config": return EnvironConfigFactory(environ).create() +@dataclass(frozen=True) +class MetricsConfig: + aws: AWSConfig + server: ServerConfig = ServerConfig() + + class EnvironConfigFactory: def __init__(self, environ: Optional[dict[str, str]] = None) -> None: self._environ = environ or os.environ @@ -84,25 +107,41 @@ def create_storage(self) -> StorageConfig: ) def create_server(self) -> ServerConfig: - port = int(self._environ.get("NP_STORAGE_API_PORT", ServerConfig.port)) + return ServerConfig( + host=self._environ.get("SERVER_HOST", ServerConfig.host), + port=int(self._environ.get("SERVER_PORT", ServerConfig.port)), + ) + + def create_storage_server(self) -> StorageServerConfig: + port = int(self._environ.get("NP_STORAGE_API_PORT", StorageServerConfig.port)) keep_alive_timeout_s = int( self._environ.get( - "NP_STORAGE_API_KEEP_ALIVE_TIMEOUT", ServerConfig.keep_alive_timeout_s + "NP_STORAGE_API_KEEP_ALIVE_TIMEOUT", + StorageServerConfig.keep_alive_timeout_s, ) ) - return ServerConfig(port=port, keep_alive_timeout_s=keep_alive_timeout_s) + return StorageServerConfig(port=port, keep_alive_timeout_s=keep_alive_timeout_s) + + def create_platform(self) -> PlatformConfig: + admin_url = self._get_url("NP_PLATFORM_ADMIN_URL") + if admin_url: + admin_url = admin_url / "apis/admin/v1" + return PlatformConfig( + auth_url=self._get_url("NP_PLATFORM_AUTH_URL"), + admin_url=admin_url, + token=self._environ["NP_PLATFORM_TOKEN"], + cluster_name=self._environ["NP_PLATFORM_CLUSTER_NAME"], + ) - def create_auth(self) -> AuthConfig: - url = self._get_url("NP_STORAGE_AUTH_URL") - token = self._environ["NP_STORAGE_AUTH_TOKEN"] - return AuthConfig(server_endpoint_url=url, service_token=token) + def create_aws(self) -> AWSConfig: + return AWSConfig( + region=self._environ["AWS_REGION"], + metrics_s3_bucket_name=self._environ["AWS_METRICS_S3_BUCKET_NAME"], + ) def create(self) -> Config: - server_config = self.create_server() + server_config = self.create_storage_server() storage_config = self.create_storage() - auth_config = self.create_auth() - cluster_name = self._environ["NP_CLUSTER_NAME"] - assert cluster_name permission_expiration_interval_s: float = float( self._environ.get( "NP_PERMISSION_EXPIRATION_INTERVAL", @@ -118,8 +157,14 @@ def create(self) -> Config: return Config( server=server_config, storage=storage_config, - auth=auth_config, - cluster_name=cluster_name, + platform=self.create_platform(), + aws=self.create_aws(), permission_expiration_interval_s=permission_expiration_interval_s, permission_forgetting_interval_s=permission_forgetting_interval_s, ) + + def create_metrics(self) -> MetricsConfig: + return MetricsConfig( + server=self.create_server(), + aws=self.create_aws(), + ) diff --git a/platform_storage_api/metrics.py b/platform_storage_api/metrics.py new file mode 100644 index 00000000..feb1d293 --- /dev/null +++ b/platform_storage_api/metrics.py @@ -0,0 +1,63 @@ +import logging +from collections.abc import Iterator +from contextlib import ExitStack, contextmanager + +import botocore +import botocore.client +import botocore.config +import botocore.session +from fastapi import FastAPI +from neuro_logging import init_logging +from prometheus_client import CollectorRegistry, make_asgi_app + +from .aws import create_s3_client +from .config import EnvironConfigFactory, MetricsConfig +from .s3_storage import StorageMetricsS3Storage +from .storage_usage import StorageUsageCollector + + +@contextmanager +def create_app(config: MetricsConfig) -> Iterator[FastAPI]: + with ExitStack() as exit_stack: + session = botocore.session.get_session() + s3_client = exit_stack.enter_context( + create_s3_client(session=session, config=config.aws) + ) + + storage_metrics_s3_storage = StorageMetricsS3Storage( + s3_client, config.aws.metrics_s3_bucket_name + ) + + collector = StorageUsageCollector( + config=config.aws, storage_metrics_s3_storage=storage_metrics_s3_storage + ) + registry = CollectorRegistry() + registry.register(collector) + + metrics_app = make_asgi_app(registry=registry) + app = FastAPI(debug=False) + + app.mount("/metrics", metrics_app) + + yield app + + +def main() -> None: + import uvicorn + + init_logging() + + config = EnvironConfigFactory().create_metrics() + logging.info("Loaded config: %r", config) + with create_app(config) as app: + uvicorn.run( + app, + host=config.server.host, + port=config.server.port, + proxy_headers=True, + log_config=None, + ) + + +if __name__ == "__main__": + main() diff --git a/platform_storage_api/s3_storage.py b/platform_storage_api/s3_storage.py new file mode 100644 index 00000000..838458f4 --- /dev/null +++ b/platform_storage_api/s3_storage.py @@ -0,0 +1,109 @@ +import functools +from collections.abc import Sequence +from typing import Optional, Union + +import aiobotocore.client +import botocore.client +import pydantic +from botocore.exceptions import ClientError + +from .storage_usage import StorageUsage + +_AWS_STORAGE_USAGE_KEY = "storage_usage.json" + + +@pydantic.dataclasses.dataclass(frozen=True) +class _StorageUsage: + @pydantic.dataclasses.dataclass(frozen=True) + class Project: + project_name: str + used: int + org_name: Optional[str] = None + + projects: Sequence[Project] + + +class _PayloadFactory: + @classmethod + def create_storage_usage(cls, storage_usage: StorageUsage) -> bytes: + data = _StorageUsage( + projects=[ + _StorageUsage.Project( + org_name=p.org_name, + project_name=p.project_name, + used=p.used, + ) + for p in storage_usage.projects + ] + ) + return pydantic.TypeAdapter(_StorageUsage).dump_json(data) + + +class _EntityFactory: + @classmethod + def create_storage_usage(cls, payload: Union[str, bytes]) -> StorageUsage: + storage_usage = pydantic.TypeAdapter(_StorageUsage).validate_json(payload) + return StorageUsage( + projects=[ + StorageUsage.Project( + org_name=p.org_name, + project_name=p.project_name, + used=p.used, + ) + for p in storage_usage.projects + ] + ) + + +class StorageMetricsS3Storage: + def __init__(self, s3_client: botocore.client.BaseClient, bucket_name: str) -> None: + self._s3_client = s3_client + self._bucket_name = bucket_name + + def get_storage_usage(self) -> StorageUsage: + try: + response = self._s3_client.get_object( + Bucket=self._bucket_name, Key=_AWS_STORAGE_USAGE_KEY + ) + except ClientError as err: + if err.response["ResponseMetadata"]["HTTPStatusCode"] == 404: + return StorageUsage(projects=[]) + with response["Body"]: + payload = response["Body"].read() + return _EntityFactory.create_storage_usage(payload) + + +class StorageMetricsAsyncS3Storage: + def __init__( + self, s3_client: aiobotocore.client.AioBaseClient, bucket_name: str + ) -> None: + self._s3_client = s3_client + self._bucket_name = bucket_name + + async def put_storage_usage(self, storage_usage: StorageUsage) -> None: + data = _PayloadFactory.create_storage_usage(storage_usage) + put_object = functools.partial( + self._s3_client.put_object, + Bucket=self._bucket_name, + Key=_AWS_STORAGE_USAGE_KEY, + Body=data, + ) + try: + await put_object() + except ClientError as err: + if err.response["ResponseMetadata"]["HTTPStatusCode"] != 404: + raise + await self._s3_client.create_bucket(Bucket=self._bucket_name) + await put_object() + + async def get_storage_usage(self) -> StorageUsage: + try: + response = await self._s3_client.get_object( + Bucket=self._bucket_name, Key=_AWS_STORAGE_USAGE_KEY + ) + except ClientError as err: + if err.response["ResponseMetadata"]["HTTPStatusCode"] == 404: + return StorageUsage(projects=[]) + async with response["Body"]: + payload = await response["Body"].read() + return _EntityFactory.create_storage_usage(payload) diff --git a/platform_storage_api/security.py b/platform_storage_api/security.py index 9f2d04e3..4bbf2386 100644 --- a/platform_storage_api/security.py +++ b/platform_storage_api/security.py @@ -50,10 +50,12 @@ def __init__(self, app: web.Application, config: Config) -> None: def _path_to_uri(self, target_path: PurePath) -> str: assert str(target_path)[0] == "/" - assert self._config.cluster_name + assert self._config.platform.cluster_name return str( URL.build( - scheme="storage", host=self._config.cluster_name, path=str(target_path) + scheme="storage", + host=self._config.platform.cluster_name, + path=str(target_path), ) ) diff --git a/platform_storage_api/storage_usage.py b/platform_storage_api/storage_usage.py index 3d8f8b48..78fc59b1 100644 --- a/platform_storage_api/storage_usage.py +++ b/platform_storage_api/storage_usage.py @@ -1,18 +1,24 @@ from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Iterable, Sequence from dataclasses import dataclass from datetime import timezone from pathlib import PurePath -from typing import Optional +from typing import TYPE_CHECKING, Optional from neuro_admin_client import AdminClient +from prometheus_client.metrics_core import GaugeMetricFamily, Metric +from prometheus_client.registry import Collector +from .config import AWSConfig, Config from .fs.local import FileStatusType, FileSystem from .storage import StoragePathResolver UTC = timezone.utc +if TYPE_CHECKING: + from .s3_storage import StorageMetricsAsyncS3Storage, StorageMetricsS3Storage + @dataclass(frozen=True) class StorageUsage: @@ -35,15 +41,17 @@ class OrgProjectPath: class StorageUsageService: def __init__( self, - path_resolver: StoragePathResolver, - fs: FileSystem, + config: Config, admin_client: AdminClient, - cluster_name: str, + storage_metrics_s3_storage: StorageMetricsAsyncS3Storage, + fs: FileSystem, + path_resolver: StoragePathResolver, ) -> None: + self._config = config + self._admin_client = admin_client + self._storage_metrics_s3_storage = storage_metrics_s3_storage self._fs = fs self._path_resolver = path_resolver - self._admin_client = admin_client - self._cluster_name = cluster_name async def get_storage_usage(self) -> StorageUsage: org_project_paths = await self._get_org_project_paths() @@ -63,7 +71,9 @@ async def get_storage_usage(self) -> StorageUsage: ) async def _get_org_project_paths(self) -> list[OrgProjectPath]: - org_clusters = await self._admin_client.list_org_clusters(self._cluster_name) + org_clusters = await self._admin_client.list_org_clusters( + self._config.platform.cluster_name + ) org_names = {org_cluster.org_name for org_cluster in org_clusters} result = await self._get_no_org_project_paths(org_names) for org_cluster in org_clusters: @@ -91,7 +101,7 @@ async def _get_no_org_project_paths( ) -> list[OrgProjectPath]: result = [] no_org_path = await self._path_resolver.resolve_base_path( - PurePath(f"/{self._cluster_name}") + PurePath(f"/{self._config.platform.cluster_name}") ) async with self._fs.iterstatus(no_org_path) as statuses: async for status in statuses: @@ -107,3 +117,30 @@ async def _get_no_org_project_paths( ) ) return result + + async def upload_storage_usage(self) -> None: + storage_usage = await self.get_storage_usage() + await self._storage_metrics_s3_storage.put_storage_usage(storage_usage) + + +class StorageUsageCollector(Collector): + def __init__( + self, config: AWSConfig, storage_metrics_s3_storage: StorageMetricsS3Storage + ) -> None: + super().__init__() + + self._config = config + self._storage_metrics_s3_storage = storage_metrics_s3_storage + + def collect(self) -> Iterable[Metric]: + storage_usage = self._storage_metrics_s3_storage.get_storage_usage() + metric_family = GaugeMetricFamily( + "storage_used_bytes", + "The amount of used storage space in bytes", + labels=["org_name", "project_name"], + ) + for project in storage_usage.projects: + metric_family.add_metric( + [project.org_name or "no_org", project.project_name], project.used + ) + yield metric_family diff --git a/platform_storage_api/worker.py b/platform_storage_api/worker.py new file mode 100644 index 00000000..c523b6c8 --- /dev/null +++ b/platform_storage_api/worker.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import asyncio +import logging +import signal +from collections.abc import AsyncIterator +from contextlib import AsyncExitStack, asynccontextmanager +from dataclasses import dataclass +from typing import NoReturn + +import aiobotocore +import aiobotocore.session +from neuro_admin_client import AdminClient +from neuro_logging import init_logging + +from .aws import create_async_s3_client +from .config import Config, EnvironConfigFactory, StorageMode +from .fs.local import FileSystem, LocalFileSystem +from .s3_storage import StorageMetricsAsyncS3Storage +from .storage import ( + MultipleStoragePathResolver, + SingleStoragePathResolver, + StoragePathResolver, +) +from .storage_usage import StorageUsageService + +LOGGER = logging.getLogger(__name__) + + +@dataclass +class App: + storage_usage_service: StorageUsageService + + async def upload_storage_usage(self) -> None: + await self.storage_usage_service.upload_storage_usage() + + +def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver: + if config.storage.mode == StorageMode.SINGLE: + return SingleStoragePathResolver(config.storage.fs_local_base_path) + return MultipleStoragePathResolver( + fs, + config.storage.fs_local_base_path, + config.storage.fs_local_base_path / config.platform.cluster_name, + ) + + +@asynccontextmanager +async def create_app(config: Config) -> AsyncIterator[App]: + async with AsyncExitStack() as exit_stack: + session = aiobotocore.session.get_session() + s3_client = await exit_stack.enter_async_context( + create_async_s3_client(session, config.aws) + ) + + storage_metrics_s3_storage = StorageMetricsAsyncS3Storage( + s3_client, config.aws.metrics_s3_bucket_name + ) + + admin_client = await exit_stack.enter_async_context( + AdminClient( + base_url=config.platform.admin_url, + service_token=config.platform.token, + ) + ) + + fs = await exit_stack.enter_async_context( + LocalFileSystem( + executor_max_workers=config.storage.fs_local_thread_pool_size + ) + ) + + path_resolver = create_path_resolver(config, fs) + + storage_usage_service = StorageUsageService( + config=config, + admin_client=admin_client, + storage_metrics_s3_storage=storage_metrics_s3_storage, + fs=fs, + path_resolver=path_resolver, + ) + + yield App( + storage_usage_service=storage_usage_service, + ) + + +class GracefulExitError(SystemExit): + code = 1 + + +def _raise_graceful_exit() -> NoReturn: + raise GracefulExitError() + + +def setup() -> None: + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, _raise_graceful_exit) + loop.add_signal_handler(signal.SIGTERM, _raise_graceful_exit) + + +def cleanup() -> None: + loop = asyncio.get_event_loop() + loop.remove_signal_handler(signal.SIGINT) + loop.remove_signal_handler(signal.SIGTERM) + + +async def run(config: Config) -> None: + setup() + + try: + async with create_app(config) as app: + await app.upload_storage_usage() + finally: + cleanup() + + +def main() -> None: + init_logging() + + config = EnvironConfigFactory().create() + LOGGER.info("Loaded config: %s", config) + + asyncio.run(run(config)) diff --git a/setup.cfg b/setup.cfg index e063ed2c..534f257f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,8 @@ install_requires = pydantic==2.8.2 prometheus-client==0.20.0 fastapi==0.112.0 + aiobotocore==2.13.1 + uvicorn==0.30.5 [options.entry_points] console_scripts = @@ -38,9 +40,11 @@ dev = pre-commit==3.8.0 docker==7.1.0 pytest==8.3.2 - pytest-asyncio==0.23.8 + pytest-asyncio==0.21.2 pdbpp==0.10.3 aioresponses==0.7.6 + pytest-docker==3.1.1 + pytest-aiohttp==1.0.5 [flake8] max-line-length = 88 @@ -101,3 +105,9 @@ ignore_missing_imports = true [mypy-prometheus_client.*] ignore_missing_imports = true + +[mypy-aiobotocore.*] +ignore_missing_imports = true + +[mypy-botocore.*] +ignore_missing_imports = true diff --git a/tests/integration/auth.py b/tests/integration/auth.py index c233c01f..fea6a06d 100644 --- a/tests/integration/auth.py +++ b/tests/integration/auth.py @@ -1,37 +1,25 @@ from __future__ import annotations -import asyncio import logging import os import uuid -from collections.abc import AsyncIterator, Awaitable, Callable, Iterator +from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass from typing import Any -import aiohttp import pytest -from async_timeout import timeout -from docker import DockerClient -from docker.errors import NotFound as ContainerNotFound -from docker.models.containers import Container from jose import jwt from neuro_auth_client import AuthClient, User +from pytest_docker.plugin import Services from yarl import URL -from platform_storage_api.config import AuthConfig - logger = logging.getLogger(__name__) @pytest.fixture(scope="session") -def auth_image() -> str: - with open("AUTH_SERVER_IMAGE_NAME") as f: - return f.read().strip() - - -@pytest.fixture(scope="session") -def auth_name() -> str: - return "platform-storage-api-auth" +def auth_server(docker_ip: str, docker_services: Services) -> URL: + port = docker_services.port_for("platform-auth", 8080) + return URL(f"http://{docker_ip}:{port}") @pytest.fixture(scope="session") @@ -39,88 +27,6 @@ def auth_jwt_secret() -> str: return os.environ.get("NP_JWT_SECRET", "secret") -def _create_url(container: Container, in_docker: bool) -> URL: - exposed_port = 8080 - if in_docker: - host, port = container.attrs["NetworkSettings"]["IPAddress"], exposed_port - else: - host, port = "0.0.0.0", container.ports[f"{exposed_port}/tcp"][0]["HostPort"] - return URL(f"http://{host}:{port}") - - -@pytest.fixture(scope="session") -def _auth_url() -> URL: - return URL(os.environ.get("AUTH_URL", "")) - - -@pytest.fixture(scope="session") -def _auth_server( - docker_client: DockerClient, - in_docker: bool, - reuse_docker: bool, - auth_image: str, - auth_name: str, - auth_jwt_secret: str, - _auth_url: URL, -) -> Iterator[URL]: - - if _auth_url: - yield _auth_url - return - - try: - container = docker_client.containers.get(auth_name) - if reuse_docker: - yield _create_url(container, in_docker) - return - else: - container.remove(force=True) - except ContainerNotFound: - pass - - # `run` performs implicit `pull` - container = docker_client.containers.run( - image=auth_image, - name=auth_name, - publish_all_ports=True, - stdout=False, - stderr=False, - detach=True, - environment={"NP_JWT_SECRET": auth_jwt_secret}, - ) - container.reload() - - yield _create_url(container, in_docker) - - if not reuse_docker: - container.remove(force=True) - - -async def wait_for_auth_server( - url: URL, timeout_s: float = 300, interval_s: float = 1 -) -> None: - last_exc = None - try: - async with timeout(timeout_s): - while True: - try: - async with AuthClient(url=url, token="") as auth_client: - await auth_client.ping() - break - except (AssertionError, OSError, aiohttp.ClientError) as exc: - last_exc = exc - logger.debug(f"waiting for {url}: {last_exc}") - await asyncio.sleep(interval_s) - except asyncio.TimeoutError: - pytest.fail(f"failed to connect to {url}: {last_exc}") - - -@pytest.fixture -async def auth_server(_auth_server: URL) -> AsyncIterator[URL]: - await wait_for_auth_server(_auth_server) - yield _auth_server - - _TokenFactory = Callable[[str], str] @@ -155,11 +61,6 @@ async def auth_client(auth_server: URL, admin_token: str) -> AsyncIterator[AuthC yield client -@pytest.fixture -def auth_config(auth_server: URL, admin_token: str) -> AuthConfig: - return AuthConfig(server_endpoint_url=auth_server, service_token=admin_token) - - @dataclass class _User: name: str diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c8049710..f2885ec4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,29 +1,55 @@ +from __future__ import annotations + import json -import os from collections.abc import AsyncIterable, AsyncIterator +from contextlib import asynccontextmanager from dataclasses import replace -from pathlib import Path, PurePath +from pathlib import Path from typing import Any, NamedTuple +import aiobotocore.client import aiohttp import pytest import pytest_asyncio +import uvicorn +from neuro_admin_client import AdminClient +from yarl import URL from platform_storage_api.api import create_app from platform_storage_api.config import ( - AuthConfig, + AWSConfig, Config, - ServerConfig, + MetricsConfig, + PlatformConfig, StorageConfig, StorageMode, + StorageServerConfig, ) +from platform_storage_api.fs.local import FileSystem +from platform_storage_api.s3_storage import StorageMetricsAsyncS3Storage +from platform_storage_api.storage import SingleStoragePathResolver +from platform_storage_api.storage_usage import StorageUsageService pytest_plugins = [ "tests.integration.docker", "tests.integration.auth", + "tests.integration.conftest_moto", ] +@asynccontextmanager +async def run_asgi_app( + app: Any, *, host: str = "0.0.0.0", port: int = 8080 +) -> AsyncIterator[None]: + server = uvicorn.Server(uvicorn.Config(app=app, host=host, port=port)) + server.should_exit = True + await server.serve() + try: + yield + finally: + await server.shutdown() + + class ApiConfig(NamedTuple): host: str port: int @@ -51,23 +77,41 @@ def multi_storage_server_url(multi_storage_api: ApiConfig) -> str: return multi_storage_api.storage_base_url +@pytest.fixture() +def platform_config( + auth_server: URL, admin_token: str, cluster_name: str +) -> PlatformConfig: + return PlatformConfig( + auth_url=auth_server, + admin_url=URL("http://platform-admin/apis/admin/v1"), + token=admin_token, + cluster_name=cluster_name, + ) + + @pytest.fixture -def config(admin_token: str, cluster_name: str, auth_config: AuthConfig) -> Config: - server_config = ServerConfig() - path = PurePath(os.path.realpath("/tmp/np_storage")) - storage_config = StorageConfig(fs_local_base_path=path) +def config( + platform_config: PlatformConfig, aws_config: AWSConfig, local_tmp_dir_path: Path +) -> Config: + server_config = StorageServerConfig() + storage_config = StorageConfig(fs_local_base_path=local_tmp_dir_path) return Config( server=server_config, storage=storage_config, - auth=auth_config, - cluster_name=cluster_name, + platform=platform_config, + aws=aws_config, ) +@pytest.fixture +def metrics_config(aws_config: AWSConfig) -> MetricsConfig: + return MetricsConfig(aws=aws_config) + + @pytest.fixture def multi_storage_config(config: Config) -> Config: config = replace(config, storage=replace(config.storage, mode=StorageMode.MULTIPLE)) - Path(config.storage.fs_local_base_path, config.cluster_name).mkdir( + Path(config.storage.fs_local_base_path, config.platform.cluster_name).mkdir( parents=True, exist_ok=True ) return config @@ -120,3 +164,37 @@ def get_liststatus_dict(response_json: dict[str, Any]) -> list[Any]: def get_filestatus_dict(response_json: dict[str, Any]) -> dict[str, Any]: return response_json["FileStatus"] + + +@pytest.fixture() +async def admin_client() -> AsyncIterator[AdminClient]: + async with AdminClient( + base_url=URL("http://platform-admin/apis/admin/v1") + ) as client: + yield client + + +@pytest.fixture() +def storage_metrics_s3_storage( + s3_client: aiobotocore.client.AioBaseClient, aws_config: AWSConfig +) -> StorageMetricsAsyncS3Storage: + return StorageMetricsAsyncS3Storage( + s3_client=s3_client, bucket_name=aws_config.metrics_s3_bucket_name + ) + + +@pytest.fixture() +def storage_usage_service( + config: Config, + admin_client: AdminClient, + storage_metrics_s3_storage: StorageMetricsAsyncS3Storage, + local_fs: FileSystem, + local_tmp_dir_path: Path, +) -> StorageUsageService: + return StorageUsageService( + config=config, + admin_client=admin_client, + storage_metrics_s3_storage=storage_metrics_s3_storage, + fs=local_fs, + path_resolver=SingleStoragePathResolver(local_tmp_dir_path), + ) diff --git a/tests/integration/conftest_moto.py b/tests/integration/conftest_moto.py new file mode 100644 index 00000000..94c1a2bc --- /dev/null +++ b/tests/integration/conftest_moto.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator + +import aiobotocore.session +import aiohttp +import pytest +from aiobotocore.client import AioBaseClient +from aiobotocore.session import AioSession +from pytest_docker.plugin import Services +from yarl import URL + +from platform_storage_api.config import AWSConfig + + +@pytest.fixture(scope="session") +def _moto_server(docker_ip: str, docker_services: Services) -> URL: + port = docker_services.port_for("moto", 5000) + return URL(f"http://{docker_ip}:{port}") + + +@pytest.fixture() +async def moto_server(_moto_server: URL) -> AsyncIterator[URL]: + yield _moto_server + await _reset_moto_server(_moto_server) + + +async def _reset_moto_server(moto_url: URL) -> None: + async with aiohttp.ClientSession() as client: + async with client.post(moto_url / "moto-api/reset"): + pass + + +@pytest.fixture() +def aws_config(moto_server: URL) -> AWSConfig: + return AWSConfig( + region="us-east-1", + access_key_id="test-access-key", + secret_access_key="test-secret-key", + s3_endpoint_url=str(moto_server), + metrics_s3_bucket_name="storage-metrics", + ) + + +@pytest.fixture() +def _session() -> AioSession: + return aiobotocore.session.get_session() + + +@pytest.fixture() +async def s3_client( + moto_server: URL, aws_config: AWSConfig, _session: AioSession +) -> AsyncIterator[AioBaseClient]: + async with _session.create_client( + "s3", + region_name=aws_config.region, + aws_access_key_id=aws_config.access_key_id, + aws_secret_access_key=aws_config.secret_access_key, + endpoint_url=str(moto_server), + ) as client: + yield client diff --git a/tests/integration/docker.py b/tests/integration/docker.py index 37d6f575..f71aa844 100644 --- a/tests/integration/docker.py +++ b/tests/integration/docker.py @@ -1,11 +1,9 @@ from __future__ import annotations -import os -from collections.abc import Iterator +from pathlib import Path from typing import Any import pytest -from docker import DockerClient PYTEST_REUSE_DOCKER_OPT = "--reuse-docker" @@ -24,12 +22,12 @@ def reuse_docker(request: Any) -> bool: @pytest.fixture(scope="session") -def in_docker() -> bool: - return os.path.isfile("/.dockerenv") +def docker_compose_file() -> str: + return str(Path(__file__).parent.resolve() / "docker/docker-compose.yml") @pytest.fixture(scope="session") -def docker_client() -> Iterator[DockerClient]: - client = DockerClient() - yield client - client.close() +def docker_setup(reuse_docker: bool) -> list[str]: + if reuse_docker: + return [] + return ["up --build --wait -d"] diff --git a/tests/integration/docker/docker-compose.yml b/tests/integration/docker/docker-compose.yml new file mode 100644 index 00000000..b36e0539 --- /dev/null +++ b/tests/integration/docker/docker-compose.yml @@ -0,0 +1,26 @@ +services: + platform-auth: + image: neuro-inc/platform-auth:latest + container_name: platform-storage-platform-auth + build: + context: ./platform-auth + dockerfile: Dockerfile + environment: + - NP_AUTH_API_PORT=8080 + - NP_JWT_SECRET=secret + ports: + - 8080 + healthcheck: + test: curl --fail http://localhost:8080/api/v1/ping + interval: 1s + timeout: 5s + retries: 10 + + moto: + image: motoserver/moto:4.2.12 + container_name: platform-storage-moto + command: + - -p + - "5000" + ports: + - 5000 diff --git a/tests/integration/docker/platform-auth/Dockerfile b/tests/integration/docker/platform-auth/Dockerfile new file mode 100644 index 00000000..d884d601 --- /dev/null +++ b/tests/integration/docker/platform-auth/Dockerfile @@ -0,0 +1,3 @@ +FROM ghcr.io/neuro-inc/platformauthapi:latest + +RUN apt-get update -y && apt-get install -y curl diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py new file mode 100644 index 00000000..acdcb689 --- /dev/null +++ b/tests/integration/test_metrics.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import textwrap +from collections.abc import AsyncIterator, Callable, Iterator +from pathlib import Path + +import aiohttp +import pytest +from aiobotocore.client import AioBaseClient +from aioresponses import aioresponses +from fastapi import FastAPI +from yarl import URL + +from platform_storage_api.config import MetricsConfig +from platform_storage_api.metrics import create_app +from platform_storage_api.storage_usage import StorageUsageService + +from .conftest import run_asgi_app + + +@pytest.fixture() +def app(metrics_config: MetricsConfig) -> Iterator[FastAPI]: + with create_app(config=metrics_config) as app: + yield app + + +@pytest.fixture() +async def metrics_server( + app: FastAPI, unused_tcp_port_factory: Callable[[], int] +) -> AsyncIterator[URL]: + host = "0.0.0.0" + port = unused_tcp_port_factory() + async with run_asgi_app(app, host=host, port=port): + yield URL(f"http://{host}:{port}") + + +@pytest.fixture() +async def client(metrics_server: URL) -> AsyncIterator[aiohttp.ClientSession]: + async with aiohttp.ClientSession(base_url=metrics_server) as client: + yield client + + +class TestMetrics: + async def test_metrics__no_bucket(self, client: aiohttp.ClientSession) -> None: + response = await client.get("/metrics") + + assert response.status == 200 + data = await response.text() + assert data == textwrap.dedent( + """\ + # HELP storage_used_bytes The amount of used storage space in bytes + # TYPE storage_used_bytes gauge + """ + ) + + async def test_metrics__no_key( + self, + client: aiohttp.ClientSession, + s3_client: AioBaseClient, + metrics_config: MetricsConfig, + ) -> None: + await s3_client.create_bucket(Bucket=metrics_config.aws.metrics_s3_bucket_name) + + response = await client.get("/metrics") + + assert response.status == 200 + data = await response.text() + assert data == textwrap.dedent( + """\ + # HELP storage_used_bytes The amount of used storage space in bytes + # TYPE storage_used_bytes gauge + """ + ) + + async def test_metrics( + self, + client: aiohttp.ClientSession, + storage_usage_service: StorageUsageService, + cluster_name: str, + local_tmp_dir_path: Path, + ) -> None: + with aioresponses( + passthrough=["http://0.0.0.0", "http://127.0.0.1"] + ) as aiohttp_mock: + aiohttp_mock.get( + f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/orgs", + payload=[], + ) + + (local_tmp_dir_path / "test-project").mkdir() + + await storage_usage_service.upload_storage_usage() + + response = await client.get("/metrics") + + assert response.status == 200 + data = await response.text() + assert data.startswith( + textwrap.dedent( + """\ + # HELP storage_used_bytes The amount of used storage space in bytes + # TYPE storage_used_bytes gauge + storage_used_bytes{org_name="no_org",project_name="test-project"}""" + ) + ) diff --git a/tests/integration/test_worker.py b/tests/integration/test_worker.py new file mode 100644 index 00000000..041540d1 --- /dev/null +++ b/tests/integration/test_worker.py @@ -0,0 +1,35 @@ +from pathlib import Path +from unittest import mock + +from aioresponses import aioresponses + +from platform_storage_api.config import Config +from platform_storage_api.s3_storage import StorageMetricsAsyncS3Storage +from platform_storage_api.storage_usage import StorageUsage +from platform_storage_api.worker import run + + +class TestUploadStorageUsage: + async def test_run( + self, config: Config, storage_metrics_s3_storage: StorageMetricsAsyncS3Storage + ) -> None: + Path(config.storage.fs_local_base_path / "test-project").mkdir() + + with aioresponses( + passthrough=["http://0.0.0.0", "http://127.0.0.1"] + ) as aiohttp_mock: + aiohttp_mock.get( + "http://platform-admin/apis/admin/v1/clusters" + f"/{config.platform.cluster_name}/orgs", + payload=[], + ) + + await run(config) + + storage_usage = await storage_metrics_s3_storage.get_storage_usage() + + assert storage_usage == StorageUsage( + projects=[ + StorageUsage.Project(project_name="test-project", used=mock.ANY), + ] + ) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 371c1eae..6d1bede0 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -5,21 +5,21 @@ from platform_storage_api.config import ( Config, - ServerConfig, StorageConfig, StorageMode, + StorageServerConfig, ) class TestServerConfig: def test_from_environ(self) -> None: environ = {"NP_STORAGE_API_PORT": "1234"} - config = ServerConfig.from_environ(environ) + config = StorageServerConfig.from_environ(environ) assert config.port == 1234 def test_default_port(self) -> None: environ: dict[str, str] = {} - config = ServerConfig.from_environ(environ) + config = StorageServerConfig.from_environ(environ) assert config.port == 8080 @@ -39,9 +39,12 @@ class TestConfig: def test_from_environ_defaults(self) -> None: environ = { "NP_STORAGE_LOCAL_BASE_PATH": "/path/to/dir", - "NP_STORAGE_AUTH_URL": "-", - "NP_STORAGE_AUTH_TOKEN": "hello-token", - "NP_CLUSTER_NAME": "test-cluster", + "NP_PLATFORM_AUTH_URL": "-", + "NP_PLATFORM_ADMIN_URL": "-", + "NP_PLATFORM_TOKEN": "test-token", + "NP_PLATFORM_CLUSTER_NAME": "test-cluster", + "AWS_REGION": "test-region", + "AWS_METRICS_S3_BUCKET_NAME": "test-bucket", } config = Config.from_environ(environ) assert config.server.port == 8080 @@ -49,25 +52,34 @@ def test_from_environ_defaults(self) -> None: assert config.storage.mode == StorageMode.SINGLE assert config.storage.fs_local_base_path == PurePath("/path/to/dir") assert config.storage.fs_local_thread_pool_size == 100 - assert config.auth.server_endpoint_url is None - assert config.auth.service_token == "hello-token" - assert config.cluster_name == "test-cluster" + assert config.platform.auth_url is None + assert config.platform.admin_url is None + assert config.platform.token == "test-token" + assert config.platform.cluster_name == "test-cluster" + assert config.aws.region == "test-region" + assert config.aws.metrics_s3_bucket_name == "test-bucket" def test_from_environ_custom(self) -> None: environ = { "NP_STORAGE_MODE": "multiple", "NP_STORAGE_LOCAL_BASE_PATH": "/path/to/dir", "NP_STORAGE_LOCAL_THREAD_POOL_SIZE": "123", - "NP_STORAGE_AUTH_URL": "http://127.0.0.1/", - "NP_STORAGE_AUTH_TOKEN": "hello-token", - "NP_CLUSTER_NAME": "test-cluster", + "NP_PLATFORM_AUTH_URL": "http://platform-auth", + "NP_PLATFORM_ADMIN_URL": "http://platform-admin", + "NP_PLATFORM_TOKEN": "test-token", + "NP_PLATFORM_CLUSTER_NAME": "test-cluster", "NP_STORAGE_API_KEEP_ALIVE_TIMEOUT": "900", + "AWS_REGION": "test-region", + "AWS_METRICS_S3_BUCKET_NAME": "test-bucket", } config = Config.from_environ(environ) assert config.server.port == 8080 assert config.storage.mode == StorageMode.MULTIPLE assert config.storage.fs_local_base_path == PurePath("/path/to/dir") assert config.storage.fs_local_thread_pool_size == 123 - assert config.auth.server_endpoint_url == URL("http://127.0.0.1/") - assert config.auth.service_token == "hello-token" - assert config.cluster_name == "test-cluster" + assert config.platform.auth_url == URL("http://platform-auth") + assert config.platform.admin_url == URL("http://platform-admin/apis/admin/v1") + assert config.platform.token == "test-token" + assert config.platform.cluster_name == "test-cluster" + assert config.aws.region == "test-region" + assert config.aws.metrics_s3_bucket_name == "test-bucket" diff --git a/tests/unit/test_storage_usage.py b/tests/unit/test_storage_usage.py index 881dad1a..2883032a 100644 --- a/tests/unit/test_storage_usage.py +++ b/tests/unit/test_storage_usage.py @@ -1,5 +1,6 @@ +import os from collections.abc import AsyncIterator -from pathlib import Path +from pathlib import Path, PurePath from unittest import mock import pytest @@ -7,11 +8,38 @@ from neuro_admin_client import AdminClient from yarl import URL +from platform_storage_api.config import ( + AWSConfig, + Config, + PlatformConfig, + StorageConfig, + StorageServerConfig, +) from platform_storage_api.fs.local import FileSystem from platform_storage_api.storage import SingleStoragePathResolver from platform_storage_api.storage_usage import StorageUsage, StorageUsageService +@pytest.fixture +def config() -> Config: + return Config( + server=StorageServerConfig(), + storage=StorageConfig( + fs_local_base_path=PurePath(os.path.realpath("/tmp/np_storage")) + ), + platform=PlatformConfig( + auth_url=URL("http://platform-auth"), + admin_url=URL("http://platform-admin"), + token="test-token", + cluster_name="test-cluster", + ), + aws=AWSConfig( + region="test-region", + metrics_s3_bucket_name="test-bucket", + ), + ) + + class TestStorageUsage: @pytest.fixture async def admin_client(self) -> AsyncIterator[AdminClient]: @@ -22,13 +50,18 @@ async def admin_client(self) -> AsyncIterator[AdminClient]: @pytest.fixture def storage_usage_service( - self, local_fs: FileSystem, local_tmp_dir_path: Path, admin_client: AdminClient + self, + config: Config, + local_fs: FileSystem, + local_tmp_dir_path: Path, + admin_client: AdminClient, ) -> StorageUsageService: return StorageUsageService( + config=config, path_resolver=SingleStoragePathResolver(local_tmp_dir_path), fs=local_fs, admin_client=admin_client, - cluster_name="default", + storage_metrics_s3_storage=mock.AsyncMock(), ) async def test_disk_usage( @@ -38,7 +71,7 @@ async def test_disk_usage( aiohttp_mock: aioresponses, ) -> None: aiohttp_mock.get( - URL("http://platform-admin/apis/admin/v1/clusters/default/orgs"), + URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/orgs"), payload=[ { "org_name": "test-org", @@ -65,7 +98,7 @@ async def test_disk_usage__empty_storage( self, storage_usage_service: StorageUsageService, aiohttp_mock: aioresponses ) -> None: aiohttp_mock.get( - URL("http://platform-admin/apis/admin/v1/clusters/default/orgs"), + URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/orgs"), payload=[ { "org_name": "test-org", @@ -83,7 +116,7 @@ async def test_disk_usage__no_orgs( self, storage_usage_service: StorageUsageService, aiohttp_mock: aioresponses ) -> None: aiohttp_mock.get( - URL("http://platform-admin/apis/admin/v1/clusters/default/orgs"), + URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/orgs"), payload=[], )