Skip to content

Commit

Permalink
Issue #93/#95 experimental: enforce back-end selection with job option
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 13, 2023
1 parent 29a3756 commit 14c06e3
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/
### Added

- Add support for more user roles ([#64](https://github.com/Open-EO/openeo-aggregator/issues/64))
- Experimental: allow back-end selection through job option

### Changed

Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.7.1a1"
__version__ = "0.7.2a1"
52 changes: 44 additions & 8 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
from openeo_aggregator.config import AggregatorConfig, CONNECTION_TIMEOUT_RESULT, CONNECTION_TIMEOUT_JOB_START
from openeo_aggregator.connection import MultiBackendConnection, BackendConnection, streaming_flask_response
import openeo_aggregator.egi
from openeo_aggregator.constants import (
JOB_OPTION_FORCE_BACKEND,
JOB_OPTION_SPLIT_STRATEGY,
JOB_OPTION_TILE_GRID,
)
from openeo_aggregator.errors import BackendLookupFailureException
from openeo_aggregator.metadata import (
STAC_PROPERTY_PROVIDER_BACKEND,
Expand Down Expand Up @@ -304,13 +309,26 @@ def _get_merged_process_metadata(self) -> dict:
)
return combined_processes

def get_backend_for_process_graph(self, process_graph: dict, api_version: str) -> str:
def get_backend_for_process_graph(
self, process_graph: dict, api_version: str, job_options: Optional[dict] = None
) -> str:
"""
Get backend capable of executing given process graph (based on used collections, processes, ...)
"""
# Initial list of candidates
backend_candidates: List[str] = [b.id for b in self.backends]

if job_options and JOB_OPTION_FORCE_BACKEND in job_options:
# Experimental feature to force a certain upstream back-end through job options
bid = job_options[JOB_OPTION_FORCE_BACKEND]
if bid not in backend_candidates:
# TODO use generic client error class
raise OpenEOApiException(
status_code=404,
message=f"Invalid job option {JOB_OPTION_FORCE_BACKEND!r}: {bid!r} not in {backend_candidates!r}",
)
return bid

# TODO: also check used processes?
collections = set()
collection_backend_constraints = []
Expand Down Expand Up @@ -488,7 +506,10 @@ def create_job(
raise ProcessGraphMissingException()

# TODO: better, more generic/specific job_option(s)?
if job_options and (job_options.get("split_strategy") or job_options.get("tile_grid")):
if job_options and (
job_options.get(JOB_OPTION_SPLIT_STRATEGY)
or job_options.get(JOB_OPTION_TILE_GRID)
):
return self._create_partitioned_job(
user_id=user_id,
process=process,
Expand All @@ -506,13 +527,28 @@ def create_job(
)

def _create_job_standard(
self, user_id: str, process_graph: dict, api_version: str, metadata: dict, job_options: dict = None
self,
user_id: str,
process_graph: dict,
api_version: str,
metadata: dict,
job_options: Optional[dict] = None,
) -> BatchJobMetadata:
"""Standard batch job creation: just proxy to a single batch job on single back-end."""
backend_id = self.processing.get_backend_for_process_graph(
process_graph=process_graph, api_version=api_version
process_graph=process_graph,
api_version=api_version,
job_options=job_options,
)
process_graph = self.processing.preprocess_process_graph(process_graph, backend_id=backend_id)
process_graph = self.processing.preprocess_process_graph(
process_graph, backend_id=backend_id
)
if job_options:
additional = {
k: v for k, v in job_options.items() if not k.startswith("_agg_")
}
else:
additional = None

con = self.backends.get_connection(backend_id)
with con.authenticated_from_request(request=flask.request, user=User(user_id=user_id)), \
Expand All @@ -522,7 +558,7 @@ def _create_job_standard(
process_graph=process_graph,
title=metadata.get("title"), description=metadata.get("description"),
plan=metadata.get("plan"), budget=metadata.get("budget"),
additional=job_options,
additional=additional,
)
except OpenEoApiError as e:
for exc_class in [ProcessGraphMissingException, ProcessGraphInvalidException]:
Expand All @@ -549,9 +585,9 @@ def _create_partitioned_job(
if not self.partitioned_job_tracker:
raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")

if "tile_grid" in job_options:
if JOB_OPTION_TILE_GRID in job_options:
splitter = TileGridSplitter(processing=self.processing)
elif job_options.get("split_strategy") == "flimsy":
elif job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "flimsy":
splitter = FlimsySplitter(processing=self.processing)
else:
raise ValueError("Could not determine splitting strategy from job options")
Expand Down
6 changes: 6 additions & 0 deletions src/openeo_aggregator/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Job option to request a partitioned (split) job with a given
# splitting strategy (e.g. "flimsy")
JOB_OPTION_SPLIT_STRATEGY = "split_strategy"

# Job option to request a partitioned job that is split according to
# the given tile grid (value is parsed with `TileGrid.from_string`)
JOB_OPTION_TILE_GRID = "tile_grid"

# Experimental feature to force a certain upstream back-end through job options
# (job options with the "_agg_" prefix are filtered out again
# before a standard job is created on the upstream back-end)
JOB_OPTION_FORCE_BACKEND = "_agg_force_backend"
3 changes: 2 additions & 1 deletion src/openeo_aggregator/partitionedjobs/splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import List

from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo_aggregator.constants import JOB_OPTION_TILE_GRID
from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob, PartitionedJobFailure
from openeo_aggregator.utils import BoundingBox
from openeo_driver.ProcessGraphDeserializer import convert_node, ENV_DRY_RUN_TRACER, ConcreteProcessing
Expand Down Expand Up @@ -152,7 +153,7 @@ def split(self, process: dict, metadata: dict = None, job_options: dict = None)
global_spatial_extent = self._extract_global_spatial_extent(process)
# TODO: pass tile_grid from job_options or from save_result format options?
# see https://github.com/openEOPlatform/architecture-docs/issues/187
tile_grid = TileGrid.from_string(job_options["tile_grid"])
tile_grid = TileGrid.from_string(job_options[JOB_OPTION_TILE_GRID])
tiles = tile_grid.get_tiles(bbox=global_spatial_extent, max_tiles=job_options.get("max_tiles", MAX_TILES))
inject = self._filter_bbox_injector(process_graph=process_graph)

Expand Down
1 change: 1 addition & 0 deletions src/openeo_aggregator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

# Generic "sentinel object" for unset values (where `None` is valid value)
# https://python-patterns.guide/python/sentinel-object/
# TODO: this does not have to be private
_UNSET = object()

_log = logging.getLogger(__name__)
Expand Down
67 changes: 62 additions & 5 deletions tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import logging
import re
from typing import Tuple, List

import pytest
import re
import requests
from typing import Tuple, List

from openeo.rest.connection import url_join
from openeo.rest import OpenEoApiError, OpenEoRestError
from openeo.rest.connection import url_join
from openeo.util import rfc3339
from openeo_aggregator.config import AggregatorConfig
from openeo_aggregator.constants import JOB_OPTION_FORCE_BACKEND
from openeo_aggregator.metadata import (
STAC_PROPERTY_PROVIDER_BACKEND,
STAC_PROPERTY_FEDERATION_BACKENDS,
)
from openeo_aggregator.testing import clock_mock, build_capabilities
from openeo_driver.backend import ServiceMetadata
from openeo_driver.errors import JobNotFoundException, JobNotFinishedException, \
ProcessGraphInvalidException, ProcessGraphMissingException
from openeo_driver.backend import ServiceMetadata
from openeo_driver.testing import (
ApiTester,
TEST_USER_AUTH_HEADER,
Expand Down Expand Up @@ -1148,6 +1148,63 @@ def post_jobs(request: requests.Request, context):
res = api100.post("/jobs", json={"process": {"process_graph": pg}})
res.assert_error(500, "Internal", message="Failed to create job on backend 'b1'")

@pytest.mark.parametrize(
    ["force_backend", "expected"],
    [("b1", "b1"), ("b2", "b2"), (None, "b1")],
)
def test_create_job_force_backend(
    self, api100, requests_mock, backend1, backend2, force_backend, expected
):
    """
    Job creation with the experimental "force back-end" job option:
    the job must be created on the requested upstream back-end
    (or on "b1" when the option is not given),
    and the job options must not show up in the upstream request payload.
    """
    # Both upstream back-ends provide the same collection,
    # so the collection alone does not pin the job to one of them.
    for backend_url in (backend1, backend2):
        requests_mock.get(
            backend_url + "/collections", json={"collections": [{"id": "S2"}]}
        )

    # Request payloads of all job creation requests that reached an upstream back-end
    received_payloads = []

    def post_jobs(request: requests.Request, context):
        received_payloads.append(request.json())
        context.headers["Location"] = backend1 + "/jobs/th3j0b"
        context.headers["OpenEO-Identifier"] = "th3j0b"
        context.status_code = 201

    backend1_post_jobs = requests_mock.post(backend1 + "/jobs", text=post_jobs)
    backend2_post_jobs = requests_mock.post(backend2 + "/jobs", text=post_jobs)

    pg = {
        "lc": {
            "process_id": "load_collection",
            "arguments": {"id": "S2"},
            "result": True,
        }
    }
    # Only set the job option when a back-end is actually forced
    job_options = {JOB_OPTION_FORCE_BACKEND: force_backend} if force_backend else {}

    api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
    payload = {
        "process": {"process_graph": pg},
        "job_options": job_options,
    }
    res = api100.post("/jobs", json=payload).assert_status_code(201)

    expected_job_id = f"{expected}-th3j0b"
    assert (
        res.headers["Location"]
        == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
    )
    assert res.headers["OpenEO-Identifier"] == expected_job_id
    # Exactly one upstream request, without any job options in its payload
    assert received_payloads == [{"process": {"process_graph": pg}}]

    # Only the expected back-end should have received the job creation request
    expected_call_counts = {
        "b1": (1, 0),
        "b2": (0, 1),
    }[expected]
    actual_call_counts = (
        backend1_post_jobs.call_count,
        backend2_post_jobs.call_count,
    )
    assert actual_call_counts == expected_call_counts

def test_get_job_metadata(self, api100, requests_mock, backend1):
requests_mock.get(backend1 + "/jobs/th3j0b", json={
"id": "th3j0b",
Expand Down

0 comments on commit 14c06e3

Please sign in to comment.