Skip to content

Commit

Permalink
ENG-264 add storage metrics api (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
zubenkoivan authored Aug 6, 2024
1 parent f579d84 commit a2e1f2c
Show file tree
Hide file tree
Showing 21 changed files with 881 additions and 198 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ jobs:
make lint
- name: Run unit tests
run: make test_unit
- name: Pull test images
run: make docker_pull_test_images
- name: Run integration tests
run: make test_integration
- name: Build Docker image
Expand Down
6 changes: 0 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ HELM_APP_VERSION ?= 1.0.0

export IMAGE_REPO_BASE

PLATFORMAUTHAPI_IMAGE = $(shell cat AUTH_SERVER_IMAGE_NAME)

setup:
pip install -U pip
pip install -e .[dev]
Expand Down Expand Up @@ -55,7 +53,3 @@ docker_build:
pip install -U build
python -m build
docker build -t $(IMAGE_NAME):latest .

docker_pull_test_images:
docker pull $(PLATFORMAUTHAPI_IMAGE)
docker tag $(PLATFORMAUTHAPI_IMAGE) platformauthapi:latest
26 changes: 14 additions & 12 deletions platform_storage_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
FileStatus,
FileStatusPermission,
FileStatusType,
FileSystem,
LocalFileSystem,
)
from .security import (
Expand Down Expand Up @@ -838,6 +839,16 @@ async def add_version_to_header(request: Request, response: web.StreamResponse)
response.headers["X-Service-Version"] = f"platform-storage-api/{package_version}"


def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver:
    """Build the path resolver selected by ``config.storage.mode``.

    ``StorageMode.SINGLE`` yields a ``SingleStoragePathResolver`` over the local
    base path. Any other mode yields a ``MultipleStoragePathResolver`` that is
    given ``fs``, the base path, and the base path joined with the platform
    cluster name.
    """
    base_path = config.storage.fs_local_base_path
    if config.storage.mode != StorageMode.SINGLE:
        return MultipleStoragePathResolver(
            fs,
            base_path,
            base_path / config.platform.cluster_name,
        )
    return SingleStoragePathResolver(base_path)


async def create_app(config: Config) -> web.Application:
app = web.Application(
middlewares=[handle_exceptions],
Expand All @@ -850,7 +861,7 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]:
logger.info("Initializing Auth Client For Storage API")

auth_client = await exit_stack.enter_async_context(
AuthClient(config.auth.server_endpoint_url, config.auth.service_token)
AuthClient(config.platform.auth_url, config.platform.token)
)

await setup_security(
Expand All @@ -861,24 +872,15 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]:

logger.info(
f"Auth Client for Storage API Initialized. "
f"URL={config.auth.server_endpoint_url}"
f"URL={config.platform.auth_url}"
)

fs = await exit_stack.enter_async_context(
LocalFileSystem(
executor_max_workers=config.storage.fs_local_thread_pool_size
)
)
if config.storage.mode == StorageMode.SINGLE:
path_resolver: StoragePathResolver = SingleStoragePathResolver(
config.storage.fs_local_base_path
)
else:
path_resolver = MultipleStoragePathResolver(
fs,
config.storage.fs_local_base_path,
config.storage.fs_local_base_path / config.cluster_name,
)
path_resolver = create_path_resolver(config, fs)
storage = Storage(path_resolver, fs)
app[API_V1_KEY][STORAGE_KEY] = storage

Expand Down
47 changes: 47 additions & 0 deletions platform_storage_api/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager, contextmanager
from typing import Any

import aiobotocore
import aiobotocore.client
import aiobotocore.session
import botocore
import botocore.client
import botocore.session

from .config import AWSConfig


@contextmanager
def create_s3_client(
    session: botocore.session.Session, config: AWSConfig
) -> Iterator[botocore.client.BaseClient]:
    """Yield a synchronous S3 client and close it when the ``with`` block exits.

    The client is created from ``session`` using the keyword arguments that
    ``_create_s3_client_kwargs`` derives from ``config``.
    """
    client = session.create_client("s3", **_create_s3_client_kwargs(config))
    try:
        yield client
    finally:
        # Must run on the error path too: without try/finally an exception
        # raised inside the caller's ``with`` body would skip close().
        client.close()


@asynccontextmanager
async def create_async_s3_client(
    session: aiobotocore.session.AioSession, config: AWSConfig
) -> AsyncIterator[aiobotocore.client.AioBaseClient]:
    """Yield an asynchronous S3 client created from ``session``.

    The client is obtained by entering ``session.create_client`` as an async
    context manager, with keyword arguments derived from ``config`` by
    ``_create_s3_client_kwargs``.
    """
    client_kwargs = _create_s3_client_kwargs(config)
    async with session.create_client("s3", **client_kwargs) as s3_client:
        yield s3_client


def _create_s3_client_kwargs(config: AWSConfig) -> dict[str, Any]:
    """Translate ``config`` into keyword arguments for ``create_client("s3", ...)``.

    ``region_name`` and a botocore ``Config`` with the "standard" retry mode are
    always present. Credentials and the endpoint URL are added only when the
    corresponding ``config`` attribute is truthy.
    """
    retry_config = botocore.config.Config(
        retries={"mode": "standard"},  # 3 retries by default
    )
    kwargs: dict[str, Any] = {
        "region_name": config.region,
        "config": retry_config,
    }
    optional_settings = (
        ("aws_access_key_id", config.access_key_id),
        ("aws_secret_access_key", config.secret_access_key),
        ("endpoint_url", config.s3_endpoint_url),
    )
    for kwarg_name, value in optional_settings:
        if value:
            kwargs[kwarg_name] = value
    return kwargs
87 changes: 66 additions & 21 deletions platform_storage_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,26 @@
class ServerConfig:
host: str = "0.0.0.0"
port: int = 8080


@dataclass(frozen=True)
class StorageServerConfig(ServerConfig):
name: str = "Storage API"
keep_alive_timeout_s: float = 75

@classmethod
def from_environ(cls, environ: Optional[dict[str, str]] = None) -> "ServerConfig":
return EnvironConfigFactory(environ).create_server()
def from_environ(
cls, environ: Optional[dict[str, str]] = None
) -> "StorageServerConfig":
return EnvironConfigFactory(environ).create_storage_server()


@dataclass(frozen=True)
class AuthConfig:
server_endpoint_url: Optional[URL]
service_token: str = field(repr=False)
class PlatformConfig:
auth_url: Optional[URL]
admin_url: Optional[URL]
token: str = field(repr=False)
cluster_name: str


class StorageMode(str, enum.Enum):
Expand All @@ -42,12 +50,21 @@ def from_environ(cls, environ: Optional[dict[str, str]] = None) -> "StorageConfi
return EnvironConfigFactory(environ).create_storage()


@dataclass(frozen=True)
class AWSConfig:
    """Immutable AWS settings: region, metrics bucket and optional S3 overrides."""

    region: str
    metrics_s3_bucket_name: str
    # Credentials are excluded from the generated repr.
    access_key_id: Optional[str] = field(default=None, repr=False)
    secret_access_key: Optional[str] = field(default=None, repr=False)
    s3_endpoint_url: Optional[str] = None


@dataclass(frozen=True)
class Config:
server: ServerConfig
server: StorageServerConfig
storage: StorageConfig
auth: AuthConfig
cluster_name: str
platform: PlatformConfig
aws: AWSConfig
permission_expiration_interval_s: float = 0
permission_forgetting_interval_s: float = 0

Expand All @@ -56,6 +73,12 @@ def from_environ(cls, environ: Optional[dict[str, str]] = None) -> "Config":
return EnvironConfigFactory(environ).create()


@dataclass(frozen=True)
class MetricsConfig:
    """Immutable configuration of the metrics application."""

    # AWS settings; required, no default.
    aws: AWSConfig
    # Server settings; falls back to a ServerConfig built from its own defaults.
    server: ServerConfig = ServerConfig()


class EnvironConfigFactory:
def __init__(self, environ: Optional[dict[str, str]] = None) -> None:
self._environ = environ or os.environ
Expand Down Expand Up @@ -84,25 +107,41 @@ def create_storage(self) -> StorageConfig:
)

