Skip to content

Commit

Permalink
Introduce BatchJobs.get_result_metadata and allow custom job result…
Browse files Browse the repository at this point in the history
… links

related to Open-EO/openeo-aggregator#98
  • Loading branch information
soxofaan committed Mar 21, 2023
1 parent 4fc539c commit b6b4871
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 30 deletions.
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.38.0a1"
__version__ = "0.38.1a1"
25 changes: 24 additions & 1 deletion openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import abc
import json
import logging
import dataclasses

import sys
from datetime import datetime, timedelta
from pathlib import Path
Expand Down Expand Up @@ -283,6 +285,7 @@ class BatchJobMetadata(NamedTuple):
end_datetime: datetime = None
instruments: List[str] = None
epsg: int = None
# TODO: openEO API associates `links` with the job *result* metadata, not the job itself
links: List[Dict] = None
usage: Dict = None

Expand Down Expand Up @@ -352,6 +355,14 @@ def to_api_dict(self, full=True, api_version: ComparableVersion = None) -> dict:
return dict_no_none(result)


@dataclasses.dataclass
class BatchJobResultMetadata:
# Basic dataclass based wrapper for batch job result metadata (allows cleaner code navigation and discovery)
assets: Dict[str, dict] = dataclasses.field(default_factory=dict)
links: List[dict] = dataclasses.field(default_factory=list)
# TODO: more fields


class BatchJobs(MicroService):
"""
Base contract/implementation for Batch Jobs "microservice"
Expand Down Expand Up @@ -392,6 +403,18 @@ def start_job(self, job_id: str, user: User):
"""
raise NotImplementedError

def get_result_metadata(self, job_id: str, user_id: str) -> BatchJobResultMetadata:
"""
Get job result metadata
https://openeo.org/documentation/1.0/developers/api/reference.html#tag/Batch-Jobs/operation/list-results
"""
# Default implementation, based on existing components
return BatchJobResultMetadata(
assets=self.get_result_assets(job_id=job_id, user_id=user_id),
links=[],
)

def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
"""
Return result assets as (filename, metadata) mapping: `filename` is the part that
Expand All @@ -401,7 +424,7 @@ def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
related:
https://openeo.org/documentation/1.0/developers/api/reference.html#tag/Batch-Jobs/operation/list-results
"""
# Default implementation: fall back on legacy method
# Default implementation, based on legacy API
return self.get_results(job_id=job_id, user_id=user_id)

