Skip to content

Commit

Permalink
Add an API to search for jobs by tags and/or state
Browse files Browse the repository at this point in the history
  • Loading branch information
plars committed Jan 25, 2024
1 parent c376630 commit b08f35d
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 0 deletions.
20 changes: 20 additions & 0 deletions server/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,26 @@ server will only return one job.
$ curl http://localhost:8000/v1/job?queue=foo\&queue=bar
**[GET] /v1/job/search** - Search for jobs by tag(s) and state(s)

Parameters:

tags (array): List of string tags to search for
match (string): Match mode for tags - "all" or "any" (default "any")
state (array): List of job states to include (default: all states except "cancelled" and "complete")
Returns:

Array of matching jobs

Example:

.. code-block:: console
$ curl 'http://localhost:8000/v1/job/search?tags=foo&tags=bar&match=all'

This will find jobs tagged with both "foo" and "bar".


**[POST] /v1/result/<job_id>** - post job outcome data for the specified job_id

- Parameters:
Expand Down
18 changes: 18 additions & 0 deletions server/src/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ class JobId(Schema):
job_id = fields.String(required=True)


class JobSearchRequest(Schema):
    """Query parameters accepted when searching for jobs"""

    # Tags to look for
    tags = fields.List(
        fields.String,
        description="List of tags to search for",
    )
    # Declared as a free-form string; no validator is attached here
    match = fields.String(
        description="Match mode - 'all' or 'any' (default 'any')",
    )
    # Job states the caller wants included in the results
    state = fields.List(
        fields.String,
        description="List of job states to include",
    )


class JobSearchResponse(Schema):
    """Schema describing the result of a job search"""

    # Mandatory list of jobs, each one described by the Job schema
    jobs = fields.List(
        fields.Nested(Job),
        required=True,
    )


class Result(Schema):
"""Result schema"""

Expand Down
41 changes: 41 additions & 0 deletions server/src/api/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,47 @@ def job_get_id(job_id):
return job_data


@v1.get("/job/search")
@v1.input(schemas.JobSearchRequest, location="query")
@v1.output(schemas.JobSearchResponse)
def search_jobs(query_data):
    """Search for jobs by tag(s) and/or state(s)

    :param query_data:
        Query-string arguments parsed by ``schemas.JobSearchRequest``:
        ``tags`` (list of tags to look for), ``match`` ("any" or "all",
        "any" when omitted) and ``state`` (list of job states to include).
    :return:
        JSON array with one object per matching job, each holding the
        projected ``job_id``, ``created_at`` and ``job_state`` values.

    Aborts with HTTP 400 when ``match`` is neither "any" nor "all".
    """
    # Read every parameter from the schema-parsed input instead of mixing
    # it with raw request.args lookups.
    tags = query_data.get("tags")
    match = query_data.get("match", "any")
    states = query_data.get("state")

    # Map each supported match mode to the MongoDB operator implementing it
    tag_operators = {"any": "$in", "all": "$all"}
    if match not in tag_operators:
        abort(400, "Invalid match mode")

    query = {}
    if tags:
        query["job_data.tags"] = {tag_operators[match]: tags}

    if states:
        query["result_data.job_state"] = {"$in": states}
    else:
        # Exclude terminal states by default
        # NOTE(review): the filter uses the literal "complete"; confirm this
        # is the state string that finished jobs actually carry.
        query["result_data.job_state"] = {"$nin": ["cancelled", "complete"]}

    pipeline = [
        {"$match": query},
        {
            # Return only a summary of each job and drop Mongo's internal _id
            "$project": {
                "job_id": True,
                "created_at": True,
                "job_state": "$result_data.job_state",
                "_id": False,
            },
        },
    ]

    jobs = mongo.db.jobs.aggregate(pipeline)

    # NOTE(review): schemas.JobSearchResponse declares a "jobs" field, but
    # this view returns a bare JSON array - confirm which shape is intended
    # before changing either, since clients rely on the array.
    return jsonify(list(jobs))


@v1.post("/result/<job_id>")
@v1.input(schemas.Result, location="json")
def result_post(job_id, json_data):
Expand Down
71 changes: 71 additions & 0 deletions server/tests/test_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,74 @@ def test_get_agents_data(mongo_app):
assert len(output.json) == 1
for key, value in agent_data.items():
assert output.json[0][key] == value


def test_search_jobs_by_tags(mongo_app):
    """Searching by tags works in both the "any" and the "all" match mode"""
    app, _ = mongo_app

    # Submit three jobs whose tag lists overlap pairwise
    tag_lists = (
        ["tag1", "tag2"],
        ["tag2", "tag3"],
        ["tag3", "tag4"],
    )
    for job_tags in tag_lists:
        app.post("/v1/job", json={"job_queue": "test", "tags": job_tags})

    # No match mode given: a job is found if it has any of the tags,
    # so the first two jobs are expected
    any_result = app.get("/v1/job/search?tags=tag1&tags=tag2")
    assert 200 == any_result.status_code
    assert len(any_result.json) == 2

    # match=all: only the second job carries both tag2 and tag3
    all_result = app.get("/v1/job/search?tags=tag2&tags=tag3&match=all")
    assert 200 == all_result.status_code
    assert len(all_result.json) == 1


def test_search_jobs_invalid_match(mongo_app):
    """An unsupported match mode is rejected with HTTP 400"""
    client, _ = mongo_app

    response = client.get("/v1/job/search?match=foo")

    assert 400 == response.status_code
    assert "Invalid match mode" in response.text


def test_search_jobs_by_state(mongo_app):
    """Search results are filtered by job state"""
    app, _ = mongo_app
    job_request = {"job_queue": "test", "tags": ["foo"]}

    # Submit two jobs that are left alone, so they stay in waiting state
    for _attempt in range(2):
        app.post("/v1/job", json=job_request)

    # Submit a third job and mark it cancelled through the result endpoint
    cancelled_id = app.post("/v1/job", json=job_request).json.get("job_id")
    app.post(f"/v1/result/{cancelled_id}", json={"job_state": "cancelled"})

    # Without a state filter the cancelled job is left out
    default_result = app.get("/v1/job/search?tags=foo")
    assert 200 == default_result.status_code
    assert len(default_result.json) == 2

    # Asking for the cancelled state explicitly returns just that job
    cancelled_result = app.get("/v1/job/search?state=cancelled")
    assert 200 == cancelled_result.status_code
    assert len(cancelled_result.json) == 1

0 comments on commit b08f35d

Please sign in to comment.