def create_server(self) -> ServerConfig:
port = int(self._environ.get("NP_STORAGE_API_PORT", ServerConfig.port))
return ServerConfig(
host=self._environ.get("SERVER_HOST", ServerConfig.host),
port=int(self._environ.get("SERVER_PORT", ServerConfig.port)),
)

def create_storage_server(self) -> StorageServerConfig:
    """Build the Storage API server config from the environment.

    Reads ``NP_STORAGE_API_PORT`` and ``NP_STORAGE_API_KEEP_ALIVE_TIMEOUT``,
    falling back to the ``StorageServerConfig`` defaults when a variable is
    not set.

    Raises:
        ValueError: if a variable is set to a value that is not a number.
    """
    port = int(self._environ.get("NP_STORAGE_API_PORT", StorageServerConfig.port))
    # The field is declared as ``float``, so parse it as one; ``int()`` would
    # reject fractional values such as "7.5".
    keep_alive_timeout_s = float(
        self._environ.get(
            "NP_STORAGE_API_KEEP_ALIVE_TIMEOUT",
            StorageServerConfig.keep_alive_timeout_s,
        )
    )
    return StorageServerConfig(port=port, keep_alive_timeout_s=keep_alive_timeout_s)

def create_platform(self) -> PlatformConfig:
    """Build the platform config from ``NP_PLATFORM_*`` environment variables.

    ``NP_PLATFORM_TOKEN`` and ``NP_PLATFORM_CLUSTER_NAME`` are required and
    raise ``KeyError`` when missing. The two URLs are read through
    ``self._get_url``.
    """
    admin_base_url = self._get_url("NP_PLATFORM_ADMIN_URL")
    # A falsy result from _get_url is passed through unchanged; otherwise the
    # admin API path is appended.
    admin_url = (
        admin_base_url / "apis/admin/v1" if admin_base_url else admin_base_url
    )
    auth_url = self._get_url("NP_PLATFORM_AUTH_URL")
    token = self._environ["NP_PLATFORM_TOKEN"]
    cluster_name = self._environ["NP_PLATFORM_CLUSTER_NAME"]
    return PlatformConfig(
        auth_url=auth_url,
        admin_url=admin_url,
        token=token,
        cluster_name=cluster_name,
    )

