Skip to content

Commit

Permalink
Issue #98: preserve upstream's canonical job result link
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 21, 2023
1 parent 75ec306 commit a45952e
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/
- Experimental: allow back-end selection through job option
- Add support for `load_result` with a URL (instead of job_id).
([#95](https://github.com/Open-EO/openeo-aggregator/issues/95))
- Batch job result metadata: preserve upstream's canonical link
([#98](https://github.com/Open-EO/openeo-aggregator/issues/98))

### Changed

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"requests",
"attrs",
"openeo>=0.9.3.a2.dev",
"openeo_driver>=0.37.0.dev",
"openeo_driver>=0.38.1.dev",
"flask~=2.0",
"gunicorn~=20.0",
"python-json-logger>=2.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.7.3a1"
__version__ = "0.7.4a1"
25 changes: 24 additions & 1 deletion src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Processing,
SecondaryServices,
ServiceMetadata,
BatchJobResultMetadata,
)
from openeo_driver.datacube import DriverDataCube
from openeo_driver.errors import (
Expand Down Expand Up @@ -701,14 +702,36 @@ def delete_job(self, job_id: str, user_id: str):
self._translate_job_errors(job_id=job_id):
con.job(backend_job_id).delete_job()

def get_results(self, job_id: str, user_id: str) -> Dict[str, dict]:
def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
    """
    List the result assets of a batch job, as reported by its upstream back-end.

    :param job_id: aggregator-level job id (resolved here to a connection and a back-end job id)
    :param user_id: id of the user on whose behalf the upstream request is made
    :return: mapping of asset name to the asset's metadata,
        extended with the asset href under the `BatchJobs.ASSET_PUBLIC_HREF` key
    """
    con, backend_job_id = self._get_connection_and_backend_job_id(aggregator_job_id=job_id)
    # Upstream calls run with the request's authentication and with job error translation active.
    with con.authenticated_from_request(request=flask.request, user=User(user_id)):
        with self._translate_job_errors(job_id=job_id):
            upstream_results = con.job(backend_job_id).get_results()
            upstream_assets = upstream_results.get_assets()
    return {
        asset.name: {**asset.metadata, BatchJobs.ASSET_PUBLIC_HREF: asset.href}
        for asset in upstream_assets
    }

def get_result_metadata(self, job_id: str, user_id: str) -> BatchJobResultMetadata:
    """
    Build the result metadata (assets and links) of a batch job from its upstream back-end.

    :param job_id: aggregator-level job id (resolved here to a connection and a back-end job id)
    :param user_id: id of the user on whose behalf the upstream request is made
    :return: `BatchJobResultMetadata` holding the upstream assets
        (each extended with its href under `BatchJobs.ASSET_PUBLIC_HREF`)
        and the upstream links, minus those with `rel` equal to "self"
    """
    con, backend_job_id = self._get_connection_and_backend_job_id(aggregator_job_id=job_id)
    # Upstream calls run with the request's authentication and with job error translation active.
    with con.authenticated_from_request(request=flask.request, user=User(user_id)):
        with self._translate_job_errors(job_id=job_id):
            upstream_results = con.job(backend_job_id).get_results()
            upstream_metadata = upstream_results.get_metadata()
            upstream_assets = upstream_results.get_assets()

    asset_listing = {
        asset.name: {**asset.metadata, BatchJobs.ASSET_PUBLIC_HREF: asset.href}
        for asset in upstream_assets
    }

    # TODO: better white/black list for links?
    # Upstream "self" links are dropped; every other link (e.g. "canonical") is passed through as-is.
    upstream_links = upstream_metadata.get("links", [])
    preserved_links = [link for link in upstream_links if link.get("rel") != "self"]

    return BatchJobResultMetadata(assets=asset_listing, links=preserved_links)

def get_log_entries(self, job_id: str, user_id: str, offset: Optional[str] = None) -> List[dict]:
con, backend_job_id = self._get_connection_and_backend_job_id(aggregator_job_id=job_id)
with con.authenticated_from_request(request=flask.request, user=User(user_id)), \
Expand Down
8 changes: 6 additions & 2 deletions src/openeo_aggregator/partitionedjobs/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,18 @@ def get_results(self):
"""Interface `RESTJob.get_results`"""
return self

def get_assets(self):
"""Interface `openeo.rest.JobResult.get_asserts`"""
def get_assets(self) -> List[ResultAsset]:
    """
    Interface `openeo.rest.JobResult.get_assets`

    Delegates to the partitioned job tracker of this object's connection,
    passing along the connection's user id and flask request and this object's `pjob_id`.
    """
    connection = self.connection
    tracker = connection.partitioned_job_tracker
    return tracker.get_assets(
        user_id=connection._user.user_id,
        pjob_id=self.pjob_id,
        flask_request=connection._flask_request,
    )

def get_metadata(self) -> dict:
    """
    Interface `openeo.rest.JobResult.get_metadata`

    Always returns a new, empty dictionary: no result metadata is produced here.
    """
    return dict()

def logs(self, offset=None) -> List[LogEntry]:
"""Interface `RESTJob.logs`"""
return self.connection.partitioned_job_tracker.get_logs(
Expand Down
79 changes: 79 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,85 @@ def test_get_results_not_found_on_aggregator(self, api100):
res = api100.get("/jobs/nope-and-nope/results")
res.assert_error(404, "JobNotFound", message="The batch job 'nope-and-nope' does not exist.")

def test_get_results_canonical_link(self, api100, requests_mock, backend1):
    """
    The upstream "canonical" link must show up unchanged in the aggregator's job results.

    https://github.com/Open-EO/openeo-aggregator/issues/98
    """
    canonical_link = {
        "rel": "canonical",
        "href": "https://res.b1.test/123/456789/abc",
        "type": "application/json",
    }
    job_mock = requests_mock.get(
        backend1 + "/jobs/th3j0b",
        json={
            "id": "th3j0b",
            "status": "finished",
            "created": "2017-01-01T09:32:12Z",
        },
    )
    results_mock = requests_mock.get(
        backend1 + "/jobs/th3j0b/results",
        status_code=200,
        json={"assets": {}, "links": [dict(canonical_link)]},
    )

    api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
    res = api100.get("/jobs/b1-th3j0b/results").assert_status_code(200).json

    assert res["id"] == "b1-th3j0b"
    canonical_links = [link for link in res["links"] if link["rel"] == "canonical"]
    assert canonical_links == [canonical_link]
    # Each upstream endpoint should be hit exactly once.
    assert job_mock.call_count == 1
    assert results_mock.call_count == 1

def test_get_results_links(self, api100, requests_mock, backend1):
    """
    Upstream "canonical" link is preserved, while "self" is the aggregator's own link.

    https://github.com/Open-EO/openeo-aggregator/issues/98
    """
    job_mock = requests_mock.get(
        backend1 + "/jobs/th3j0b",
        json={
            "id": "th3j0b",
            "status": "finished",
            "created": "2017-01-01T09:32:12Z",
        },
    )
    upstream_links = [
        {"rel": "canonical", "href": "https://c.b1.test/123"},
        {"rel": "self", "href": "https://b1.test/123"},
    ]
    results_mock = requests_mock.get(
        backend1 + "/jobs/th3j0b/results",
        status_code=200,
        json={"assets": {}, "links": upstream_links},
    )

    api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
    res = api100.get("/jobs/b1-th3j0b/results").assert_status_code(200).json
    assert res["id"] == "b1-th3j0b"

    def links_with_rel(rel):
        # Select the response links of the given relation type.
        return [link for link in res["links"] if link["rel"] == rel]

    # Preserve original "canonical" link
    assert links_with_rel("canonical") == [
        {"rel": "canonical", "href": "https://c.b1.test/123"},
    ]
    # Aggregator's "self" link
    assert links_with_rel("self") == [
        {
            "rel": "self",
            "href": "http://oeoa.test/openeo/1.0.0/jobs/b1-th3j0b/results",
            "type": "application/json",
        },
    ]
    # Each upstream endpoint should be hit exactly once.
    assert job_mock.call_count == 1
    assert results_mock.call_count == 1

def test_get_logs(self, api100, requests_mock, backend1):
def get_logs(request, context):
offset = request.qs.get("offset", ["_"])[0]
Expand Down

0 comments on commit a45952e

Please sign in to comment.