diff --git a/lib/pbench/client/__init__.py b/lib/pbench/client/__init__.py index 73a9312c67..8870dd812f 100644 --- a/lib/pbench/client/__init__.py +++ b/lib/pbench/client/__init__.py @@ -49,6 +49,7 @@ class API(Enum): DATASETS_VALUES = "datasets_values" ENDPOINTS = "endpoints" KEY = "key" + RELAY = "relay" SERVER_AUDIT = "server_audit" SERVER_SETTINGS = "server_settings" UPLOAD = "upload" diff --git a/lib/pbench/server/api/__init__.py b/lib/pbench/server/api/__init__.py index 6cbcfe5f82..7942e06e55 100644 --- a/lib/pbench/server/api/__init__.py +++ b/lib/pbench/server/api/__init__.py @@ -33,9 +33,10 @@ SampleValues, ) from pbench.server.api.resources.query_apis.datasets_search import DatasetsSearch +from pbench.server.api.resources.relay import Relay from pbench.server.api.resources.server_audit import ServerAudit from pbench.server.api.resources.server_settings import ServerSettings -from pbench.server.api.resources.upload_api import Upload +from pbench.server.api.resources.upload import Upload import pbench.server.auth.auth as Auth from pbench.server.database import init_db from pbench.server.database.database import Database @@ -146,6 +147,12 @@ def register_endpoints(api: Api, app: Flask, config: PbenchServerConfig): endpoint="server_settings", resource_class_args=(config,), ) + api.add_resource( + Relay, + f"{base_uri}/relay/", + endpoint="relay", + resource_class_args=(config,), + ) api.add_resource( Upload, f"{base_uri}/upload/", diff --git a/lib/pbench/server/api/resources/upload_api.py b/lib/pbench/server/api/resources/intake_base.py similarity index 54% rename from lib/pbench/server/api/resources/upload_api.py rename to lib/pbench/server/api/resources/intake_base.py index af737ea0cb..a3cb2fd8c9 100644 --- a/lib/pbench/server/api/resources/upload_api.py +++ b/lib/pbench/server/api/resources/intake_base.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import datetime import errno import hashlib @@ -5,26 +6,21 @@ import os from pathlib import Path import shutil -from typing import Any, Optional +from typing import Any, IO, Optional from flask import current_app, jsonify from flask.wrappers import Request, Response import humanize from pbench.common.utils import Cleanup -from pbench.server import PbenchServerConfig +from pbench.server import JSONOBJECT, PbenchServerConfig from pbench.server.api.resources import ( APIAbort, - ApiAuthorizationType, ApiBase, ApiContext, APIInternalError, - ApiMethod, ApiParams, ApiSchema, - Parameter, - ParamType, - Schema, ) import pbench.server.auth.auth as Auth from pbench.server.cache_manager import CacheManager, DuplicateTarball, MetadataError @@ -32,7 +28,6 @@ Audit, AuditReason, AuditStatus, - AuditType, OperationCode, ) from pbench.server.database.models.datasets import ( @@ -48,72 +43,129 @@ from pbench.server.utils import UtcTimeHelper -class CleanupTime(Exception): - """ - Used to support handling errors during PUT without constantly testing the - current status and additional indentation. This will be raised to an outer - try block when an error occurs. - """ +@dataclass +class Intake: + name: str + md5: str + access: str + metadata: list[str] + uri: Optional[str] - def __init__(self, status: int, message: str): - self.status = status - self.message = message - def __str__(self) -> str: - return self.message +@dataclass +class Access: + length: int + stream: IO[bytes] -class Upload(ApiBase): - """ - Upload a dataset from an agent. This API accepts a tarball and MD5 - value from a client. 
After validation, it creates a new Dataset DB - row describing the dataset, along with some metadata. +class IntakeBase(ApiBase): + """Framework to assimilate a dataset into the Pbench Server. + + This relies on subclasses to provides specific hook methods to identify and + stream the tarball data: + + _identify: decodes the URI and query parameters to determine the target + dataset name, the appropriate MD5, the initial access type, and + optional metadata to be set. + _stream: decodes the intake data and provides the length and byte IO + stream to be read into a temporary file. """ CHUNK_SIZE = 65536 - def __init__(self, config: PbenchServerConfig): - super().__init__( - config, - ApiSchema( - ApiMethod.PUT, - OperationCode.CREATE, - uri_schema=Schema(Parameter("filename", ParamType.STRING)), - query_schema=Schema( - Parameter("access", ParamType.ACCESS), - Parameter( - "metadata", - ParamType.LIST, - element_type=ParamType.STRING, - string_list=",", - ), - ), - audit_type=AuditType.NONE, - audit_name="upload", - authorization=ApiAuthorizationType.NONE, - ), - ) + def __init__(self, config: PbenchServerConfig, schema: ApiSchema): + super().__init__(config, schema) self.temporary = config.ARCHIVE / CacheManager.TEMPORARY self.temporary.mkdir(mode=0o755, parents=True, exist_ok=True) - current_app.logger.info( - "Configured PUT temporary directory as {}", self.temporary - ) + method = list(self.schemas.schemas.keys())[0] + self.name = self.schemas[method].audit_name + current_app.logger.info("INTAKE temporary directory is {}", self.temporary) - def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response: - """Upload a dataset to the server. + @staticmethod + def process_metadata(metas: list[str]) -> JSONOBJECT: + """Process 'metadata' query parameter - The client must present an authentication bearer token for a registered - Pbench Server user. + We allow the client to set metadata on the new dataset. We won't do + anything about this until upload is successful, but we process and + validate it here so we can fail early. + + Args: + metas: A list of "key:value[,key:value]..." strings) + + Returns: + A JSON object providing a value for each specified metadata key + + Raises: + APIAbort on bad syntax or a disallowed metadata key:value pair + """ + metadata: dict[str, Any] = {} + errors = [] + for kw in metas: + # an individual value for the "key" parameter is a simple key:value + # pair. + try: + k, v = kw.split(":", maxsplit=1) + except ValueError: + errors.append(f"improper metadata syntax {kw} must be 'k:v'") + continue + k = k.lower() + if not Metadata.is_key_path(k, Metadata.USER_UPDATEABLE_METADATA): + errors.append(f"Key {k} is invalid or isn't settable") + continue + try: + v = Metadata.validate(dataset=None, key=k, value=v) + except MetadataBadValue as e: + errors.append(str(e)) + continue + metadata[k] = v + if errors: + raise APIAbort( + HTTPStatus.BAD_REQUEST, + "at least one specified metadata key is invalid", + errors=errors, + ) + return metadata + + def _identify(self, args: ApiParams, request: Request) -> Intake: + """Identify the tarball to be streamed. + + Must be implemented by each subclass of this base class. + + Args: + args: The API parameters + request: The Flask request object + + Returns: + An Intake instance + """ + raise NotImplementedError() - We get the requested filename from the URI: /api/v1/upload/. 
+ def _stream(self, intake: Intake, request: Request) -> Access: + """Determine how to access the tarball byte stream - We get the dataset's resource ID (which is the tarball's MD5 checksum) - from the "content-md5" HTTP header. + Must be implemented by each subclass of this base class. - We also check that the "content-length" header value is not 0, and that - it matches the final size of the uploaded tarball file. + Args: + intake: The Intake parameters produced by _identify + request: The Flask request object + + Returns: + An Access object with the data byte stream and length + """ + raise NotImplementedError() + + def _intake( + self, args: ApiParams, request: Request, context: ApiContext + ) -> Response: + """Common code to assimilate a remote tarball onto the server + + The client must present an authentication bearer token for a registered + Pbench Server user. + + We support two "intake" modes: - We expect the dataset's tarball file to be uploaded as a data stream. + 1) PUT /api/v1/upload/ + 2) POST /api/v1/relay/ If the new dataset is created successfully, return 201 (CREATED). @@ -141,43 +193,9 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon # Used to record what steps have been completed during the upload, and # need to be undone on failure recovery = Cleanup(current_app.logger) + audit: Optional[Audit] = None username: Optional[str] = None - access = args.query.get("access", Dataset.PRIVATE_ACCESS) - - # We allow the client to set metadata on the new dataset. We won't do - # anything about this until upload is successful, but we process and - # validate it here so we can fail early. - metadata: dict[str, Any] = {} - if "metadata" in args.query: - errors = [] - for kw in args.query["metadata"]: - # an individual value for the "key" parameter is a simple key:value - # pair. - try: - k, v = kw.split(":", maxsplit=1) - except ValueError: - errors.append(f"improper metadata syntax {kw} must be 'k:v'") - continue - k = k.lower() - if not Metadata.is_key_path(k, Metadata.USER_UPDATEABLE_METADATA): - errors.append(f"Key {k} is invalid or isn't settable") - continue - try: - v = Metadata.validate(dataset=None, key=k, value=v) - except MetadataBadValue as e: - errors.append(str(e)) - continue - metadata[k] = v - if errors: - raise APIAbort( - HTTPStatus.BAD_REQUEST, - "at least one specified metadata key is invalid", - errors=errors, - ) - - attributes = {"access": access, "metadata": metadata} - filename = args.uri["filename"] tmp_dir: Optional[Path] = None try: @@ -185,60 +203,32 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon authorized_user = Auth.token_auth.current_user() user_id = authorized_user.id username = authorized_user.username - except Exception: + except Exception as e: username = None user_id = None - raise CleanupTime(HTTPStatus.UNAUTHORIZED, "Verifying user_id failed") + raise APIAbort( + HTTPStatus.UNAUTHORIZED, "Verifying user_id failed" + ) from e + + # Ask our helper to determine the name and resource ID of the new + # dataset, along with requested access and metadata. 
+ intake = self._identify(args, request) + + filename = intake.name + metadata = self.process_metadata(intake.metadata) + attributes = {"access": intake.access, "metadata": metadata} if os.path.basename(filename) != filename: - raise CleanupTime( + raise APIAbort( HTTPStatus.BAD_REQUEST, "Filename must not contain a path" ) if not Dataset.is_tarball(filename): - raise CleanupTime( + raise APIAbort( HTTPStatus.BAD_REQUEST, f"File extension not supported, must be {Dataset.TARBALL_SUFFIX}", ) - try: - md5sum = request.headers["Content-MD5"] - except KeyError: - raise CleanupTime( - HTTPStatus.BAD_REQUEST, "Missing required 'Content-MD5' header" - ) - if not md5sum: - raise CleanupTime( - HTTPStatus.BAD_REQUEST, - "Missing required 'Content-MD5' header value", - ) - try: - length_string = request.headers["Content-Length"] - content_length = int(length_string) - except KeyError: - # NOTE: Werkzeug is "smart" about header access, and knows that - # Content-Length is an integer. Therefore, a non-integer value - # will raise KeyError. It's virtually impossible to report the - # actual incorrect value as we'd just get a KeyError again. - raise CleanupTime( - HTTPStatus.LENGTH_REQUIRED, - "Missing or invalid 'Content-Length' header", - ) - except ValueError: - # NOTE: Because of the way Werkzeug works, this should not be - # possible: if Content-Length isn't an integer, we'll see the - # KeyError. This however serves as a clarifying backup case. - raise CleanupTime( - HTTPStatus.BAD_REQUEST, - f"Invalid 'Content-Length' header, not an integer ({length_string})", - ) - - if content_length <= 0: - raise CleanupTime( - HTTPStatus.BAD_REQUEST, - f"'Content-Length' {content_length} must be greater than 0", - ) - dataset_name = Dataset.stem(filename) # NOTE: we isolate each uploaded tarball into a private MD5-based @@ -247,27 +237,28 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon # tarballs with the same name. (A duplicate MD5 will have already # failed, so that's not a concern.) 
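+            # For illustration only: the working layout is roughly
+            # <ARCHIVE>/<CacheManager.TEMPORARY>/<MD5>/<tarball name>, so two
+            # concurrent intakes of same-named tarballs land in distinct
+            # directories.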
try: - tmp_dir = self.temporary / md5sum + tmp_dir = self.temporary / intake.md5 tmp_dir.mkdir() - except FileExistsError: - raise CleanupTime( + except FileExistsError as e: + raise APIAbort( HTTPStatus.CONFLICT, "Temporary upload directory already exists", - ) + ) from e tar_full_path = tmp_dir / filename md5_full_path = tmp_dir / f"{filename}.md5" bytes_received = 0 usage = shutil.disk_usage(tar_full_path.parent) current_app.logger.info( - "{} UPLOAD (pre): {:.3}% full, {} remaining", + "{} {} (pre): {:.3}% full, {} remaining", + self.name, tar_full_path.name, float(usage.used) / float(usage.total) * 100.0, humanize.naturalsize(usage.free), ) current_app.logger.info( - "PUT uploading {} for {} to {}", filename, username, tar_full_path + "{} {} for {} to {}", self.name, filename, username, tar_full_path ) # Create a tracking dataset object; it'll begin in UPLOADING state @@ -275,8 +266,8 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon dataset = Dataset( owner=authorized_user, name=dataset_name, - resource_id=md5sum, - access=access, + resource_id=intake.md5, + access=intake.access, ) dataset.add() except DatasetDuplicate: @@ -287,44 +278,47 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon dataset_name, ) try: - Dataset.query(resource_id=md5sum) - except DatasetNotFound: - current_app.logger.error( - "Duplicate dataset {} for user = (user_id: {}, username: {}) not found", - dataset_name, - user_id, - username, - ) - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, "INTERNAL ERROR" - ) + Dataset.query(resource_id=intake.md5) + except DatasetNotFound as e: + raise APIInternalError( + f"Duplicate dataset {intake.md5!r} ({dataset_name!r}) is missing" + ) from e else: response = jsonify(dict(message="Dataset already exists")) response.status_code = HTTPStatus.OK return response - except CleanupTime: - raise # Propagate a CleanupTime exception to the outer block - except Exception: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - message="Unable to create dataset", - ) + except APIAbort: + raise # Propagate an APIAbort exception to the outer block + except Exception as e: + raise APIInternalError("Unable to create dataset") from e + recovery.add(dataset.delete) + + # AUDIT the operation start before we get any further audit = Audit.create( operation=OperationCode.CREATE, - name="upload", + name=self.name, user_id=user_id, user_name=username, dataset=dataset, status=AuditStatus.BEGIN, attributes=attributes, ) - recovery.add(dataset.delete) + + # Now we're ready to pull the tarball, so ask our helper for the + # length and data stream. + stream = self._stream(intake, request) + + if stream.length <= 0: + raise APIAbort( + HTTPStatus.BAD_REQUEST, + f"'Content-Length' {stream.length} must be greater than 0", + ) # An exception from this point on MAY leave an uploaded tar file # (possibly partial, or corrupted); remove it if possible on # error recovery. 
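+            # NOTE: missing_ok=True keeps this recovery step from failing if
+            # the partial tarball was never actually created.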
- recovery.add(tar_full_path.unlink) + recovery.add(lambda: tar_full_path.unlink(missing_ok=True)) # NOTE: We know that the MD5 is unique at this point; so even if # two tarballs with the same name are uploaded concurrently, by @@ -335,83 +329,74 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon hash_md5 = hashlib.md5() while True: - chunk = request.stream.read(self.CHUNK_SIZE) + chunk = stream.stream.read(self.CHUNK_SIZE) bytes_received += len(chunk) - if len(chunk) == 0 or bytes_received > content_length: + if len(chunk) == 0 or bytes_received > stream.length: break ofp.write(chunk) hash_md5.update(chunk) except OSError as exc: + # NOTE: Werkzeug doesn't support status 509, so the abort call + # in _dispatch will fail. Rather than figure out how to fix + # that, just report as an internal error. if exc.errno == errno.ENOSPC: - raise CleanupTime( - HTTPStatus.INSUFFICIENT_STORAGE, - f"Out of space on {tar_full_path.root}", - ) + msg = f"Out of space on {tar_full_path.root}" else: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - f"Unexpected error {exc.errno} encountered during file upload", - ) - except Exception: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - "Unexpected error encountered during file upload", - ) + msg = f"Unexpected error {exc.errno} encountered during file upload" + raise APIInternalError(msg) from exc + except Exception as e: + raise APIInternalError( + "Unexpected error encountered during file upload" + ) from e - if bytes_received != content_length: - raise CleanupTime( + if bytes_received != stream.length: + raise APIAbort( HTTPStatus.BAD_REQUEST, - f"Expected {content_length} bytes but received {bytes_received} bytes", + f"Expected {stream.length} bytes but received {bytes_received} bytes", ) - elif hash_md5.hexdigest() != md5sum: - raise CleanupTime( + elif hash_md5.hexdigest() != intake.md5: + raise APIAbort( HTTPStatus.BAD_REQUEST, - f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}", + f"MD5 checksum {hash_md5.hexdigest()} does not match expected {intake.md5}", ) - # First write the .md5 - current_app.logger.info("Creating MD5 file {}: {}", md5_full_path, md5sum) - # From this point attempt to remove the MD5 file on error exit recovery.add(md5_full_path.unlink) try: - md5_full_path.write_text(f"{md5sum} {filename}\n") - except Exception: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, + md5_full_path.write_text(f"{intake.md5} {filename}\n") + except Exception as e: + raise APIInternalError( f"Failed to write .md5 file '{md5_full_path}'", - ) + ) from e # Create a cache manager object try: cache_m = CacheManager(self.config, current_app.logger) - except Exception: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, "Unable to map the cache manager" - ) + except Exception as e: + raise APIInternalError("Unable to map the cache manager") from e # Move the files to their final location try: tarball = cache_m.create(tar_full_path) - except DuplicateTarball: - raise CleanupTime( + except DuplicateTarball as exc: + raise APIAbort( HTTPStatus.BAD_REQUEST, f"A tarball with the name {dataset_name!r} already exists", - ) + ) from exc except MetadataError as exc: - raise CleanupTime( + raise APIAbort( HTTPStatus.BAD_REQUEST, f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}", - ) + ) from exc except Exception as exc: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - f"Unable to create dataset in file system for {tar_full_path}: {exc}", - ) + 
raise APIInternalError( + f"Unable to create dataset in file system for {tar_full_path}: {exc}" + ) from exc usage = shutil.disk_usage(tar_full_path.parent) current_app.logger.info( - "{} UPLOAD (post): {:.3}% full, {} remaining", + "{} {} (post): {:.3}% full, {} remaining", + self.name, tar_full_path.name, float(usage.used) / float(usage.total) * 100.0, humanize.naturalsize(usage.free), @@ -419,11 +404,6 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon # From this point, failure will remove the tarball from the cache # manager. - # - # NOTE: the Tarball.delete method won't clean up empty controller - # directories. This isn't ideal, but we don't want to deal with the - # potential synchronization issues and it'll become irrelevant with - # the switch to object store. For now we ignore it. recovery.add(tarball.delete) # Add the processed tarball metadata.log file contents, if any. @@ -440,18 +420,16 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon attributes["missing_metadata"] = True Metadata.create(dataset=dataset, key=Metadata.METALOG, value=metalog) except Exception as exc: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - f"Unable to create metalog for Tarball {dataset.name!r}: {exc}", - ) + raise APIInternalError( + f"Unable to create metalog for Tarball {dataset.name!r}: {exc}" + ) from exc try: retention_days = self.config.default_retention_period except Exception as e: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - f"Unable to get integer retention days: {e!s}", - ) + raise APIInternalError( + f"Unable to get integer retention days: {e!s}" + ) from e # Calculate a default deletion time for the dataset, based on the # time it was uploaded rather than the time it was originally @@ -473,9 +451,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon if f: attributes["failures"] = f except Exception as e: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, f"Unable to set metadata: {e!s}" - ) + raise APIInternalError(f"Unable to set metadata: {e!s}") from e # Finally, update the operational state and Audit success. try: @@ -489,16 +465,14 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon root=audit, status=AuditStatus.SUCCESS, attributes=attributes ) except Exception as exc: - raise CleanupTime( - HTTPStatus.INTERNAL_SERVER_ERROR, - f"Unable to finalize dataset {dataset}: {exc!s}", - ) + raise APIInternalError( + f"Unable to finalize dataset {dataset}: {exc!s}" + ) from exc except Exception as e: - message = str(e) - if isinstance(e, CleanupTime): - status = e.status + if isinstance(e, APIAbort): + exception = e else: - status = HTTPStatus.INTERNAL_SERVER_ERROR + exception = APIInternalError(str(e)) # NOTE: there are nested try blocks so we can't be 100% confident # here that an audit "root" object was created. We don't audit on @@ -506,12 +480,11 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon # we have a "resource" to track. We won't try to audit failure if # we didn't create the root object. 
if audit: - if status == HTTPStatus.INTERNAL_SERVER_ERROR: + if exception.http_status == HTTPStatus.INTERNAL_SERVER_ERROR: reason = AuditReason.INTERNAL - audit_msg = "INTERNAL ERROR" else: reason = AuditReason.CONSISTENCY - audit_msg = message + audit_msg = exception.message Audit.create( root=audit, status=AuditStatus.FAILURE, @@ -519,10 +492,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon attributes={"message": audit_msg}, ) recovery.cleanup() - if status == HTTPStatus.INTERNAL_SERVER_ERROR: - raise APIInternalError(message) from e - else: - raise APIAbort(status, message) from e + raise exception from e finally: if tmp_dir: try: diff --git a/lib/pbench/server/api/resources/relay.py b/lib/pbench/server/api/resources/relay.py new file mode 100644 index 0000000000..a44c58e83e --- /dev/null +++ b/lib/pbench/server/api/resources/relay.py @@ -0,0 +1,142 @@ +from http import HTTPStatus + +from flask import Response +from flask.wrappers import Request +import requests + +from pbench.server import PbenchServerConfig +from pbench.server.api.resources import ( + APIAbort, + ApiAuthorizationType, + ApiContext, + ApiMethod, + ApiParams, + ApiSchema, + Parameter, + ParamType, + Schema, +) +from pbench.server.api.resources.intake_base import Access, Intake, IntakeBase +from pbench.server.database.models.audit import AuditType, OperationCode +from pbench.server.database.models.datasets import Dataset + + +class Relay(IntakeBase): + """Retrieve a dataset from a relay server""" + + def __init__(self, config: PbenchServerConfig): + super().__init__( + config, + ApiSchema( + ApiMethod.POST, + OperationCode.CREATE, + uri_schema=Schema(Parameter("filename", ParamType.STRING)), + query_schema=Schema( + Parameter("access", ParamType.ACCESS), + Parameter( + "metadata", + ParamType.LIST, + element_type=ParamType.STRING, + string_list=",", + ), + ), + audit_type=AuditType.NONE, + audit_name="relay", + authorization=ApiAuthorizationType.NONE, + ), + ) + + def _identify(self, args: ApiParams, request: Request) -> Intake: + """Identify the tarball to be streamed. + + We get the Relay manifest file location from the "uri" API + parameter. + + The Relay manifest is an application/json file which contains the + following required fields: + + uri: The Relay URI of the tarball file + md5: The tarball's MD5 hash (Pbench Server resource ID) + name: The original tarball name (with .tar.xz but no path) + access: The desired Dataset access level (default "private") + metadata: An optional list of "key:value" metadata strings + + This information will be captured in an Intake instance for use by the + base class and by the _stream method. 
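+
+        For example, a manifest body might look like this (all values are
+        illustrative placeholders, not real relay content):
+
+            {
+                "uri": "https://relay.example.com/<tarball object>",
+                "md5": "<tarball MD5 hash>",
+                "name": "pbench-run.tar.xz",
+                "access": "private",
+                "metadata": ["global.pbench.test:data"]
+            }
+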
+ + Args: + args: API parameters + URI parameters: the Relay manifest URI + request: The original Request object containing query parameters + + Returns: + An Intake object capturing the critical information + + Raises: + APIAbort on failure + """ + uri = args.uri["uri"] + response = requests.get(uri, headers={"Accept": "application/json"}) + if not response.ok: + raise APIAbort( + HTTPStatus.BAD_GATEWAY, + f"Relay manifest URI problem: {response.reason!r}", + ) + + try: + information = response.json() + except Exception as e: + raise APIAbort( + HTTPStatus.BAD_GATEWAY, + f"Relay URI did not return a JSON manifest: {str(e)!r}", + ) from e + + try: + uri = information["uri"] + md5 = information["md5"] + name = information["name"] + access = information.get("access", Dataset.PRIVATE_ACCESS) + metadata = information.get("metadata", []) + except KeyError as e: + raise APIAbort( + HTTPStatus.BAD_GATEWAY, f"Relay info missing {str(e)!r}" + ) from e + + return Intake(name, md5, access, metadata, uri) + + def _stream(self, intake: Intake, request: Request) -> Access: + """Determine how to access the tarball byte stream + + Using the _intake information captured in the Intake instance, perform + a follow-up GET operation to the URI provided by the Relay config file, + returning the length header and the IO stream. + + Args: + intake: The Intake parameters produced by _identify + request: The Flask request object + + Returns: + An Access object with the data byte stream and length + + Raises: + APIAbort on failure + """ + response: requests.Response = requests.get( + url=intake.uri, stream=True, headers={"Accept": "application/octet-stream"} + ) + if not response.ok: + raise APIAbort( + response.status_code, + f"Unable to retrieve relay tarball: {response.reason!r}", + ) + try: + length = int(response.headers["Content-length"]) + return Access(length, response.raw) + except Exception as e: + raise APIAbort( + HTTPStatus.BAD_REQUEST, f"Unable to retrieve relay tarball: {str(e)!r}" + ) from e + + def _post(self, args: ApiParams, request: Request, context: ApiContext) -> Response: + """Launch the Relay operation from an HTTP POST""" + return self._intake(args, request, context) diff --git a/lib/pbench/server/api/resources/upload.py b/lib/pbench/server/api/resources/upload.py new file mode 100644 index 0000000000..4c37e3e3fa --- /dev/null +++ b/lib/pbench/server/api/resources/upload.py @@ -0,0 +1,132 @@ +from http import HTTPStatus + +from flask import Response +from flask.wrappers import Request + +from pbench.server import PbenchServerConfig +from pbench.server.api.resources import ( + APIAbort, + ApiAuthorizationType, + ApiContext, + ApiMethod, + ApiParams, + ApiSchema, + Parameter, + ParamType, + Schema, +) +from pbench.server.api.resources.intake_base import Access, Intake, IntakeBase +from pbench.server.database.models.audit import AuditType, OperationCode +from pbench.server.database.models.datasets import Dataset + + +class Upload(IntakeBase): + """Accept a dataset from a client""" + + def __init__(self, config: PbenchServerConfig): + super().__init__( + config, + ApiSchema( + ApiMethod.PUT, + OperationCode.CREATE, + uri_schema=Schema(Parameter("filename", ParamType.STRING)), + query_schema=Schema( + Parameter("access", ParamType.ACCESS), + Parameter( + "metadata", + ParamType.LIST, + element_type=ParamType.STRING, + string_list=",", + ), + ), + audit_type=AuditType.NONE, + audit_name="upload", + authorization=ApiAuthorizationType.NONE, + ), + ) + + def _identify(self, args: ApiParams, request: 
Request) -> Intake: + """Identify the tarball to be streamed. + + We get the filename from the URI: /api/v1/upload/. + + We get the dataset's resource ID (which is the tarball's MD5 checksum) + from the "Content-MD5" HTTP header. + + Args: + args: API parameters + URI parameters: filename + Query parameters: desired access and metadata + request: The original Request object containing query parameters + + Returns: + An Intake object capturing the critical information + + Raises: + APIAbort on failure + """ + + access = args.query.get("access", Dataset.PRIVATE_ACCESS) + filename = args.uri["filename"] + + # We allow the client to set metadata on the new dataset. We won't do + # anything about this until upload is successful, but we process and + # validate it here so we can fail early. + metadata = args.query.get("metadata", []) + + try: + md5sum = request.headers["Content-MD5"] + except KeyError as e: + raise APIAbort( + HTTPStatus.BAD_REQUEST, "Missing required 'Content-MD5' header" + ) from e + if not md5sum: + raise APIAbort( + HTTPStatus.BAD_REQUEST, + "Missing required 'Content-MD5' header value", + ) + + return Intake(filename, md5sum, access, metadata, uri=None) + + def _stream(self, intake: Intake, request: Request) -> Access: + """Determine how to access the tarball byte stream + + Check that the "Content-Length" header value is not 0. + + The Flask request object provides the input data stream. + + Args: + intake: The Intake parameters produced by _identify + request: The Flask request object + + Returns: + An Access object with the data byte stream and length + + Raises: + APIAbort on failure + """ + try: + length_string = request.headers["Content-Length"] + content_length = int(length_string) + except KeyError as e: + # NOTE: Werkzeug is "smart" about header access, and knows that + # Content-Length is an integer. Therefore, a non-integer value + # will raise KeyError. It's virtually impossible to report the + # actual incorrect value as we'd just get a KeyError again. + raise APIAbort( + HTTPStatus.LENGTH_REQUIRED, + "Missing or invalid 'Content-Length' header", + ) from e + except ValueError as e: + # NOTE: Because of the way Werkzeug works, this should not be + # possible: if Content-Length isn't an integer, we'll see the + # KeyError. This however serves as a clarifying backup case. 
+ raise APIAbort( + HTTPStatus.BAD_REQUEST, + f"Invalid 'Content-Length' header, not an integer ({length_string})", + ) from e + return Access(content_length, request.stream) + + def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response: + """Launch the upload operation from an HTTP PUT""" + return self._intake(args, request, context) diff --git a/lib/pbench/test/unit/server/test_endpoint_configure.py b/lib/pbench/test/unit/server/test_endpoint_configure.py index d870a94937..ac167d1e45 100644 --- a/lib/pbench/test/unit/server/test_endpoint_configure.py +++ b/lib/pbench/test/unit/server/test_endpoint_configure.py @@ -106,6 +106,10 @@ def check_config(self, client, server_config, host, my_headers={}): "template": f"{uri}/key/{{key}}", "params": {"key": {"type": "string"}}, }, + "relay": { + "template": f"{uri}/relay/{{uri}}", + "params": {"uri": {"type": "path"}}, + }, "server_audit": {"template": f"{uri}/server/audit", "params": {}}, "server_settings": { "template": f"{uri}/server/settings/{{key}}", diff --git a/lib/pbench/test/unit/server/test_relay.py b/lib/pbench/test/unit/server/test_relay.py new file mode 100644 index 0000000000..ae9affbcd5 --- /dev/null +++ b/lib/pbench/test/unit/server/test_relay.py @@ -0,0 +1,323 @@ +from http import HTTPStatus +from logging import Logger +from pathlib import Path + +from flask import Request +import pytest +import responses + +from pbench.server import OperationCode, PbenchServerConfig +from pbench.server.api.resources import ApiParams +from pbench.server.api.resources.intake_base import Intake +from pbench.server.api.resources.relay import Relay +from pbench.server.cache_manager import CacheManager +from pbench.server.database.models.audit import ( + Audit, + AuditReason, + AuditStatus, + AuditType, +) +from pbench.server.database.models.datasets import Dataset +from pbench.test.unit.server import DRB_USER_ID + + +class TestRelay: + """Test the Relay API. + + This focuses on testing the unique aspects of the _prepare and _access + methods rather than repeating coverage of all the common base class code. + + In particular, failure of either of the two external GET operations to the + relay, and problems in the Relay configuration file. 
+ """ + + cachemanager_created = None + cachemanager_create_fail = None + cachemanager_create_path = None + tarball_deleted = None + create_metadata = True + + @staticmethod + def gen_uri(server_config, uri="https://relay.example.com/sha256"): + return f"{server_config.rest_uri}/relay/{uri}" + + def gen_headers(self, auth_token): + headers = {"Authorization": "Bearer " + auth_token} + return headers + + @pytest.fixture(scope="function", autouse=True) + def fake_cache_manager(self, monkeypatch): + class FakeTarball: + def __init__(self, path: Path): + self.tarball_path = path + self.name = Dataset.stem(path) + self.metadata = None + + def delete(self): + TestRelay.tarball_deleted = self.name + + class FakeCacheManager(CacheManager): + def __init__(self, options: PbenchServerConfig, logger: Logger): + self.controllers = [] + self.datasets = {} + TestRelay.cachemanager_created = self + + def create(self, path: Path) -> FakeTarball: + controller = "ctrl" + TestRelay.cachemanager_create_path = path + if TestRelay.cachemanager_create_fail: + raise TestRelay.cachemanager_create_fail + self.controllers.append(controller) + tarball = FakeTarball(path) + if TestRelay.create_metadata: + tarball.metadata = {"pbench": {"date": "2002-05-16T00:00:00"}} + self.datasets[tarball.name] = tarball + return tarball + + TestRelay.cachemanager_created = None + TestRelay.cachemanager_create_fail = None + TestRelay.cachemanager_create_path = None + TestRelay.tarball_deleted = None + monkeypatch.setattr(CacheManager, "__init__", FakeCacheManager.__init__) + monkeypatch.setattr(CacheManager, "create", FakeCacheManager.create) + + def test_missing_authorization_header(self, client, server_config): + """Verify the authorization check""" + response = client.post(self.gen_uri(server_config)) + assert response.status_code == HTTPStatus.UNAUTHORIZED + assert not self.cachemanager_created + + @responses.activate + def test_relay(self, client, server_config, pbench_drb_token, tarball): + """Verify the success path + + Ensure successful completion when the primary relay URI returns a valid + relay manifest referencing a secondary relay URI containing a tarball. 
+ """ + file, md5file, md5 = tarball + responses.add( + responses.GET, + "https://relay.example.com/uri1", + status=HTTPStatus.OK, + json={ + "uri": "https://relay.example.com/uri2", + "name": file.name, + "md5": md5, + "access": "private", + "metadata": ["global.pbench.test:data"], + }, + ) + responses.add( + responses.GET, + "https://relay.example.com/uri2", + status=HTTPStatus.OK, + body=file.open("rb"), + headers={"content-length": f"{file.stat().st_size}"}, + content_type="application/octet-stream", + ) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert ( + response.status_code == HTTPStatus.CREATED + ), f"Unexpected result, {response.text}" + + audit = Audit.query() + assert len(audit) == 2 + assert audit[0].id == 1 + assert audit[0].root_id is None + assert audit[0].operation == OperationCode.CREATE + assert audit[0].status == AuditStatus.BEGIN + assert audit[0].name == "relay" + assert audit[0].object_type == AuditType.DATASET + assert audit[0].object_id == md5 + assert audit[0].object_name == Dataset.stem(file) + assert audit[0].user_id == DRB_USER_ID + assert audit[0].user_name == "drb" + assert audit[0].reason is None + assert audit[0].attributes == { + "access": "private", + "metadata": {"global.pbench.test": "data"}, + } + assert audit[1].id == 2 + assert audit[1].root_id == 1 + assert audit[1].operation == OperationCode.CREATE + assert audit[1].status == AuditStatus.SUCCESS + assert audit[1].name == "relay" + assert audit[1].object_type == AuditType.DATASET + assert audit[1].object_id == md5 + assert audit[1].object_name == Dataset.stem(file) + assert audit[1].user_id == DRB_USER_ID + assert audit[1].user_name == "drb" + assert audit[1].reason is None + assert audit[1].attributes == { + "access": "private", + "metadata": {"global.pbench.test": "data"}, + } + + @responses.activate + def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball): + """Verify failure when secondary relay URI is not found""" + file, md5file, md5 = tarball + responses.add( + responses.GET, + "https://relay.example.com/uri1", + status=HTTPStatus.OK, + json={ + "uri": "https://relay.example.com/uri2", + "name": file.name, + "md5": md5, + "access": "private", + "metadata": [], + }, + ) + responses.add( + responses.GET, "https://relay.example.com/uri2", status=HTTPStatus.NOT_FOUND + ) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert ( + response.status_code == HTTPStatus.NOT_FOUND + ), f"Unexpected result, {response.text}" + + audit = Audit.query() + assert len(audit) == 2 + assert audit[0].id == 1 + assert audit[0].root_id is None + assert audit[0].operation == OperationCode.CREATE + assert audit[0].status == AuditStatus.BEGIN + assert audit[0].name == "relay" + assert audit[0].object_type == AuditType.DATASET + assert audit[0].object_id == md5 + assert audit[0].object_name == Dataset.stem(file) + assert audit[0].user_id == DRB_USER_ID + assert audit[0].user_name == "drb" + assert audit[0].reason is None + assert audit[0].attributes == { + "access": "private", + "metadata": {}, + } + assert audit[1].id == 2 + assert audit[1].root_id == 1 + assert audit[1].operation == OperationCode.CREATE + assert audit[1].status == AuditStatus.FAILURE + assert audit[1].name == "relay" + assert audit[1].object_type == AuditType.DATASET + assert audit[1].object_id == md5 + assert audit[1].object_name == 
Dataset.stem(file) + assert audit[1].user_id == DRB_USER_ID + assert audit[1].user_name == "drb" + assert audit[1].reason == AuditReason.CONSISTENCY + assert audit[1].attributes == { + "message": "Unable to retrieve relay tarball: 'Not Found'" + } + + @responses.activate + def test_relay_no_manifest(self, client, server_config, pbench_drb_token): + """Verify behavior when the primary relay URI isn't found""" + responses.add( + responses.GET, "https://relay.example.com/uri1", status=HTTPStatus.NOT_FOUND + ) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert response.status_code == HTTPStatus.BAD_GATEWAY + assert response.json["message"] == "Relay manifest URI problem: 'Not Found'" + + @responses.activate + def test_relay_not_json(self, client, server_config, pbench_drb_token): + """Verify behavior when the primary relay URI doesn't return a JSON + document. + """ + responses.add( + responses.GET, + "https://relay.example.com/uri1", + status=HTTPStatus.OK, + body="This isn't JSON", + ) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert response.status_code == HTTPStatus.BAD_GATEWAY + assert ( + response.json["message"] + == "Relay URI did not return a JSON manifest: 'Expecting value: line 1 column 1 (char 0)'" + ) + + @responses.activate + def test_relay_missing_json_field(self, client, server_config, pbench_drb_token): + """Verify behavior when the relay manifest doesn't include the + secondary relay URI field.""" + responses.add( + responses.GET, + "https://relay.example.com/uri1", + status=HTTPStatus.OK, + json={ + "name": "tarball.tar.xz", + "md5": "md5", + "access": "private", + "metadata": [], + }, + ) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert response.status_code == HTTPStatus.BAD_GATEWAY + assert response.json["message"] == "Relay info missing \"'uri'\"" + + def test_relay_bad_identify( + self, client, server_config, pbench_drb_token, monkeypatch + ): + """Verify behavior when an unexpected error occurs in the _identify + helper. + """ + + def throw(self, args: ApiParams, request: Request) -> Intake: + raise Exception("An exception that's not APIAbort") + + monkeypatch.setattr(Relay, "_identify", throw) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert ( + response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + ), f"Unexpected result, {response.text}" + + @responses.activate + def test_relay_bad_stream( + self, client, server_config, pbench_drb_token, monkeypatch + ): + """Verify behavior when an unexpected error occurs in the _stream + helper. 
+ """ + + def throw(self, args: ApiParams, request: Request) -> Intake: + raise Exception("An exception that's not APIAbort") + + monkeypatch.setattr(Relay, "_stream", throw) + responses.add( + responses.GET, + "https://relay.example.com/uri1", + status=HTTPStatus.OK, + json={ + "name": "tarball.tar.xz", + "md5": "badmd5", + "access": "private", + "metadata": [], + "uri": "https://relay.example.com/uri2", + }, + ) + response = client.post( + self.gen_uri(server_config, "https://relay.example.com/uri1"), + headers=self.gen_headers(pbench_drb_token), + ) + assert ( + response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + ), f"Unexpected result, {response.text}" diff --git a/lib/pbench/test/unit/server/test_upload.py b/lib/pbench/test/unit/server/test_upload.py index b7e93628e0..34717e69ef 100644 --- a/lib/pbench/test/unit/server/test_upload.py +++ b/lib/pbench/test/unit/server/test_upload.py @@ -1,12 +1,16 @@ +import errno from http import HTTPStatus +from io import BytesIO from logging import Logger from pathlib import Path from typing import Any -from freezegun import freeze_time +from flask import Request import pytest from pbench.server import OperationCode, PbenchServerConfig +from pbench.server.api.resources.intake_base import Access, Intake +from pbench.server.api.resources.upload import Upload from pbench.server.cache_manager import CacheManager, DuplicateTarball from pbench.server.database.models.audit import ( Audit, @@ -18,7 +22,7 @@ Dataset, DatasetNotFound, Metadata, - MetadataKeyError, + MetadataProtectedKey, ) from pbench.test.unit.server import DRB_USER_ID @@ -139,12 +143,15 @@ def test_missing_filename_extension( """Test with URL uploading a file named "f" which is missing the required filename extension""" expected_message = "File extension not supported, must be .tar.xz" - response = client.put( - f"{server_config.rest_uri}/upload/f", - headers={ - "Authorization": "Bearer " + pbench_drb_token, - }, - ) + with BytesIO(b"junk") as f: + response = client.put( + f"{server_config.rest_uri}/upload/f", + data=f, + headers={ + "Authorization": "Bearer " + pbench_drb_token, + "Content-MD5": "abcde", + }, + ) assert response.status_code == HTTPStatus.BAD_REQUEST assert response.json.get("message") == expected_message self.verify_logs(caplog) @@ -183,14 +190,15 @@ def test_bad_length_header_upload( self.verify_logs(caplog) assert not self.cachemanager_created + @pytest.mark.freeze_time("1970-01-01") def test_bad_metadata_upload(self, client, server_config, pbench_drb_token): - with freeze_time("1970-01-01 00:42:00"): + with BytesIO(b"junk") as f: response = client.put( self.gen_uri(server_config), + data=f, headers={ "Authorization": "Bearer " + pbench_drb_token, "Content-MD5": "ANYMD5", - "Content-Length": "STRING", }, query_string={ "metadata": "global.xyz#A@b=z:y,foobar.badpath:data,server.deletion:3000-12-25T23:59:59+00:00" @@ -252,6 +260,45 @@ def test_bad_extension_upload( self.verify_logs(caplog) assert not self.cachemanager_created + @pytest.mark.parametrize("error", (errno.ENOSPC, errno.ENFILE, None)) + def test_bad_stream_read( + self, client, server_config, pbench_drb_token, monkeypatch, error + ): + """Test handling of errors from the intake stream read + + The intake code handles errno.ENOSPC specially; however although the + code tried to raise an APIAbort with HTTPStatus.INSUFFICIENT_SPACE + (50), the werkzeug abort() doesn't support this and ends up with + a generic internal server error. 
Instead, we now have three distinct + cases which all result (to the client) in identical internal server + errors. Nevertheless, we exercise all three cases here. + """ + stream = BytesIO(b"12345") + + def access(self, intake: Intake, request: Request) -> Access: + return Access(5, stream) + + def read(self): + if error: + e = OSError(error, "something went badly") + else: + e = Exception("Nobody expects the Spanish Exception") + raise e + + monkeypatch.setattr(Upload, "_stream", access) + monkeypatch.setattr(stream, "read", read) + + with BytesIO(b"12345") as data_fp: + response = client.put( + self.gen_uri(server_config, "name.tar.xz"), + data=data_fp, + headers=self.gen_headers(pbench_drb_token, "md5sum"), + ) + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + assert response.json.get("message").startswith( + "Internal Pbench Server Error: log reference " + ) + def test_invalid_authorization_upload( self, client, caplog, server_config, pbench_drb_token_invalid ): @@ -491,7 +538,7 @@ def test_upload_metadata_error( datafile, _, md5 = tarball def setvalue(dataset: Dataset, key: str, value: Any): - raise MetadataKeyError() + raise MetadataProtectedKey(key) monkeypatch.setattr(Metadata, "setvalue", setvalue) @@ -538,7 +585,11 @@ def setvalue(dataset: Dataset, key: str, value: Any): assert audit[1].user_id == DRB_USER_ID assert audit[1].user_name == "drb" assert audit[1].reason == AuditReason.INTERNAL - assert audit[1].attributes == {"message": "INTERNAL ERROR"} + assert ( + audit[1] + .attributes["message"] + .startswith("Internal Pbench Server Error: log reference ") + ) @pytest.mark.freeze_time("1970-01-01") def test_upload_archive(self, client, pbench_drb_token, server_config, tarball):
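
For reference, a minimal client-side sketch of how the new relay intake is driven end to end, assuming a relay server already serves the JSON manifest and the tarball it points to. The server URL, manifest URI, and token below are placeholders and not part of this change; only the shape of the POST to the relay endpoint and the expected status codes come from the diff above.

    import requests

    # Placeholder values for illustration only.
    SERVER = "https://pbench.example.com"
    TOKEN = "<bearer token for a registered Pbench Server user>"
    MANIFEST_URI = "https://relay.example.com/<manifest object>"

    # POST /api/v1/relay/<manifest URI>: the server GETs the JSON manifest from
    # the relay, then streams the tarball named by the manifest's "uri" field
    # and assimilates it just as a direct upload PUT would.
    response = requests.post(
        f"{SERVER}/api/v1/relay/{MANIFEST_URI}",
        headers={"Authorization": f"Bearer {TOKEN}"},
    )

    # 201 (CREATED) on success; 200 (OK) if the dataset already exists.
    print(response.status_code, response.text)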