def create_auth(self) -> AuthConfig:
url = self._get_url("NP_STORAGE_AUTH_URL")
token = self._environ["NP_STORAGE_AUTH_TOKEN"]
return AuthConfig(server_endpoint_url=url, service_token=token)
def create_aws(self) -> AWSConfig:
    """Build the AWS config from the environment.

    Both ``AWS_REGION`` and ``AWS_METRICS_S3_BUCKET_NAME`` are required; a
    missing variable raises ``KeyError``.
    """
    env = self._environ
    region = env["AWS_REGION"]
    bucket_name = env["AWS_METRICS_S3_BUCKET_NAME"]
    return AWSConfig(region=region, metrics_s3_bucket_name=bucket_name)

def create(self) -> Config:
server_config = self.create_server()
server_config = self.create_storage_server()
storage_config = self.create_storage()
auth_config = self.create_auth()
cluster_name = self._environ["NP_CLUSTER_NAME"]
assert cluster_name
permission_expiration_interval_s: float = float(
self._environ.get(
"NP_PERMISSION_EXPIRATION_INTERVAL",
Expand All @@ -118,8 +157,14 @@ def create(self) -> Config:
return Config(
server=server_config,
storage=storage_config,
auth=auth_config,
cluster_name=cluster_name,
platform=self.create_platform(),
aws=self.create_aws(),
permission_expiration_interval_s=permission_expiration_interval_s,
permission_forgetting_interval_s=permission_forgetting_interval_s,
)

def create_metrics(self) -> MetricsConfig:
    """Build the metrics application config from the environment.

    Combines the results of ``create_server`` and ``create_aws``.
    """
    server_config = self.create_server()
    aws_config = self.create_aws()
    return MetricsConfig(server=server_config, aws=aws_config)
63 changes: 63 additions & 0 deletions platform_storage_api/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
from collections.abc import Iterator
from contextlib import ExitStack, contextmanager

import botocore
import botocore.client
import botocore.config
import botocore.session
from fastapi import FastAPI
from neuro_logging import init_logging
from prometheus_client import CollectorRegistry, make_asgi_app

from .aws import create_s3_client
from .config import EnvironConfigFactory, MetricsConfig
from .s3_storage import StorageMetricsS3Storage
from .storage_usage import StorageUsageCollector


@contextmanager
def create_app(config: MetricsConfig) -> Iterator[FastAPI]:
    """Yield a FastAPI app that serves the storage usage collector at ``/metrics``.

    The S3 client backing the collector is opened through ``create_s3_client``
    and stays open until the caller leaves the ``with`` block.
    """
    aws_config = config.aws
    boto_session = botocore.session.get_session()
    with create_s3_client(session=boto_session, config=aws_config) as s3_client:
        metrics_storage = StorageMetricsS3Storage(
            s3_client, aws_config.metrics_s3_bucket_name
        )
        usage_collector = StorageUsageCollector(
            config=aws_config, storage_metrics_s3_storage=metrics_storage
        )

        # A dedicated registry, so only the storage usage collector is exposed.
        metrics_registry = CollectorRegistry()
        metrics_registry.register(usage_collector)

        metrics_app = make_asgi_app(registry=metrics_registry)
        app = FastAPI(debug=False)
        app.mount("/metrics", metrics_app)

        yield app


def main() -> None:
    """Load the metrics config from the environment and serve the app with uvicorn."""
    import uvicorn

    init_logging()

    metrics_config = EnvironConfigFactory().create_metrics()
    logging.info("Loaded config: %r", metrics_config)

    server = metrics_config.server
    with create_app(metrics_config) as app:
        uvicorn.run(
            app,
            host=server.host,
            port=server.port,
            proxy_headers=True,
            log_config=None,
        )


if __name__ == "__main__":
main()
Loading

0 comments on commit a2e1f2c

Please sign in to comment.