def get_results(self, job_id: str, user_id: str) -> Dict[str, dict]:
Expand Down
32 changes: 28 additions & 4 deletions openeo_driver/dummy/dummy_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import CollectionMetadata, Band
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
from openeo_driver.backend import (SecondaryServices, OpenEoBackendImplementation, CollectionCatalog, ServiceMetadata,
BatchJobs, BatchJobMetadata, OidcProvider, UserDefinedProcesses,
UserDefinedProcessMetadata, LoadParameters, Processing)
from openeo_driver.backend import (
SecondaryServices,
OpenEoBackendImplementation,
CollectionCatalog,
ServiceMetadata,
BatchJobs,
BatchJobMetadata,
OidcProvider,
UserDefinedProcesses,
UserDefinedProcessMetadata,
LoadParameters,
Processing,
BatchJobResultMetadata,
)
from openeo_driver.datacube import DriverDataCube, DriverMlModel, DriverVectorCube
from openeo_driver.datastructs import StacAsset
from openeo_driver.delayed_vector import DelayedVector
Expand Down Expand Up @@ -570,8 +581,9 @@ def extra_validation(


class DummyBatchJobs(BatchJobs):
_job_registry = {}
_job_registry: Dict[Tuple[str, str], BatchJobMetadata] = {}
_custom_job_logs = {}
_job_result_registry: Dict[Tuple[str, str], BatchJobResultMetadata] = {}

def generate_job_id(self):
return generate_unique_id(prefix="j")
Expand Down Expand Up @@ -612,6 +624,12 @@ def _update_status(cls, job_id: str, user_id: str, status: str):
except KeyError:
raise JobNotFoundException(job_id)

@classmethod
def set_result_metadata(
cls, job_id: str, user_id: str, metadata: BatchJobResultMetadata
):
cls._job_result_registry[(job_id, user_id)] = metadata

def start_job(self, job_id: str, user: User):
self._update_status(
job_id=job_id, user_id=user.user_id, status=JOB_STATUS.RUNNING
Expand All @@ -620,6 +638,12 @@ def start_job(self, job_id: str, user: User):
def _output_root(self) -> str:
return "/data/jobs"

def get_result_metadata(self, job_id: str, user_id: str) -> BatchJobResultMetadata:
if (job_id, user_id) in self._job_result_registry:
return self._job_result_registry[(job_id, user_id)]
else:
return super().get_result_metadata(job_id=job_id, user_id=user_id)

def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
if (
self._get_job_info(job_id=job_id, user_id=user_id).status
Expand Down
62 changes: 40 additions & 22 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,9 +891,10 @@ def _list_job_results(job_id, user_id):
if job_info.status != JOB_STATUS.FINISHED:
raise JobNotFinishedException()

results = backend_implementation.batch_jobs.get_result_assets(
result_metadata = backend_implementation.batch_jobs.get_result_metadata(
job_id=job_id, user_id=user_id
)
result_assets = result_metadata.assets

if requested_api_version().at_least("1.0.0"):
def job_results_canonical_url() -> str:
Expand All @@ -912,26 +913,41 @@ def job_results_canonical_url() -> str:
job_id=job_id, user_base64=user_base64, expires=expires, secure_key=secure_key, _external=True
)

links = job_info.links
if links is None:
links = []

links.extend([{
"rel": "self",
"href": url_for('.list_job_results', job_id=job_id, _external=True), # MUST be absolute
"type": "application/json"
}, {
"rel": "canonical",
"href": job_results_canonical_url(),
"type": "application/json"
}, {
"rel": "card4l-document",
"href": "http://ceos.org/ard/files/PFS/SR/v5.0/CARD4L_Product_Family_Specification_Surface_Reflectance-v5.0.pdf",
"type": "application/pdf"
}])
links: List[dict] = result_metadata.links or job_info.links or []

if not any(l.get("rel") == "self" for l in links):
links.append(
{
"rel": "self",
"href": url_for(
".list_job_results", job_id=job_id, _external=True
), # MUST be absolute
"type": "application/json",
}
)
if not any(l.get("rel") == "canonical" for l in links):
links.append(
{
"rel": "canonical",
"href": job_results_canonical_url(),
"type": "application/json",
}
)
if not any(l.get("rel") == "card4l-document" for l in links):
links.append(
{
"rel": "card4l-document",
# TODO: avoid hardcoding this specific URL?
"href": "http://ceos.org/ard/files/PFS/SR/v5.0/CARD4L_Product_Family_Specification_Surface_Reflectance-v5.0.pdf",
"type": "application/pdf",
}
)

assets = {filename: _asset_object(job_id, user_id, filename, asset_metadata)
for filename, asset_metadata in results.items() if asset_metadata.get('asset', True)}
assets = {
filename: _asset_object(job_id, user_id, filename, asset_metadata)
for filename, asset_metadata in result_assets.items()
if asset_metadata.get("asset", True)
}

if requested_api_version().at_least("1.1.0"):
to_datetime = Rfc3339(propagate_none=True).datetime
Expand All @@ -953,7 +969,7 @@ def job_result_item_url(item_id) -> str:
job_id=job_id, user_base64=user_base64, secure_key=secure_key, item_id=item_id, expires=expires,
_external=True)

for filename, metadata in results.items():
for filename, metadata in result_assets.items():
if "data" in metadata.get("roles", []) and "geotiff" in metadata.get("type", ""):
links.append({
"rel": "item",
Expand Down Expand Up @@ -1027,9 +1043,11 @@ def job_result_item_url(item_id) -> str:
if "proj:epsg" in result["properties"]:
result["stac_extensions"].append("projection")
else:
# TODO #47 drop pre-1.0.0 API support
result = {
"links": [
{"href": _job_result_download_url(job_id, user_id, filename)} for filename in results.keys()
{"href": _job_result_download_url(job_id, user_id, filename)}
for filename in result_assets.keys()
]
}

Expand Down
45 changes: 43 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@

from openeo.capabilities import ComparableVersion
from openeo_driver.ProcessGraphDeserializer import custom_process_from_process_graph
from openeo_driver.backend import BatchJobMetadata, UserDefinedProcessMetadata, BatchJobs, OpenEoBackendImplementation, \
Processing, not_implemented
from openeo_driver.backend import (
BatchJobMetadata,
UserDefinedProcessMetadata,
BatchJobs,
OpenEoBackendImplementation,
Processing,
not_implemented,
BatchJobResultMetadata,
)
from openeo_driver.dummy import dummy_backend
from openeo_driver.dummy.dummy_backend import DummyBackendImplementation
from openeo_driver.testing import ApiTester, TEST_USER, ApiResponse, TEST_USER_AUTH_HEADER, \
Expand Down Expand Up @@ -934,6 +941,8 @@ def _fresh_job_registry(next_job_id="job-1234", output_root: Optional[Path] = No
budget=4.56,
)
}
dummy_backend.DummyBatchJobs._job_result_registry = {}

if jobs:
for job_id, job_settings in jobs.items():
key = (job_settings.get("user", TEST_USER), job_id)
Expand Down Expand Up @@ -1588,6 +1597,38 @@ def test_get_job_results_signed_with_expiration_110(self, api110, flask_app):
}
}

def test_get_job_results_custom_links(self, api100):
with self._fresh_job_registry(next_job_id="job-362"):
job_id = "07024ee9-7847-4b8a-b260-6c879a2b3cdc"
dummy_backend.DummyBatchJobs._update_status(
job_id=job_id, user_id=TEST_USER, status="finished"
)
dummy_backend.DummyBatchJobs.set_result_metadata(
job_id=job_id,
user_id=TEST_USER,
metadata=BatchJobResultMetadata(
assets={},
links=[
{"rel": "canonical", "href": "https://other.test/j123/results"},
{"rel": "self", "href": "https://other.test/j123"},
{"rel": "card4l-document", "href": "https://c4ld.test"},
{"rel": "cu5t0m!", "href": "https://other.test/j123.readme"},
],
),
)
resp = api100.get(f"/jobs/{job_id}/results", headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == DictSubSet(
{
"id": "07024ee9-7847-4b8a-b260-6c879a2b3cdc",
"links": [
{"rel": "canonical", "href": "https://other.test/j123/results"},
{"rel": "self", "href": "https://other.test/j123"},
{"rel": "card4l-document", "href": "https://c4ld.test"},
{"rel": "cu5t0m!", "href": "https://other.test/j123.readme"},
],
}
)

def test_get_job_results_invalid_job(self, api):
api.get('/jobs/deadbeef-f00/results', headers=self.AUTH_HEADER).assert_error(404, "JobNotFound")

Expand Down

0 comments on commit b6b4871

Please sign in to comment.