From 6ae502aa244efb760f0104227ba4ae53b36dda08 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Wed, 4 Oct 2023 18:25:20 +0200
Subject: [PATCH] Issue #28 initial implementation of parallel requests (on `/jobs`)

---
 src/openeo_aggregator/backend.py    |  34 +++++----
 src/openeo_aggregator/connection.py | 111 +++++++++++++++++++++++++++-
 2 files changed, 127 insertions(+), 18 deletions(-)

diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index 81cc47d8..bdabafce 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -599,22 +599,28 @@ def __init__(
     def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         jobs = []
         federation_missing = set()
-        for con in self.backends:
-            with con.authenticated_from_request(request=flask.request, user=User(user_id)), \
-                    TimingLogger(f"get_user_jobs: {con.id}", logger=_log.debug):
+
+        results = self.backends.request_parallel(
+            path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request
+        )
+        for backend_id, success, result in results:
+            if success:
                 try:
-                    backend_jobs = con.list_jobs()
+                    for job in result["jobs"]:
+                        try:
+                            job["id"] = JobIdMapping.get_aggregator_job_id(
+                                backend_job_id=job["id"], backend_id=backend_id
+                            )
+                            jobs.append(BatchJobMetadata.from_api_dict(job))
+                        except Exception as e:
+                            _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
                 except Exception as e:
-                    # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
-                    _log.warning(f"Failed to get job listing from backend {con.id!r}: {e!r}")
-                    federation_missing.add(con.id)
-                    backend_jobs = []
-                for job in backend_jobs:
-                    try:
-                        job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=con.id)
-                        jobs.append(BatchJobMetadata.from_api_dict(job))
-                    except Exception as e:
-                        _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
+                    _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
+                    federation_missing.add(backend_id)
+            else:
+                # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
+                _log.warning(f"Failed to get job listing from backend {backend_id!r}: {result!r}")
+                federation_missing.add(backend_id)
 
         if self.partitioned_job_tracker:
             for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id):
diff --git a/src/openeo_aggregator/connection.py b/src/openeo_aggregator/connection.py
index 92361c64..a4515d50 100644
--- a/src/openeo_aggregator/connection.py
+++ b/src/openeo_aggregator/connection.py
@@ -1,14 +1,28 @@
 import collections
+import concurrent.futures
 import contextlib
 import logging
 import re
-from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 import flask
 import requests
 from openeo import Connection
 from openeo.capabilities import ComparableVersion
 from openeo.rest.auth.auth import BearerAuth, OpenEoApiAuthBase
+from openeo.rest.connection import RestApiConnection
+from openeo.util import TimingLogger
 from openeo_driver.backend import OidcProvider
 from openeo_driver.errors import (
     AuthenticationRequiredException,
@@ -31,6 +45,10 @@
 _log = logging.getLogger(__name__)
 
 
+# Type annotation aliases
+
+BackendId = str
+
 class LockedAuthException(InternalException):
     def __init__(self):
         super().__init__(message="Setting auth while locked.")
@@ -105,8 +123,11 @@ def _build_oidc_provider_map(self, configured_providers: List[OidcProvider]) ->
     def get_oidc_provider_map(self) -> Dict[str, str]:
         return self._oidc_provider_map
 
-    def _get_bearer(self, request: flask.Request) -> str:
-        """Extract authorization header from request and (optionally) transform for given backend """
+    def extract_bearer(self, request: flask.Request) -> str:
+        """
+        Extract authorization header from flask request
+        and (optionally) transform for current backend.
+        """
         if "Authorization" not in request.headers:
             raise AuthenticationRequiredException
         auth = request.headers["Authorization"]
@@ -134,7 +155,7 @@ def authenticated_from_request(
         Context manager to temporarily authenticate upstream connection based on current incoming flask request.
         """
         self._auth_locked = False
-        self.auth = BearerAuth(bearer=self._get_bearer(request=request))
+        self.auth = BearerAuth(bearer=self.extract_bearer(request=request))
         # TODO store and use `user` object?
         try:
             yield self
@@ -256,6 +277,9 @@ def get_connections(self) -> List[BackendConnection]:
 
         return self._connections_cache.connections
 
+    def __len__(self) -> int:
+        return len(self._backend_urls)
+
     def __iter__(self) -> Iterator[BackendConnection]:
         return iter(self.get_connections())
 
@@ -320,6 +344,85 @@ def map(
             # TODO: customizable exception handling: skip, warn, re-raise?
             yield con.id, res
 
+    def request_parallel(
+        self,
+        path: str,
+        *,
+        method: str = "GET",
+        parse_json: bool = True,
+        authenticated_from_request: Optional[flask.Request] = None,
+        expected_status: Union[int, Iterable[int], None] = None,
+        request_timeout: float = 5,
+        overall_timeout: float = 8,
+        max_workers: int = 5,
+    ) -> List[Tuple[BackendId, bool, Any]]:
+        """
+        Request a given (relative) path on each backend in parallel.
+        :param path: relative (openEO) path to request
+        :return: list of (backend id, success, result/exception) tuples
+        """
+
+        def do_request(
+            root_url: str,
+            path: str,
+            *,
+            method: str = "GET",
+            headers: Optional[dict] = None,
+            auth: Optional[OpenEoApiAuthBase] = None,
+        ) -> Union[dict, bytes]:
+            """Isolated request, to be handled by a future."""
+            with TimingLogger(title=f"request_parallel {method} {path} on {root_url}", logger=_log):
+                con = RestApiConnection(root_url=root_url)
+                resp = con.request(
+                    method=method,
+                    path=path,
+                    headers=headers,
+                    auth=auth,
+                    timeout=request_timeout,
+                    expected_status=expected_status,
+                )
+                if parse_json:
+                    return resp.json()
+                else:
+                    return resp.content
+
+        max_workers = min(max_workers, len(self))
+
+        with TimingLogger(
+            title=f"request_parallel {method} {path} on {len(self)} backends with thread pool {max_workers=}",
+            logger=_log,
+        ), concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+            # Submit all futures (one for each backend connection)
+            futures: List[Tuple[BackendId, concurrent.futures.Future]] = []
+            for con in self.get_connections():
+                if authenticated_from_request:
+                    auth = BearerAuth(bearer=con.extract_bearer(request=authenticated_from_request))
+                else:
+                    auth = None
+                future = executor.submit(
+                    do_request,
+                    root_url=con.root_url,
+                    path=path,
+                    method=method,
+                    headers=con.default_headers,
+                    auth=auth,
+                )
+                futures.append((con.id, future))
+
+            # Give futures some time to finish
+            concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout)
+
+            # Collect results.
+            results: List[Tuple[BackendId, bool, Any]] = []
+            for backend_id, future in futures:
+                try:
+                    result = future.result(timeout=0)
+                    results.append((backend_id, True, result))
+                except Exception as e:
+                    results.append((backend_id, False, e))
+
+        return results
+
 
 def streaming_flask_response(
     backend_response: requests.Response,
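
For context, below is a minimal standalone sketch (not part of the patch) of the fan-out/collect pattern that the new `request_parallel()` helper implements: submit one request per backend to a thread pool, wait under a shared overall timeout, and report a per-backend (id, success, result-or-exception) tuple. The backend registry, URLs, and helper names here are hypothetical placeholders, and plain `requests` stands in for the patch's `RestApiConnection`/`BearerAuth` machinery.

import concurrent.futures
from typing import Any, Dict, List, Tuple

import requests

# Hypothetical backend registry, for illustration only.
BACKENDS: Dict[str, str] = {
    "b1": "https://backend1.example.org/openeo/1.1",
    "b2": "https://backend2.example.org/openeo/1.1",
}


def fetch(root_url: str, path: str, timeout: float = 5) -> dict:
    """Single isolated request, executed inside a worker thread."""
    resp = requests.get(root_url.rstrip("/") + path, timeout=timeout)
    resp.raise_for_status()
    return resp.json()


def request_all(path: str, overall_timeout: float = 8) -> List[Tuple[str, bool, Any]]:
    """Fan out `path` to all backends and collect per-backend results."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(BACKENDS)) as executor:
        futures = {bid: executor.submit(fetch, url, path) for bid, url in BACKENDS.items()}
        # One shared deadline for all backends, instead of waiting for each in turn.
        concurrent.futures.wait(list(futures.values()), timeout=overall_timeout)
        results: List[Tuple[str, bool, Any]] = []
        for bid, future in futures.items():
            try:
                # timeout=0: a future that is still running counts as a failure here.
                results.append((bid, True, future.result(timeout=0)))
            except Exception as exc:
                results.append((bid, False, exc))
        return results


if __name__ == "__main__":
    for backend_id, success, result in request_all("/jobs"):
        print(backend_id, "OK" if success else f"failed: {result!r}")

On top of this basic pattern, the patch's `request_parallel()` adds a per-request timeout, bearer-token forwarding via `extract_bearer()`, optional JSON parsing, and backend-specific default headers.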