From b5eb07f89dae7fa467506b61399da779e5da4ef6 Mon Sep 17 00:00:00 2001 From: Varun Valada Date: Fri, 24 May 2024 09:31:21 -0500 Subject: [PATCH 1/5] Add job status endpoint to server --- docs/reference/job-schema.rst | 6 +- server/README.rst | 25 + server/pyproject.toml | 2 + server/src/api/schemas.py | 18 + server/src/api/v1.py | 1194 +++++++++++++++++---------------- server/tests/test_v1.py | 22 + server/tox.ini | 1 + 7 files changed, 693 insertions(+), 575 deletions(-) diff --git a/docs/reference/job-schema.rst b/docs/reference/job-schema.rst index fec40dd8..cf7630a9 100644 --- a/docs/reference/job-schema.rst +++ b/docs/reference/job-schema.rst @@ -47,7 +47,11 @@ The following table lists the key elements that a job definition file should con | - test | - allocate | - reserve - | For detailed information about how to define the data to include in each test phase, see :doc:`test-phases`. + | For detailed information about how to define the data to include in each test phase, see :doc:`test-phases`. + * - ``job_status_webhook`` - string - / - | (Optional) URL to send job status updates to. These updates originate from the agent and get posted to the server which then posts the update to the webhook. If no webhook is specified, these updates will not be generated. Example jobs in YAML ---------------------------- diff --git a/server/README.rst b/server/README.rst index d0f3d7c7..da1f63cc 100644 --- a/server/README.rst +++ b/server/README.rst @@ -356,3 +356,28 @@ This will find jobs tagged with both "foo" and "bar". -X POST --header "Content-Type: application/json" \ --data '{ "job_id": "00000000-0000-0000-0000-000000000000", \ "exit_code": 1, "detail":"foo" }' + +**[POST] /v1/agents/status** - Receive job status updates from an agent and post them to the specified webhook.

The job_status_webhook parameter is required for this endpoint. Other parameters included here will be forwarded to the webhook. 
+ +- Parameters: + - job_status_webhook: webhook URL to post status updates to + +- Returns: + + Text response from the webhook if the server was successfully able to post. + +- Status Codes: + + - HTTP 200 (OK) + - HTTP 400 (Bad request) - The arguments could not be processed by the server + - HTTP 504 (Gateway Timeout) - The webhook URL timed out + +- Example: + + .. code-block:: console + + $ curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"agent_id": "agent-00", "job_queue": "myqueue", "job_status_webhook": "http://mywebhook", "events": [{"event_name": "started_provisioning", "timestamp": "2024-05-03T19:11:33.541130+00:00", "detail": "my_detailed_message"}]}' http://localhost:8000/v1/agents/status diff --git a/server/pyproject.toml b/server/pyproject.toml index a6bc29c0..35addf08 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -15,6 +15,8 @@ gevent = "^24.2.1" prometheus-client = "^0.20.0" pyyaml = "^6.0.1" sentry-sdk = { extras = ["flask"], version = "^2.0.1" } +requests = "^2.31.0" +urllib3 = "^2.2.1" [tool.poetry.dev-dependencies] pytest = "^8.1.2" diff --git a/server/src/api/schemas.py b/server/src/api/schemas.py index c54446ad..411c381b 100644 --- a/server/src/api/schemas.py +++ b/server/src/api/schemas.py @@ -114,6 +114,7 @@ class Job(Schema): test_data = fields.Nested(TestData, required=False) allocate_data = fields.Dict(required=False) reserve_data = fields.Dict(required=False) + job_status_webhook = fields.String(required=False) class JobId(Schema): @@ -170,6 +171,23 @@ class Result(Schema): job_state = fields.String(required=False) +class JobEvent(Schema): + """Job Event schema""" + + event_name = fields.String(required=True) + timestamp = fields.String(required=True) + detail = fields.String(required=False) + + +class StatusUpdate(Schema): + """Status Update schema""" + + agent_id = fields.String(required=False) + job_queue = fields.String(required=False) + job_status_webhook = fields.URL(required=True) + events = 
fields.List(fields.Nested(JobEvent), required=False) + + job_empty = { 204: { "description": "No job found", diff --git a/server/src/api/v1.py b/server/src/api/v1.py index 2443064f..9b89c998 100644 --- a/server/src/api/v1.py +++ b/server/src/api/v1.py @@ -1,574 +1,620 @@ -# Copyright (C) 2022 Canonical -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -""" -Testflinger v1 API -""" - -import uuid -from datetime import datetime, timezone - -import pkg_resources -from apiflask import APIBlueprint, abort -from flask import jsonify, request, send_file -from prometheus_client import Counter -from werkzeug.exceptions import BadRequest - -from src import database -from . 
import schemas - - -jobs_metric = Counter("jobs", "Number of jobs", ["queue"]) -reservations_metric = Counter( - "reservations", "Number of reservations", ["queue"] -) - - -v1 = APIBlueprint("v1", __name__) - - -@v1.get("/") -def home(): - """Identify ourselves""" - return get_version() - - -def get_version(): - """Return the Testflinger version""" - try: - version = pkg_resources.get_distribution("testflinger").version - except pkg_resources.DistributionNotFound: - version = "devel" - return "Testflinger Server v{}".format(version) - - -@v1.post("/job") -@v1.input(schemas.Job, location="json") -@v1.output(schemas.JobId) -def job_post(json_data: dict): - """Add a job to the queue""" - try: - job_queue = json_data.get("job_queue") - except (AttributeError, BadRequest): - # Set job_queue to None so we take the failure path below - job_queue = "" - if not job_queue: - abort(422, message="Invalid data or no job_queue specified") - - try: - job = job_builder(json_data) - except ValueError: - abort(400, message="Invalid job_id specified") - - jobs_metric.labels(queue=job_queue).inc() - if "reserve_data" in json_data: - reservations_metric.labels(queue=job_queue).inc() - - # CAUTION! If you ever move this line, you may need to pass data as a copy - # because it will get modified by submit_job and other things it calls - database.add_job(job) - return jsonify(job_id=job.get("job_id")) - - -def has_attachments(data: dict) -> bool: - """Predicate if the job described by `data` involves attachments""" - return any( - nested_field == "attachments" - for field, nested_dict in data.items() - if field.endswith("_data") - for nested_field in nested_dict - ) - - -def job_builder(data): - """Build a job from a dictionary of data""" - job = { - "created_at": datetime.utcnow(), - "result_data": { - "job_state": "waiting", - }, - } - # If the job_id is provided, keep it as long as the uuid is good. 
- # This is for job resubmission - job_id = data.pop("job_id", None) - if job_id and isinstance(job_id, str): - # This job already came with a job_id, so it was resubmitted - if not check_valid_uuid(job_id): - raise ValueError - else: - # This is a new job, so generate a new job_id - job_id = str(uuid.uuid4()) - - # side effect: modify the job dict - if has_attachments(data): - data["attachments_status"] = "waiting" - - job["job_id"] = job_id - job["job_data"] = data - return job - - -@v1.get("/job") -@v1.output(schemas.Job) -@v1.doc(responses=schemas.job_empty) -def job_get(): - """Request a job to run from supported queues""" - queue_list = request.args.getlist("queue") - if not queue_list: - return "No queue(s) specified in request", 400 - job = database.pop_job(queue_list=queue_list) - if job: - return jsonify(job) - return {}, 204 - - -@v1.get("/job/") -@v1.output(schemas.Job) -def job_get_id(job_id): - """Request the json job definition for a specified job, even if it has - already run - - :param job_id: - UUID as a string for the job - :return: - JSON data for the job or error string and http error - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - response = database.mongo.db.jobs.find_one( - {"job_id": job_id}, projection={"job_data": True, "_id": False} - ) - if not response: - return {}, 204 - job_data = response.get("job_data") - job_data["job_id"] = job_id - return job_data - - -@v1.get("/job//attachments") -def attachment_get(job_id): - """Return the attachments bundle for a specified job_id - - :param job_id: - UUID as a string for the job - :return: - send_file stream of attachment tarball to download - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - try: - file = database.retrieve_file(filename=f"{job_id}.attachments") - except FileNotFoundError: - return "", 204 - return send_file(file, mimetype="application/gzip") - - -@v1.post("/job//attachments") -def attachments_post(job_id): - 
"""Post attachment bundle for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - try: - attachments_status = database.get_attachments_status(job_id) - except ValueError: - return f"Job {job_id} is not valid\n", 422 - if attachments_status is None: - return f"Job {job_id} not awaiting attachments\n", 422 - if attachments_status == "complete": - # attachments already submitted: successful, could be due to a retry - return "OK", 200 - - # save attachments archive in the database - database.save_file( - data=request.files["file"], - filename=f"{job_id}.attachments", - ) - - # now the job can be processed - database.attachments_received(job_id) - return "OK", 200 - - -@v1.get("/job/search") -@v1.input(schemas.JobSearchRequest, location="query") -@v1.output(schemas.JobSearchResponse) -def search_jobs(query_data): - """Search for jobs by tags""" - tags = query_data.get("tags") - match = request.args.get("match", "any") - states = request.args.getlist("state") - - query = {} - if tags and match == "all": - query["job_data.tags"] = {"$all": tags} - elif tags and match == "any": - query["job_data.tags"] = {"$in": tags} - - if "active" in states: - query["result_data.job_state"] = { - "$nin": ["cancelled", "complete", "completed"] - } - elif states: - query["result_data.job_state"] = {"$in": states} - - pipeline = [ - {"$match": query}, - { - "$project": { - "job_id": True, - "created_at": True, - "job_state": "$result_data.job_state", - "_id": False, - }, - }, - ] - - jobs = database.mongo.db.jobs.aggregate(pipeline) - - return jsonify(list(jobs)) - - -@v1.post("/result/") -@v1.input(schemas.Result, location="json") -def result_post(job_id, json_data): - """Post a result for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - - # First, we need to prepend "result_data" to 
each key in the result_data - for key in list(json_data): - json_data[f"result_data.{key}"] = json_data.pop(key) - - database.mongo.db.jobs.update_one({"job_id": job_id}, {"$set": json_data}) - return "OK" - - -@v1.get("/result/") -@v1.output(schemas.Result) -def result_get(job_id): - """Return results for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - response = database.mongo.db.jobs.find_one( - {"job_id": job_id}, {"result_data": True, "_id": False} - ) - - if not response or not (results := response.get("result_data")): - return "", 204 - results = response.get("result_data") - return results - - -@v1.post("/result//artifact") -def artifacts_post(job_id): - """Post artifact bundle for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - database.save_file( - data=request.files["file"], - filename=f"{job_id}.artifact", - ) - return "OK" - - -@v1.get("/result//artifact") -def artifacts_get(job_id): - """Return artifact bundle for a specified job_id - - :param job_id: - UUID as a string for the job - :return: - send_file stream of artifact tarball to download - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - try: - file = database.retrieve_file(filename=f"{job_id}.artifact") - except FileNotFoundError: - return "", 204 - return send_file(file, download_name="artifact.tar.gz") - - -@v1.get("/result//output") -def output_get(job_id): - """Get latest output for a specified job ID - - :param job_id: - UUID as a string for the job - :return: - Output lines - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - response = database.mongo.db.output.find_one_and_delete( - {"job_id": job_id}, {"_id": False} - ) - output = response.get("output", []) if response else None - if output: - return "\n".join(output) - return "", 204 
- - -@v1.post("/result//output") -def output_post(job_id): - """Post output for a specified job ID - - :param job_id: - UUID as a string for the job - :param data: - A string containing the latest lines of output to post - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - data = request.get_data().decode("utf-8") - timestamp = datetime.utcnow() - database.mongo.db.output.update_one( - {"job_id": job_id}, - {"$set": {"updated_at": timestamp}, "$push": {"output": data}}, - upsert=True, - ) - return "OK" - - -@v1.post("/job//action") -@v1.input(schemas.ActionIn, location="json") -def action_post(job_id, json_data): - """Take action on the job status for a specified job ID - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - action = json_data["action"] - supported_actions = { - "cancel": cancel_job, - } - # Validation of actions happens in schemas.py:ActionIn - return supported_actions[action](job_id) - - -@v1.get("/agents/queues") -@v1.doc(responses=schemas.queues_out) -def queues_get(): - """Get all advertised queues from this server - - Returns a dict of queue names and descriptions, ex: - { - "some_queue": "A queue for testing", - "other_queue": "A queue for something else" - } - """ - all_queues = database.mongo.db.queues.find( - {}, projection={"_id": False, "name": True, "description": True} - ) - queue_dict = {} - # Create a dict of queues and descriptions - for queue in all_queues: - queue_dict[queue.get("name")] = queue.get("description", "") - return jsonify(queue_dict) - - -@v1.post("/agents/queues") -def queues_post(): - """Tell testflinger the queue names that are being serviced - - Some agents may want to advertise some of the queues they listen on so that - the user can check which queues are valid to use. 
- """ - queue_dict = request.get_json() - for queue, description in queue_dict.items(): - database.mongo.db.queues.update_one( - {"name": queue}, - {"$set": {"description": description}}, - upsert=True, - ) - return "OK" - - -@v1.get("/agents/images/") -@v1.doc(responses=schemas.images_out) -def images_get(queue): - """Get a dict of known images for a given queue""" - queue_data = database.mongo.db.queues.find_one( - {"name": queue}, {"_id": False, "images": True} - ) - if not queue_data: - return jsonify({}) - # It's ok for this to just return an empty result if there are none found - return jsonify(queue_data.get("images", {})) - - -@v1.post("/agents/images") -def images_post(): - """Tell testflinger about known images for a specified queue - images will be stored in a dict of key/value pairs as part of the queues - collection. That dict will contain image_name:provision_data mappings, ex: - { - "some_queue": { - "core22": "http://cdimage.ubuntu.com/.../core-22.tar.gz", - "jammy": "http://cdimage.ubuntu.com/.../ubuntu-22.04.tar.gz" - }, - "other_queue": { - ... 
- } - } - """ - image_dict = request.get_json() - # We need to delete and recreate the images in case some were removed - for queue, image_data in image_dict.items(): - database.mongo.db.queues.update_one( - {"name": queue}, - {"$set": {"images": image_data}}, - upsert=True, - ) - return "OK" - - -@v1.get("/agents/data") -@v1.output(schemas.AgentOut) -def agents_get_all(): - """Get all agent data""" - agents = database.mongo.db.agents.find({}, {"_id": False, "log": False}) - return jsonify(list(agents)) - - -@v1.post("/agents/data/") -@v1.input(schemas.AgentIn, location="json") -def agents_post(agent_name, json_data): - """Post information about the agent to the server - - The json sent to this endpoint may contain data such as the following: - { - "state": string, # State the device is in - "queues": array[string], # Queues the device is listening on - "location": string, # Location of the device - "job_id": string, # Job ID the device is running, if any - "log": array[string], # push and keep only the last 100 lines - } - """ - - json_data["name"] = agent_name - json_data["updated_at"] = datetime.utcnow() - # extract log from data so we can push it instead of setting it - log = json_data.pop("log", []) - - database.mongo.db.agents.update_one( - {"name": agent_name}, - {"$set": json_data, "$push": {"log": {"$each": log, "$slice": -100}}}, - upsert=True, - ) - return "OK" - - -@v1.post("/agents/provision_logs/") -@v1.input(schemas.ProvisionLogsIn, location="json") -def agents_provision_logs_post(agent_name, json_data): - """Post provision logs for the agent to the server""" - agent_record = {} - - # timestamp this agent record and provision log entry - timestamp = datetime.now(timezone.utc) - agent_record["updated_at"] = json_data["timestamp"] = timestamp - - update_operation = { - "$set": json_data, - "$push": { - "provision_log": {"$each": [json_data], "$slice": -100}, - }, - } - database.mongo.db.provision_logs.update_one( - {"name": agent_name}, - 
update_operation, - upsert=True, - ) - return "OK" - - -def check_valid_uuid(job_id): - """Check that the specified job_id is a valid UUID only - - :param job_id: - UUID as a string for the job - :return: - True if job_id is valid, False if not - """ - - try: - uuid.UUID(job_id) - except ValueError: - return False - return True - - -@v1.get("/job//position") -def job_position_get(job_id): - """Return the position of the specified jobid in the queue""" - job_data, status = job_get_id(job_id) - if status == 204: - return "Job not found or already started\n", 410 - if status != 200: - return job_data - try: - queue = job_data.json.get("job_queue") - except (AttributeError, TypeError): - return "Invalid json returned for id: {}\n".format(job_id), 400 - # Get all jobs with job_queue=queue and return only the _id - jobs = database.mongo.db.jobs.find( - {"job_data.job_queue": queue, "result_data.job_state": "waiting"}, - {"job_id": 1}, - ) - # Create a dict mapping job_id (as a string) to the position in the queue - jobs_id_position = {job.get("job_id"): pos for pos, job in enumerate(jobs)} - if job_id in jobs_id_position: - return str(jobs_id_position[job_id]) - return "Job not found or already started\n", 410 - - -def cancel_job(job_id): - """Cancellation for a specified job ID - - :param job_id: - UUID as a string for the job - """ - # Set the job status to cancelled - response = database.mongo.db.jobs.update_one( - { - "job_id": job_id, - "result_data.job_state": { - "$nin": ["cancelled", "complete", "completed"] - }, - }, - {"$set": {"result_data.job_state": "cancelled"}}, - ) - if response.modified_count == 0: - return "The job is already completed or cancelled", 400 - return "OK" +# Copyright (C) 2022 Canonical +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +""" +Testflinger v1 API +""" + +import uuid +from datetime import datetime, timezone + +import pkg_resources +from apiflask import APIBlueprint, abort +from flask import jsonify, request, send_file +from prometheus_client import Counter +from werkzeug.exceptions import BadRequest + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from src import database +from . import schemas + + +jobs_metric = Counter("jobs", "Number of jobs", ["queue"]) +reservations_metric = Counter( + "reservations", "Number of reservations", ["queue"] +) + + +v1 = APIBlueprint("v1", __name__) + + +@v1.get("/") +def home(): + """Identify ourselves""" + return get_version() + + +def get_version(): + """Return the Testflinger version""" + try: + version = pkg_resources.get_distribution("testflinger").version + except pkg_resources.DistributionNotFound: + version = "devel" + return "Testflinger Server v{}".format(version) + + +@v1.post("/job") +@v1.input(schemas.Job, location="json") +@v1.output(schemas.JobId) +def job_post(json_data: dict): + """Add a job to the queue""" + try: + job_queue = json_data.get("job_queue") + except (AttributeError, BadRequest): + # Set job_queue to None so we take the failure path below + job_queue = "" + if not job_queue: + abort(422, message="Invalid data or no job_queue specified") + + try: + job = job_builder(json_data) + except ValueError: + abort(400, message="Invalid job_id specified") + + jobs_metric.labels(queue=job_queue).inc() + if "reserve_data" in json_data: + reservations_metric.labels(queue=job_queue).inc() + + # CAUTION! 
If you ever move this line, you may need to pass data as a copy + # because it will get modified by submit_job and other things it calls + database.add_job(job) + return jsonify(job_id=job.get("job_id")) + + +def has_attachments(data: dict) -> bool: + """Predicate if the job described by `data` involves attachments""" + return any( + nested_field == "attachments" + for field, nested_dict in data.items() + if field.endswith("_data") + for nested_field in nested_dict + ) + + +def job_builder(data): + """Build a job from a dictionary of data""" + job = { + "created_at": datetime.utcnow(), + "result_data": { + "job_state": "waiting", + }, + } + # If the job_id is provided, keep it as long as the uuid is good. + # This is for job resubmission + job_id = data.pop("job_id", None) + if job_id and isinstance(job_id, str): + # This job already came with a job_id, so it was resubmitted + if not check_valid_uuid(job_id): + raise ValueError + else: + # This is a new job, so generate a new job_id + job_id = str(uuid.uuid4()) + + # side effect: modify the job dict + if has_attachments(data): + data["attachments_status"] = "waiting" + + job["job_id"] = job_id + job["job_data"] = data + return job + + +@v1.get("/job") +@v1.output(schemas.Job) +@v1.doc(responses=schemas.job_empty) +def job_get(): + """Request a job to run from supported queues""" + queue_list = request.args.getlist("queue") + if not queue_list: + return "No queue(s) specified in request", 400 + job = database.pop_job(queue_list=queue_list) + if job: + return jsonify(job) + return {}, 204 + + +@v1.get("/job/") +@v1.output(schemas.Job) +def job_get_id(job_id): + """Request the json job definition for a specified job, even if it has + already run + + :param job_id: + UUID as a string for the job + :return: + JSON data for the job or error string and http error + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + response = database.mongo.db.jobs.find_one( + {"job_id": job_id}, 
projection={"job_data": True, "_id": False} + ) + if not response: + return {}, 204 + job_data = response.get("job_data") + job_data["job_id"] = job_id + return job_data + + +@v1.get("/job//attachments") +def attachment_get(job_id): + """Return the attachments bundle for a specified job_id + + :param job_id: + UUID as a string for the job + :return: + send_file stream of attachment tarball to download + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + try: + file = database.retrieve_file(filename=f"{job_id}.attachments") + except FileNotFoundError: + return "", 204 + return send_file(file, mimetype="application/gzip") + + +@v1.post("/job//attachments") +def attachments_post(job_id): + """Post attachment bundle for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + try: + attachments_status = database.get_attachments_status(job_id) + except ValueError: + return f"Job {job_id} is not valid\n", 422 + if attachments_status is None: + return f"Job {job_id} not awaiting attachments\n", 422 + if attachments_status == "complete": + # attachments already submitted: successful, could be due to a retry + return "OK", 200 + + # save attachments archive in the database + database.save_file( + data=request.files["file"], + filename=f"{job_id}.attachments", + ) + + # now the job can be processed + database.attachments_received(job_id) + return "OK", 200 + + +@v1.get("/job/search") +@v1.input(schemas.JobSearchRequest, location="query") +@v1.output(schemas.JobSearchResponse) +def search_jobs(query_data): + """Search for jobs by tags""" + tags = query_data.get("tags") + match = request.args.get("match", "any") + states = request.args.getlist("state") + + query = {} + if tags and match == "all": + query["job_data.tags"] = {"$all": tags} + elif tags and match == "any": + query["job_data.tags"] = {"$in": tags} + + if "active" in states: + query["result_data.job_state"] 
= { + "$nin": ["cancelled", "complete", "completed"] + } + elif states: + query["result_data.job_state"] = {"$in": states} + + pipeline = [ + {"$match": query}, + { + "$project": { + "job_id": True, + "created_at": True, + "job_state": "$result_data.job_state", + "_id": False, + }, + }, + ] + + jobs = database.mongo.db.jobs.aggregate(pipeline) + + return jsonify(list(jobs)) + + +@v1.post("/result/") +@v1.input(schemas.Result, location="json") +def result_post(job_id, json_data): + """Post a result for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + + # First, we need to prepend "result_data" to each key in the result_data + for key in list(json_data): + json_data[f"result_data.{key}"] = json_data.pop(key) + + database.mongo.db.jobs.update_one({"job_id": job_id}, {"$set": json_data}) + return "OK" + + +@v1.get("/result/") +@v1.output(schemas.Result) +def result_get(job_id): + """Return results for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + response = database.mongo.db.jobs.find_one( + {"job_id": job_id}, {"result_data": True, "_id": False} + ) + + if not response or not (results := response.get("result_data")): + return "", 204 + results = response.get("result_data") + return results + + +@v1.post("/result//artifact") +def artifacts_post(job_id): + """Post artifact bundle for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + database.save_file( + data=request.files["file"], + filename=f"{job_id}.artifact", + ) + return "OK" + + +@v1.get("/result//artifact") +def artifacts_get(job_id): + """Return artifact bundle for a specified job_id + + :param job_id: + UUID as a string for the job + :return: + send_file stream of artifact tarball to download + 
""" + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + try: + file = database.retrieve_file(filename=f"{job_id}.artifact") + except FileNotFoundError: + return "", 204 + return send_file(file, download_name="artifact.tar.gz") + + +@v1.get("/result//output") +def output_get(job_id): + """Get latest output for a specified job ID + + :param job_id: + UUID as a string for the job + :return: + Output lines + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + response = database.mongo.db.output.find_one_and_delete( + {"job_id": job_id}, {"_id": False} + ) + output = response.get("output", []) if response else None + if output: + return "\n".join(output) + return "", 204 + + +@v1.post("/result//output") +def output_post(job_id): + """Post output for a specified job ID + + :param job_id: + UUID as a string for the job + :param data: + A string containing the latest lines of output to post + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + data = request.get_data().decode("utf-8") + timestamp = datetime.utcnow() + database.mongo.db.output.update_one( + {"job_id": job_id}, + {"$set": {"updated_at": timestamp}, "$push": {"output": data}}, + upsert=True, + ) + return "OK" + + +@v1.post("/job//action") +@v1.input(schemas.ActionIn, location="json") +def action_post(job_id, json_data): + """Take action on the job status for a specified job ID + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + action = json_data["action"] + supported_actions = { + "cancel": cancel_job, + } + # Validation of actions happens in schemas.py:ActionIn + return supported_actions[action](job_id) + + +@v1.get("/agents/queues") +@v1.doc(responses=schemas.queues_out) +def queues_get(): + """Get all advertised queues from this server + + Returns a dict of queue names and descriptions, ex: + { + "some_queue": "A queue for testing", + "other_queue": "A 
queue for something else" + } + """ + all_queues = database.mongo.db.queues.find( + {}, projection={"_id": False, "name": True, "description": True} + ) + queue_dict = {} + # Create a dict of queues and descriptions + for queue in all_queues: + queue_dict[queue.get("name")] = queue.get("description", "") + return jsonify(queue_dict) + + +@v1.post("/agents/queues") +def queues_post(): + """Tell testflinger the queue names that are being serviced + + Some agents may want to advertise some of the queues they listen on so that + the user can check which queues are valid to use. + """ + queue_dict = request.get_json() + for queue, description in queue_dict.items(): + database.mongo.db.queues.update_one( + {"name": queue}, + {"$set": {"description": description}}, + upsert=True, + ) + return "OK" + + +@v1.get("/agents/images/") +@v1.doc(responses=schemas.images_out) +def images_get(queue): + """Get a dict of known images for a given queue""" + queue_data = database.mongo.db.queues.find_one( + {"name": queue}, {"_id": False, "images": True} + ) + if not queue_data: + return jsonify({}) + # It's ok for this to just return an empty result if there are none found + return jsonify(queue_data.get("images", {})) + + +@v1.post("/agents/images") +def images_post(): + """Tell testflinger about known images for a specified queue + images will be stored in a dict of key/value pairs as part of the queues + collection. That dict will contain image_name:provision_data mappings, ex: + { + "some_queue": { + "core22": "http://cdimage.ubuntu.com/.../core-22.tar.gz", + "jammy": "http://cdimage.ubuntu.com/.../ubuntu-22.04.tar.gz" + }, + "other_queue": { + ... 
+ } + } + """ + image_dict = request.get_json() + # We need to delete and recreate the images in case some were removed + for queue, image_data in image_dict.items(): + database.mongo.db.queues.update_one( + {"name": queue}, + {"$set": {"images": image_data}}, + upsert=True, + ) + return "OK" + + +@v1.get("/agents/data") +@v1.output(schemas.AgentOut) +def agents_get_all(): + """Get all agent data""" + agents = database.mongo.db.agents.find({}, {"_id": False, "log": False}) + return jsonify(list(agents)) + + +@v1.post("/agents/data/") +@v1.input(schemas.AgentIn, location="json") +def agents_post(agent_name, json_data): + """Post information about the agent to the server + + The json sent to this endpoint may contain data such as the following: + { + "state": string, # State the device is in + "queues": array[string], # Queues the device is listening on + "location": string, # Location of the device + "job_id": string, # Job ID the device is running, if any + "log": array[string], # push and keep only the last 100 lines + } + """ + + json_data["name"] = agent_name + json_data["updated_at"] = datetime.utcnow() + # extract log from data so we can push it instead of setting it + log = json_data.pop("log", []) + + database.mongo.db.agents.update_one( + {"name": agent_name}, + {"$set": json_data, "$push": {"log": {"$each": log, "$slice": -100}}}, + upsert=True, + ) + return "OK" + + +@v1.post("/agents/provision_logs/") +@v1.input(schemas.ProvisionLogsIn, location="json") +def agents_provision_logs_post(agent_name, json_data): + """Post provision logs for the agent to the server""" + agent_record = {} + + # timestamp this agent record and provision log entry + timestamp = datetime.now(timezone.utc) + agent_record["updated_at"] = json_data["timestamp"] = timestamp + + update_operation = { + "$set": json_data, + "$push": { + "provision_log": {"$each": [json_data], "$slice": -100}, + }, + } + database.mongo.db.provision_logs.update_one( + {"name": agent_name}, + 
update_operation, + upsert=True, + ) + return "OK" + + +@v1.post("/agents/status") +@v1.input(schemas.StatusUpdate, location="json") +def agents_status_post(json_data): + """Posts status updates from the agent to the server to be forwarded + to TestObserver + + The json sent to this endpoint may contain data such as the following: + { + "agent_id": "", + "job_queue": "", + "job_status_webhook": "", + "events": [ + { + "event_name": "", + "timestamp": "", + "detail": "" + }, + ... + ] + } + + """ + request_json = json_data + webhook_url = request_json.pop("job_status_webhook") + try: + s = requests.Session() + s.mount( + "", + HTTPAdapter( + max_retries=Retry( + total=3, + allowed_methods=frozenset(["PUT"]), + backoff_factor=1, + ) + ), + ) + response = s.put(webhook_url, json=request_json, timeout=3) + return response.text, response.status_code + except requests.exceptions.Timeout: + return "Webhook Timeout", 504 + + +def check_valid_uuid(job_id): + """Check that the specified job_id is a valid UUID only + + :param job_id: + UUID as a string for the job + :return: + True if job_id is valid, False if not + """ + + try: + uuid.UUID(job_id) + except ValueError: + return False + return True + + +@v1.get("/job//position") +def job_position_get(job_id): + """Return the position of the specified jobid in the queue""" + job_data, status = job_get_id(job_id) + if status == 204: + return "Job not found or already started\n", 410 + if status != 200: + return job_data + try: + queue = job_data.json.get("job_queue") + except (AttributeError, TypeError): + return "Invalid json returned for id: {}\n".format(job_id), 400 + # Get all jobs with job_queue=queue and return only the _id + jobs = database.mongo.db.jobs.find( + {"job_data.job_queue": queue, "result_data.job_state": "waiting"}, + {"job_id": 1}, + ) + # Create a dict mapping job_id (as a string) to the position in the queue + jobs_id_position = {job.get("job_id"): pos for pos, job in enumerate(jobs)} + if job_id in 
jobs_id_position: + return str(jobs_id_position[job_id]) + return "Job not found or already started\n", 410 + + +def cancel_job(job_id): + """Cancellation for a specified job ID + + :param job_id: + UUID as a string for the job + """ + # Set the job status to cancelled + response = database.mongo.db.jobs.update_one( + { + "job_id": job_id, + "result_data.job_state": { + "$nin": ["cancelled", "complete", "completed"] + }, + }, + {"$set": {"result_data.job_state": "cancelled"}}, + ) + if response.modified_count == 0: + return "The job is already completed or cancelled", 400 + return "OK" diff --git a/server/tests/test_v1.py b/server/tests/test_v1.py index 8baef601..6500d971 100644 --- a/server/tests/test_v1.py +++ b/server/tests/test_v1.py @@ -528,6 +528,28 @@ def test_agents_provision_logs_post(mongo_app): assert len(provision_log_records["provision_log"]) == 2 +def test_agents_status_put(mongo_app, requests_mock): + """Test api to receive agent status requests""" + app, _ = mongo_app + webhook = "http://mywebhook.com" + requests_mock.put(webhook, status_code=200, text="webhook requested") + status_update_data = { + "agent_id": "agent1", + "job_queue": "myjobqueue", + "job_status_webhook": webhook, + "events": [ + { + "event_name": "my_event", + "timestamp": "2014-12-22T03:12:58.019077+00:00", + "detail": "mymsg", + } + ], + } + output = app.post("/v1/agents/status", json=status_update_data) + assert 200 == output.status_code + assert "webhook requested" == output.text + + def test_get_agents_data(mongo_app): """Test api to retrieve agent data""" app, _ = mongo_app diff --git a/server/tox.ini b/server/tox.ini index 15bf8a10..785db2fa 100644 --- a/server/tox.ini +++ b/server/tox.ini @@ -17,6 +17,7 @@ deps = pytest-mock pytest-cov requests + requests-mock # cosl needed for unit tests to pass cosl -r charm/requirements.txt From 15fd41c9559f03e39ab827856f8afbf571b56e26 Mon Sep 17 00:00:00 2001 From: Varun Valada Date: Fri, 24 May 2024 09:32:05 -0500 Subject: [PATCH 2/5] 
Add agent posting of status updates to the server --- agent/testflinger_agent/agent.py | 55 +++++-- agent/testflinger_agent/client.py | 39 +++++ agent/testflinger_agent/event_emitter.py | 50 +++++++ agent/testflinger_agent/job.py | 18 +-- agent/testflinger_agent/runner.py | 20 +-- .../stop_condition_checkers.py | 33 +++-- agent/testflinger_agent/tests/test_agent.py | 135 ++++++++++++++++++ agent/testflinger_agent/tests/test_client.py | 30 ++++ agent/testflinger_agent/tests/test_job.py | 15 +- .../tests/test_stop_condition_checkers.py | 22 ++- common/testflinger_common/enums.py | 33 +++++ 11 files changed, 395 insertions(+), 55 deletions(-) create mode 100644 agent/testflinger_agent/event_emitter.py diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index dea9d61b..fac54b4d 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -22,7 +22,9 @@ from testflinger_agent.job import TestflingerJob from testflinger_agent.errors import TFServerError from testflinger_agent.config import ATTACHMENTS_DIR -from testflinger_common.enums import JobState, TestPhase +from testflinger_agent.event_emitter import EventEmitter +from testflinger_common.enums import JobState, TestPhase, TestEvent + try: # attempt importing a tarfile filter, to check if filtering is supported @@ -189,6 +191,7 @@ def unpack_attachments(self, job_data: dict, cwd: Path): def process_jobs(self): """Coordinate checking for new jobs and handling them if they exists""" + TEST_PHASES = [ TestPhase.SETUP, TestPhase.PROVISION, @@ -202,11 +205,17 @@ def process_jobs(self): self.retry_old_results() self.check_restart() - job_data = self.client.check_jobs() while job_data: try: job = TestflingerJob(job_data, self.client) + event_emitter = EventEmitter( + job_data.get("job_queue"), + job_data.get("job_status_webhook"), + self.client, + ) + job_end_reason = TestEvent.NORMAL_EXIT + logger.info("Starting job %s", job.job_id) rundir = os.path.join( 
self.client.config.get("execution_basedir"), job.job_id @@ -243,32 +252,48 @@ def process_jobs(self): == JobState.CANCELLED ): logger.info("Job cancellation was requested, exiting.") + event_emitter.emit_event(TestEvent.CANCELLED) break + self.client.post_job_state(job.job_id, phase) self.set_agent_state(phase) - exit_code = job.run_test_phase(phase, rundir) - self.client.post_influx(phase, exit_code) + event_emitter.emit_event(TestEvent(phase + "_start")) + exitcode, exit_event, exit_reason = job.run_test_phase( + phase, rundir + ) + self.client.post_influx(phase, exitcode) + event_emitter.emit_event(exit_event, exit_reason) - if phase == "provision": + if exitcode: # exit code 46 is our indication that recovery failed! # In this case, we need to mark the device offline - if exit_code == 46: + if exitcode == 46: self.mark_device_offline() - # Replace with TestEvent enum values once it lands - detail = "recovery_fail" - detail = "provision_fail" if exit_code else "" - self.client.post_provision_log( - job.job_id, exit_code, detail - ) - if phase != "test" and exit_code: - logger.debug("Phase %s failed, aborting job" % phase) - break + exit_event = TestEvent.RECOVERY_FAIL + else: + exit_event = TestEvent(phase + "_fail") + event_emitter.emit_event(exit_event) + if phase == "provision": + self.client.post_provision_log( + job.job_id, exit_code, exit_event + ) + if phase != "test": + logger.debug( + "Phase %s failed, aborting job" % phase + ) + job_end_reason = exit_event + break + else: + event_emitter.emit_event(TestEvent(phase + "_success")) except Exception as e: logger.exception(e) finally: # Always run the cleanup, even if the job was cancelled + event_emitter.emit_event(TestEvent.CLEANUP_START) job.run_test_phase(TestPhase.CLEANUP, rundir) + event_emitter.emit_event(TestEvent.CLEANUP_SUCCESS) + event_emitter.emit_event(TestEvent.JOB_END, job_end_reason) # clear job id self.client.post_agent_data({"job_id": ""}) diff --git a/agent/testflinger_agent/client.py 
b/agent/testflinger_agent/client.py index a52fd486..b6f743ba 100644 --- a/agent/testflinger_agent/client.py +++ b/agent/testflinger_agent/client.py @@ -21,6 +21,7 @@ import tempfile import time +from typing import List, Dict from urllib.parse import urljoin from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry @@ -387,3 +388,41 @@ def post_provision_log(self, job_id: str, exit_code: int, detail: str): self.session.post(agent_data_url, json=data, timeout=30) except RequestException as exc: logger.warning("Unable to post provision log to server: %s", exc) + + def post_status_update( + self, job_queue: str, webhook: str, events: List[Dict[str, str]] + ): + """ + Posts status updates about the running job as long as there is a + webhook + + :param job_queue: + TestFlinger queue the currently running job belongs to + :param webhook: + String URL to post status update to + :param events: + List of accumulated test events + + """ + if webhook is None: + return + + status_update_request = { + "agent_id": self.config.get("agent_id"), + "job_queue": job_queue, + "job_status_webhook": webhook, + "events": events, + } + status_update_uri = urljoin(self.server, "/v1/agents/status") + try: + job_request = self.session.post( + status_update_uri, json=status_update_request, timeout=30 + ) + except RequestException as exc: + logger.error("Server Error: %s" % exc) + job_request = None + if not job_request: + logger.error( + "Unable to post status updates to: %s (error: %s)" + % (status_update_uri, job_request.status_code) + ) diff --git a/agent/testflinger_agent/event_emitter.py b/agent/testflinger_agent/event_emitter.py new file mode 100644 index 00000000..21a9a3af --- /dev/null +++ b/agent/testflinger_agent/event_emitter.py @@ -0,0 +1,50 @@ +# Copyright (C) 2024 Canonical +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software 
Foundation, either version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see + + +from datetime import timezone, datetime + +from testflinger_agent.client import TestflingerClient +from testflinger_common.enums import TestEvent + + +class EventEmitter: + def __init__( + self, job_queue: str, webhook: str, client: TestflingerClient + ): + """ + :param job_queue: + String representing job_queue the running job belongs to + :param webhook: + String url to send status updates to + :param client: + TestflingerClient used to post status updates to the server + + """ + self.job_queue = job_queue + self.webhook = webhook + self.events = [] + self.client = client + + def emit_event(self, test_event: TestEvent, detail: str = ""): + if test_event is not None: + new_event_json = { + "event_name": test_event, + "timestamp": datetime.now(timezone.utc).isoformat(), + "detail": detail, + } + self.events.append(new_event_json) + self.client.post_status_update( + self.job_queue, self.webhook, self.events + ) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index edced7d8..c3a932aa 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -59,24 +59,24 @@ def run_test_phase(self, phase, rundir): node = self.client.config.get("agent_id") if not cmd: logger.info("No %s_command configured, skipping...", phase) - return 0 + return 0, None, None if phase == "provision" and not self.job_data.get("provision_data"): logger.info("No provision_data defined in job data, skipping...") - return 0 + return 0, None, None if phase == "firmware_update" and not self.job_data.get( "firmware_update_data" ): logger.info( "No 
firmware_update_data defined in job data, skipping..." ) - return 0 + return 0, None, None if phase == "test" and not self.job_data.get("test_data"): logger.info("No test_data defined in job data, skipping...") - return 0 + return 0, None, None if phase == "allocate" and not self.job_data.get("allocate_data"): - return 0 + return 0, None, None if phase == "reserve" and not self.job_data.get("reserve_data"): - return 0 + return 0, None, None results_file = os.path.join(rundir, "testflinger-outcome.json") output_log = os.path.join(rundir, phase + ".log") serial_log = os.path.join(rundir, phase + "-serial.log") @@ -117,8 +117,8 @@ def run_test_phase(self, phase, rundir): ): runner.run(f"echo '{line}'") try: - exitcode, exit_reason = runner.run(cmd) - except Exception as exc: + exitcode, exit_event, exit_reason = runner.run(cmd) + except Exception as e: logger.exception(exc) exitcode = 100 exit_reason = str(exc) # noqa: F841 - ignore this until it's used @@ -128,7 +128,7 @@ def run_test_phase(self, phase, rundir): ) if phase == "allocate": self.allocate_phase(rundir) - return exitcode + return exitcode, exit_event, exit_reason def _update_phase_results( self, results_file, phase, exitcode, output_log, serial_log diff --git a/agent/testflinger_agent/runner.py b/agent/testflinger_agent/runner.py index fd51e6f2..3fc067de 100644 --- a/agent/testflinger_agent/runner.py +++ b/agent/testflinger_agent/runner.py @@ -25,6 +25,8 @@ from enum import Enum from typing import Callable, List, Optional, Tuple +from testflinger_common.enums import TestEvent + logger = logging.getLogger(__name__) OutputHandlerType = Callable[[str], None] @@ -78,16 +80,16 @@ def post_output(self, data: str): def register_stop_condition_checker(self, checker: StopConditionType): self.stop_condition_checkers.append(checker) - def check_stop_conditions(self) -> str: + def check_stop_conditions(self) -> Tuple[Optional[TestEvent], str]: """ Check stop conditions and return the reason if any are met. 
Otherwise, return an empty string if none are met """ for checker in self.stop_condition_checkers: - output = checker() - if output: - return output - return "" + event, detail = checker() + if event is not None: + return event, detail + return None, "" def check_and_post_output(self): raw_output = self.process.stdout.read() @@ -117,9 +119,10 @@ def cleanup(self): if self.process is not None: self.process.kill() - def run(self, cmd: str) -> Tuple[int, str]: + def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]: # Ensure that the process is None before starting self.process = None + stop_event = None stop_reason = "" signal.signal(signal.SIGTERM, lambda signum, frame: self.cleanup()) @@ -136,7 +139,8 @@ def run(self, cmd: str) -> Tuple[int, str]: while self.process.poll() is None: time.sleep(10) - if stop_reason := self.check_stop_conditions(): + stop_event, stop_reason = self.check_stop_conditions() + if stop_event is not None: self.post_output(f"\n{stop_reason}\n") self.cleanup() break @@ -150,7 +154,7 @@ def run(self, cmd: str) -> Tuple[int, str]: if stop_reason == "": stop_reason = get_stop_reason(self.process.returncode, "") - return self.process.returncode, stop_reason + return self.process.returncode, stop_event, stop_reason def get_stop_reason(returncode: int, stop_reason: str) -> str: diff --git a/agent/testflinger_agent/stop_condition_checkers.py b/agent/testflinger_agent/stop_condition_checkers.py index bb4d2dca..7bedb668 100644 --- a/agent/testflinger_agent/stop_condition_checkers.py +++ b/agent/testflinger_agent/stop_condition_checkers.py @@ -13,8 +13,8 @@ # along with this program. 
If not, see import time -from typing import Optional - +from typing import Optional, Tuple +from testflinger_common.enums import JobState, TestEvent from .client import TestflingerClient @@ -23,10 +23,13 @@ def __init__(self, client: TestflingerClient, job_id: str): self.client = client self.job_id = job_id - def __call__(self) -> Optional[str]: - if self.client.check_job_state(self.job_id) == "cancelled": - return "Job cancellation was requested, exiting." - return None + def __call__(self) -> Tuple[Optional[TestEvent], str]: + if self.client.check_job_state(self.job_id) == JobState.CANCELLED: + return ( + TestEvent.CANCELLED, + "Job cancellation was requested, exiting.", + ) + return None, "" class GlobalTimeoutChecker: @@ -34,10 +37,13 @@ def __init__(self, timeout: int): self.timeout = timeout self.start_time = time.time() - def __call__(self) -> Optional[str]: + def __call__(self) -> Tuple[Optional[TestEvent], str]: if time.time() - self.start_time > self.timeout: - return f"ERROR: Global timeout reached! ({self.timeout}s)" - return None + return ( + TestEvent.GLOBAL_TIMEOUT, + f"ERROR: Global timeout reached! ({self.timeout}s)", + ) + return None, "" class OutputTimeoutChecker: @@ -45,10 +51,13 @@ def __init__(self, timeout: int): self.timeout = timeout self.last_output_time = time.time() - def __call__(self) -> Optional[str]: + def __call__(self) -> Tuple[Optional[TestEvent], str]: if time.time() - self.last_output_time > self.timeout: - return f"ERROR: Output timeout reached! ({self.timeout}s)" - return None + return ( + TestEvent.OUTPUT_TIMEOUT, + f"ERROR: Output timeout reached! 
({self.timeout}s)", + ) + return None, "" def update(self): """Update the last output time to the current time.""" diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index bda8eacd..cfb5d8f3 100644 --- a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -16,6 +16,7 @@ from testflinger_agent.errors import TFServerError from testflinger_agent.client import TestflingerClient as _TestflingerClient from testflinger_agent.agent import TestflingerAgent as _TestflingerAgent +from testflinger_common.enums import TestPhase class TestClient: @@ -399,3 +400,137 @@ def test_post_agent_data(self, agent): "provision_type": self.config["provision_type"], } ) + + def test_post_agent_status_update(self, agent, requests_mock): + self.config["test_command"] = "echo test1" + fake_job_data = { + "job_id": str(uuid.uuid1()), + "job_queue": "test", + "test_data": {"test_cmds": "foo"}, + "job_status_webhook": "https://mywebhook", + } + requests_mock.get( + "http://127.0.0.1:8000/v1/job?queue=test", + [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], + ) + status_url = "http://127.0.0.1:8000/v1/agents/status" + requests_mock.post(status_url, status_code=200) + with patch("shutil.rmtree"): + agent.process_jobs() + + status_update_requests = list( + filter( + lambda req: req.url == status_url, + requests_mock.request_history, + ) + ) + event_list = status_update_requests[-1].json()["events"] + event_name_list = [event["event_name"] for event in event_list] + expected_event_name_list = [ + phase.value + postfix + for phase in TestPhase + for postfix in ["_start", "_success"] + ] + expected_event_name_list.append("job_end") + + assert event_list[-1]["detail"] == "normal_exit" + assert event_name_list == expected_event_name_list + + def test_post_agent_status_update_cancelled(self, agent, requests_mock): + self.config["test_command"] = "echo test1" + job_id = str(uuid.uuid1()) + fake_job_data 
= { + "job_id": job_id, + "job_queue": "test", + "test_data": {"test_cmds": "foo"}, + "job_status_webhook": "https://mywebhook", + } + requests_mock.get( + "http://127.0.0.1:8000/v1/job?queue=test", + [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], + ) + status_url = "http://127.0.0.1:8000/v1/agents/status" + requests_mock.post(status_url, status_code=200) + + requests_mock.get( + "http://127.0.0.1:8000/v1/result/" + job_id, + json={"job_state": "cancelled"}, + ) + with patch("shutil.rmtree"): + agent.process_jobs() + + status_update_requests = list( + filter( + lambda req: req.url == status_url, + requests_mock.request_history, + ) + ) + event_list = status_update_requests[-1].json()["events"] + event_name_list = [event["event_name"] for event in event_list] + + assert "cancelled" in event_name_list + + def test_post_agent_status_update_global_timeout( + self, agent, requests_mock + ): + self.config["test_command"] = "sleep 12" + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "test", + "test_data": {"test_cmds": "foo"}, + "job_status_webhook": "https://mywebhook", + "global_timeout": 1, + } + requests_mock.get( + "http://127.0.0.1:8000/v1/job?queue=test", + [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], + ) + status_url = "http://127.0.0.1:8000/v1/agents/status" + requests_mock.post(status_url, status_code=200) + + with patch("shutil.rmtree"): + agent.process_jobs() + + status_update_requests = list( + filter( + lambda req: req.url == status_url, + requests_mock.request_history, + ) + ) + event_list = status_update_requests[-1].json()["events"] + event_name_list = [event["event_name"] for event in event_list] + + assert "global_timeout" in event_name_list + + def test_post_agent_status_update_output_timeout( + self, agent, requests_mock + ): + self.config["test_command"] = "sleep 12" + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "test", + "test_data": {"test_cmds": "foo"}, + 
"job_status_webhook": "https://mywebhook", + "output_timeout": 1, + } + requests_mock.get( + "http://127.0.0.1:8000/v1/job?queue=test", + [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], + ) + status_url = "http://127.0.0.1:8000/v1/agents/status" + requests_mock.post(status_url, status_code=200) + + with patch("shutil.rmtree"): + agent.process_jobs() + + status_update_requests = list( + filter( + lambda req: req.url == status_url, + requests_mock.request_history, + ) + ) + event_list = status_update_requests[-1].json()["events"] + event_name_list = [event["event_name"] for event in event_list] + assert "output_timeout" in event_name_list diff --git a/agent/testflinger_agent/tests/test_client.py b/agent/testflinger_agent/tests/test_client.py index f14bb7c6..9dcd2782 100644 --- a/agent/testflinger_agent/tests/test_client.py +++ b/agent/testflinger_agent/tests/test_client.py @@ -126,3 +126,33 @@ def test_transmit_job_outcome_missing_json(self, client, tmp_path, caplog): """ client.transmit_job_outcome(tmp_path) assert "Unable to read job ID" in caplog.text + + def test_post_status_update(self, client, requests_mock): + """ + Test that the agent sends a status update to the status endpoint + if there is a valid webhook + """ + webhook = "http://foo" + requests_mock.post( + "http://127.0.0.1:8000/v1/agents/status", status_code=200 + ) + events = [ + { + "event_name": "provision_start", + "timestamp": "2014-12-22T03:12:58.019077+00:00", + "detail": "", + }, + { + "event_name": "provision_success", + "timestamp": "2014-12-22T03:12:58.019077+00:00", + "detail": "", + }, + ] + client.post_status_update("myjobqueue", webhook, events) + expected_json = { + "agent_id": client.config.get("agent_id"), + "job_queue": "myjobqueue", + "job_status_webhook": webhook, + "events": events, + } + assert requests_mock.last_request.json() == expected_json diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index 15098aaa..480bc046 100644 
--- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -15,6 +15,7 @@ GlobalTimeoutChecker, OutputTimeoutChecker, ) +from testflinger_common.enums import TestEvent class TestJob: @@ -46,8 +47,10 @@ def test_skip_missing_data(self, client, phase): job = _TestflingerJob(fake_job_data, client) self.config[f"{phase}_command"] = "/bin/true" - return_value = job.run_test_phase(phase, None) + return_value, exit_event, exit_reason = job.run_test_phase(phase, None) assert return_value == 0 + assert exit_event is None + assert exit_reason is None @pytest.mark.parametrize( "phase", ["setup", "provision", "test", "allocate", "reserve"] @@ -59,8 +62,10 @@ def test_skip_empty_provision_data(self, client, phase): self.config[f"{phase}_command"] = "" fake_job_data = {"global_timeout": 1, f"{phase}_data": "foo"} job = _TestflingerJob(fake_job_data, client) - return_value = job.run_test_phase(phase, None) + return_value, exit_event, exit_reason = job.run_test_phase(phase, None) assert return_value == 0 + assert exit_event is None + assert exit_reason is None def test_job_global_timeout(self, tmp_path): """Test that timeout from job_data is respected""" @@ -71,12 +76,13 @@ def test_job_global_timeout(self, tmp_path): runner.register_output_handler(log_handler) global_timeout_checker = GlobalTimeoutChecker(1) runner.register_stop_condition_checker(global_timeout_checker) - exit_code, exit_reason = runner.run("sleep 12") + exit_code, exit_event, exit_reason = runner.run("sleep 12") with open(logfile) as log: log_data = log.read() assert timeout_str in log_data assert exit_reason == timeout_str assert exit_code == -9 + assert exit_event == TestEvent.GLOBAL_TIMEOUT def test_config_global_timeout(self, client): """Test that timeout from device config is preferred""" @@ -97,12 +103,13 @@ def test_job_output_timeout(self, tmp_path): runner.register_stop_condition_checker(output_timeout_checker) # unfortunately, we need to sleep for longer that 10 
seconds here # or else we fall under the polling time - exit_code, exit_reason = runner.run("sleep 12") + exit_code, exit_event, exit_reason = runner.run("sleep 12") with open(logfile) as log: log_data = log.read() assert timeout_str in log_data assert exit_reason == timeout_str assert exit_code == -9 + assert exit_event == TestEvent.OUTPUT_TIMEOUT def test_config_output_timeout(self, client): """Test that output timeout from device config is preferred""" diff --git a/agent/testflinger_agent/tests/test_stop_condition_checkers.py b/agent/testflinger_agent/tests/test_stop_condition_checkers.py index bf785546..baba8327 100644 --- a/agent/testflinger_agent/tests/test_stop_condition_checkers.py +++ b/agent/testflinger_agent/tests/test_stop_condition_checkers.py @@ -21,6 +21,8 @@ OutputTimeoutChecker, ) +from testflinger_common.enums import TestEvent + class TestStopConditionCheckers: def test_job_cancelled_checker(self, mocker): @@ -30,27 +32,33 @@ def test_job_cancelled_checker(self, mocker): # Nothing should happen if the job is not cancelled mocker.patch.object(client, "check_job_state", return_value="test") - assert checker() is None + assert checker() == (None, "") # If the job is cancelled, the checker should return a message mocker.patch.object( client, "check_job_state", return_value="cancelled" ) - assert "Job cancellation was requested, exiting." in checker() + stop_event, detail = checker() + assert "Job cancellation was requested, exiting." in detail + assert stop_event == TestEvent.CANCELLED def test_global_timeout_checker(self): """Test that the global timeout checker works as expected.""" checker = GlobalTimeoutChecker(0.5) - assert checker() is None + assert checker() == (None, "") time.sleep(0.6) - assert "ERROR: Global timeout reached! (0.5s)" in checker() + stop_event, detail = checker() + assert "ERROR: Global timeout reached! 
(0.5s)" in detail + assert stop_event == TestEvent.GLOBAL_TIMEOUT def test_output_timeout_checker(self): """Test that the output timeout checker works as expected.""" checker = OutputTimeoutChecker(0.5) - assert checker() is None + assert checker() == (None, "") time.sleep(0.6) - assert "ERROR: Output timeout reached! (0.5s)" in checker() + stop_event, detail = checker() + assert "ERROR: Output timeout reached! (0.5s)" in detail + assert stop_event == TestEvent.OUTPUT_TIMEOUT def test_output_timeout_update(self): """ @@ -61,4 +69,4 @@ def test_output_timeout_update(self): for _ in range(5): time.sleep(0.1) checker.update() - assert checker() is None + assert checker() == (None, "") diff --git a/common/testflinger_common/enums.py b/common/testflinger_common/enums.py index 067267fb..678e24cb 100644 --- a/common/testflinger_common/enums.py +++ b/common/testflinger_common/enums.py @@ -43,3 +43,36 @@ class TestPhase(StrEnum): RESERVE = "reserve" CLEANUP = "cleanup" + +class TestEvent(StrEnum): + SETUP_START = "setup_start" + PROVISION_START = "provision_start" + FIRMWARE_UPDATE_START = "firmware_update_start" + TEST_START = "test_start" + ALLOCATE_START = "allocate_start" + RESERVE_START = "reserve_start" + CLEANUP_START = "cleanup_start" + + SETUP_SUCCESS = "setup_success" + PROVISION_SUCCESS = "provision_success" + FIRMWARE_UPDATE_SUCCESS = "firmware_update_success" + TEST_SUCCESS = "test_success" + ALLOCATE_SUCCESS = "allocate_success" + RESERVE_SUCCESS = "reserve_success" + CLEANUP_SUCCESS = "cleanup_success" + + SETUP_FAIL = "setup_fail" + PROVISION_FAIL = "provision_fail" + FIRMWARE_UPDATE_FAIL = "firmware_update_fail" + TEST_FAIL = "test_fail" + ALLOCATE_FAIL = "allocate_fail" + RESERVE_FAIL = "reserve_fail" + CLEANUP_FAIL = "cleanup_fail" + + CANCELLED = "cancelled" + GLOBAL_TIMEOUT = "global_timeout" + OUTPUT_TIMEOUT = "output_timeout" + RECOVERY_FAILED = "recovery_failed" + + NORMAL_EXIT = "normal_exit" + JOB_END = "job_end" From 
b074c27934ed09dcc78b45dcb4c6d6d764498e29 Mon Sep 17 00:00:00 2001 From: Varun Valada Date: Thu, 11 Jul 2024 17:39:56 -0500 Subject: [PATCH 3/5] Change status update endpoint to include job_id --- agent/testflinger_agent/agent.py | 5 + agent/testflinger_agent/client.py | 10 +- agent/testflinger_agent/event_emitter.py | 11 +- agent/testflinger_agent/tests/test_agent.py | 12 +- agent/testflinger_agent/tests/test_client.py | 5 +- common/testflinger_common/enums.py | 3 +- server/README.rst | 5 +- server/src/api/v1.py | 1217 +++++++++--------- server/tests/test_v1.py | 6 +- 9 files changed, 639 insertions(+), 635 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index fac54b4d..3a1e8e74 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -213,10 +213,15 @@ def process_jobs(self): job_data.get("job_queue"), job_data.get("job_status_webhook"), self.client, + job.job_id, ) job_end_reason = TestEvent.NORMAL_EXIT logger.info("Starting job %s", job.job_id) + event_emitter.emit_event( + TestEvent.JOB_START, + f"{self.client.server}/job/{job.job_id}/events", + ) rundir = os.path.join( self.client.config.get("execution_basedir"), job.job_id ) diff --git a/agent/testflinger_agent/client.py b/agent/testflinger_agent/client.py index b6f743ba..a9ad96c0 100644 --- a/agent/testflinger_agent/client.py +++ b/agent/testflinger_agent/client.py @@ -390,7 +390,11 @@ def post_provision_log(self, job_id: str, exit_code: int, detail: str): logger.warning("Unable to post provision log to server: %s", exc) def post_status_update( - self, job_queue: str, webhook: str, events: List[Dict[str, str]] + self, + job_queue: str, + webhook: str, + events: List[Dict[str, str]], + job_id: str, ): """ Posts status updates about the running job as long as there is a @@ -402,6 +406,8 @@ def post_status_update( String URL to post status update to :param events: List of accumulated test events + :param job_id: + id for the job on 
which we want to post results """ if webhook is None: @@ -413,7 +419,7 @@ def post_status_update( "job_status_webhook": webhook, "events": events, } - status_update_uri = urljoin(self.server, "/v1/agents/status") + status_update_uri = urljoin(self.server, f"/v1/job/{job_id}/events") try: job_request = self.session.post( status_update_uri, json=status_update_request, timeout=30 diff --git a/agent/testflinger_agent/event_emitter.py b/agent/testflinger_agent/event_emitter.py index 21a9a3af..359f822f 100644 --- a/agent/testflinger_agent/event_emitter.py +++ b/agent/testflinger_agent/event_emitter.py @@ -21,7 +21,11 @@ class EventEmitter: def __init__( - self, job_queue: str, webhook: str, client: TestflingerClient + self, + job_queue: str, + webhook: str, + client: TestflingerClient, + job_id: str, ): """ :param job_queue: @@ -30,12 +34,15 @@ def __init__( String url to send status updates to :param client: TestflingerClient used to post status updates to the server + :param job_id: + id for the job on which we want to post updates """ self.job_queue = job_queue self.webhook = webhook self.events = [] self.client = client + self.job_id = job_id def emit_event(self, test_event: TestEvent, detail: str = ""): if test_event is not None: @@ -46,5 +53,5 @@ def emit_event(self, test_event: TestEvent, detail: str = ""): } self.events.append(new_event_json) self.client.post_status_update( - self.job_queue, self.webhook, self.events + self.job_queue, self.webhook, self.events, self.job_id ) diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index cfb5d8f3..c82867dd 100644 --- a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -403,8 +403,9 @@ def test_post_agent_data(self, agent): def test_post_agent_status_update(self, agent, requests_mock): self.config["test_command"] = "echo test1" + job_id = str(uuid.uuid1()) fake_job_data = { - "job_id": str(uuid.uuid1()), + "job_id": job_id, 
"job_queue": "test", "test_data": {"test_cmds": "foo"}, "job_status_webhook": "https://mywebhook", @@ -413,7 +414,7 @@ def test_post_agent_status_update(self, agent, requests_mock): "http://127.0.0.1:8000/v1/job?queue=test", [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], ) - status_url = "http://127.0.0.1:8000/v1/agents/status" + status_url = f"http://127.0.0.1:8000/v1/job/{job_id}/events" requests_mock.post(status_url, status_code=200) with patch("shutil.rmtree"): agent.process_jobs() @@ -431,6 +432,7 @@ def test_post_agent_status_update(self, agent, requests_mock): for phase in TestPhase for postfix in ["_start", "_success"] ] + expected_event_name_list.insert(0, "job_start") expected_event_name_list.append("job_end") assert event_list[-1]["detail"] == "normal_exit" @@ -449,7 +451,7 @@ def test_post_agent_status_update_cancelled(self, agent, requests_mock): "http://127.0.0.1:8000/v1/job?queue=test", [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], ) - status_url = "http://127.0.0.1:8000/v1/agents/status" + status_url = f"http://127.0.0.1:8000/v1/job/{job_id}/events" requests_mock.post(status_url, status_code=200) requests_mock.get( @@ -486,7 +488,7 @@ def test_post_agent_status_update_global_timeout( "http://127.0.0.1:8000/v1/job?queue=test", [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], ) - status_url = "http://127.0.0.1:8000/v1/agents/status" + status_url = f"http://127.0.0.1:8000/v1/job/{job_id}/events" requests_mock.post(status_url, status_code=200) with patch("shutil.rmtree"): @@ -519,7 +521,7 @@ def test_post_agent_status_update_output_timeout( "http://127.0.0.1:8000/v1/job?queue=test", [{"text": json.dumps(fake_job_data)}, {"text": "{}"}], ) - status_url = "http://127.0.0.1:8000/v1/agents/status" + status_url = f"http://127.0.0.1:8000/v1/job/{job_id}/events" requests_mock.post(status_url, status_code=200) with patch("shutil.rmtree"): diff --git a/agent/testflinger_agent/tests/test_client.py 
b/agent/testflinger_agent/tests/test_client.py index 9dcd2782..17e46406 100644 --- a/agent/testflinger_agent/tests/test_client.py +++ b/agent/testflinger_agent/tests/test_client.py @@ -133,8 +133,9 @@ def test_post_status_update(self, client, requests_mock): if there is a valid webhook """ webhook = "http://foo" + job_id = job_id = str(uuid.uuid1()) requests_mock.post( - "http://127.0.0.1:8000/v1/agents/status", status_code=200 + f"http://127.0.0.1:8000/v1/job/{job_id}/events", status_code=200 ) events = [ { @@ -148,7 +149,7 @@ def test_post_status_update(self, client, requests_mock): "detail": "", }, ] - client.post_status_update("myjobqueue", webhook, events) + client.post_status_update("myjobqueue", webhook, events, job_id) expected_json = { "agent_id": client.config.get("agent_id"), "job_queue": "myjobqueue", diff --git a/common/testflinger_common/enums.py b/common/testflinger_common/enums.py index 678e24cb..4265cb2f 100644 --- a/common/testflinger_common/enums.py +++ b/common/testflinger_common/enums.py @@ -72,7 +72,8 @@ class TestEvent(StrEnum): CANCELLED = "cancelled" GLOBAL_TIMEOUT = "global_timeout" OUTPUT_TIMEOUT = "output_timeout" - RECOVERY_FAILED = "recovery_failed" + RECOVERY_FAIL = "recovery_fail" NORMAL_EXIT = "normal_exit" + JOB_START = "job_start" JOB_END = "job_end" diff --git a/server/README.rst b/server/README.rst index da1f63cc..d8d1f4d9 100644 --- a/server/README.rst +++ b/server/README.rst @@ -357,11 +357,12 @@ This will find jobs tagged with both "foo" and "bar". --data '{ "job_id": "00000000-0000-0000-0000-000000000000", \ "exit_code": 1, "detail":"foo" }' -**[POST] /v1/status - Receive job status updates from an agent and posts them to the specified webhook. +**[POST] /v1/job//events - Receive job status updates from an agent and posts them to the specified webhook. The job_status_webhook parameter is required for this endpoint. Other parameters included here will be forwarded to the webhook. 
- Parameters: + - job_id: test job identifier as a UUID - job_status_webhook: webhook URL to post status updates to - Returns: @@ -380,4 +381,4 @@ The job_status_webhook parameter is required for this endpoint. Other parameters $ curl -X POST \ -H "Content-Type: application/json" \ - -d '{"agent_id": "agent-00", "job_queue": "myqueue", "job_status_webhook": "http://mywebhook", "events": [{"event_name": "started_provisioning", "timestamp": "2024-05-03T19:11:33.541130+00:00", "detail": "my_detailed_message"}]}' http://localhost:8000/v1/agents/status + -d '{"agent_id": "agent-00", "job_queue": "myqueue", "job_status_webhook": "http://mywebhook", "events": [{"event_name": "started_provisioning", "timestamp": "2024-05-03T19:11:33.541130+00:00", "detail": "my_detailed_message"}]}' http://localhost:8000/v1/job/00000000-0000-0000-0000-000000000000/events diff --git a/server/src/api/v1.py b/server/src/api/v1.py index 9b89c998..57eefd83 100644 --- a/server/src/api/v1.py +++ b/server/src/api/v1.py @@ -1,620 +1,597 @@ -# Copyright (C) 2022 Canonical -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . 
-# -""" -Testflinger v1 API -""" - -import uuid -from datetime import datetime, timezone - -import pkg_resources -from apiflask import APIBlueprint, abort -from flask import jsonify, request, send_file -from prometheus_client import Counter -from werkzeug.exceptions import BadRequest - -import requests -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -from src import database -from . import schemas - - -jobs_metric = Counter("jobs", "Number of jobs", ["queue"]) -reservations_metric = Counter( - "reservations", "Number of reservations", ["queue"] -) - - -v1 = APIBlueprint("v1", __name__) - - -@v1.get("/") -def home(): - """Identify ourselves""" - return get_version() - - -def get_version(): - """Return the Testflinger version""" - try: - version = pkg_resources.get_distribution("testflinger").version - except pkg_resources.DistributionNotFound: - version = "devel" - return "Testflinger Server v{}".format(version) - - -@v1.post("/job") -@v1.input(schemas.Job, location="json") -@v1.output(schemas.JobId) -def job_post(json_data: dict): - """Add a job to the queue""" - try: - job_queue = json_data.get("job_queue") - except (AttributeError, BadRequest): - # Set job_queue to None so we take the failure path below - job_queue = "" - if not job_queue: - abort(422, message="Invalid data or no job_queue specified") - - try: - job = job_builder(json_data) - except ValueError: - abort(400, message="Invalid job_id specified") - - jobs_metric.labels(queue=job_queue).inc() - if "reserve_data" in json_data: - reservations_metric.labels(queue=job_queue).inc() - - # CAUTION! 
If you ever move this line, you may need to pass data as a copy - # because it will get modified by submit_job and other things it calls - database.add_job(job) - return jsonify(job_id=job.get("job_id")) - - -def has_attachments(data: dict) -> bool: - """Predicate if the job described by `data` involves attachments""" - return any( - nested_field == "attachments" - for field, nested_dict in data.items() - if field.endswith("_data") - for nested_field in nested_dict - ) - - -def job_builder(data): - """Build a job from a dictionary of data""" - job = { - "created_at": datetime.utcnow(), - "result_data": { - "job_state": "waiting", - }, - } - # If the job_id is provided, keep it as long as the uuid is good. - # This is for job resubmission - job_id = data.pop("job_id", None) - if job_id and isinstance(job_id, str): - # This job already came with a job_id, so it was resubmitted - if not check_valid_uuid(job_id): - raise ValueError - else: - # This is a new job, so generate a new job_id - job_id = str(uuid.uuid4()) - - # side effect: modify the job dict - if has_attachments(data): - data["attachments_status"] = "waiting" - - job["job_id"] = job_id - job["job_data"] = data - return job - - -@v1.get("/job") -@v1.output(schemas.Job) -@v1.doc(responses=schemas.job_empty) -def job_get(): - """Request a job to run from supported queues""" - queue_list = request.args.getlist("queue") - if not queue_list: - return "No queue(s) specified in request", 400 - job = database.pop_job(queue_list=queue_list) - if job: - return jsonify(job) - return {}, 204 - - -@v1.get("/job/") -@v1.output(schemas.Job) -def job_get_id(job_id): - """Request the json job definition for a specified job, even if it has - already run - - :param job_id: - UUID as a string for the job - :return: - JSON data for the job or error string and http error - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - response = database.mongo.db.jobs.find_one( - {"job_id": job_id}, 
projection={"job_data": True, "_id": False} - ) - if not response: - return {}, 204 - job_data = response.get("job_data") - job_data["job_id"] = job_id - return job_data - - -@v1.get("/job//attachments") -def attachment_get(job_id): - """Return the attachments bundle for a specified job_id - - :param job_id: - UUID as a string for the job - :return: - send_file stream of attachment tarball to download - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - try: - file = database.retrieve_file(filename=f"{job_id}.attachments") - except FileNotFoundError: - return "", 204 - return send_file(file, mimetype="application/gzip") - - -@v1.post("/job//attachments") -def attachments_post(job_id): - """Post attachment bundle for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - try: - attachments_status = database.get_attachments_status(job_id) - except ValueError: - return f"Job {job_id} is not valid\n", 422 - if attachments_status is None: - return f"Job {job_id} not awaiting attachments\n", 422 - if attachments_status == "complete": - # attachments already submitted: successful, could be due to a retry - return "OK", 200 - - # save attachments archive in the database - database.save_file( - data=request.files["file"], - filename=f"{job_id}.attachments", - ) - - # now the job can be processed - database.attachments_received(job_id) - return "OK", 200 - - -@v1.get("/job/search") -@v1.input(schemas.JobSearchRequest, location="query") -@v1.output(schemas.JobSearchResponse) -def search_jobs(query_data): - """Search for jobs by tags""" - tags = query_data.get("tags") - match = request.args.get("match", "any") - states = request.args.getlist("state") - - query = {} - if tags and match == "all": - query["job_data.tags"] = {"$all": tags} - elif tags and match == "any": - query["job_data.tags"] = {"$in": tags} - - if "active" in states: - query["result_data.job_state"] 
= { - "$nin": ["cancelled", "complete", "completed"] - } - elif states: - query["result_data.job_state"] = {"$in": states} - - pipeline = [ - {"$match": query}, - { - "$project": { - "job_id": True, - "created_at": True, - "job_state": "$result_data.job_state", - "_id": False, - }, - }, - ] - - jobs = database.mongo.db.jobs.aggregate(pipeline) - - return jsonify(list(jobs)) - - -@v1.post("/result/") -@v1.input(schemas.Result, location="json") -def result_post(job_id, json_data): - """Post a result for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - - # First, we need to prepend "result_data" to each key in the result_data - for key in list(json_data): - json_data[f"result_data.{key}"] = json_data.pop(key) - - database.mongo.db.jobs.update_one({"job_id": job_id}, {"$set": json_data}) - return "OK" - - -@v1.get("/result/") -@v1.output(schemas.Result) -def result_get(job_id): - """Return results for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - response = database.mongo.db.jobs.find_one( - {"job_id": job_id}, {"result_data": True, "_id": False} - ) - - if not response or not (results := response.get("result_data")): - return "", 204 - results = response.get("result_data") - return results - - -@v1.post("/result//artifact") -def artifacts_post(job_id): - """Post artifact bundle for a specified job_id - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - database.save_file( - data=request.files["file"], - filename=f"{job_id}.artifact", - ) - return "OK" - - -@v1.get("/result//artifact") -def artifacts_get(job_id): - """Return artifact bundle for a specified job_id - - :param job_id: - UUID as a string for the job - :return: - send_file stream of artifact tarball to download - 
""" - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - try: - file = database.retrieve_file(filename=f"{job_id}.artifact") - except FileNotFoundError: - return "", 204 - return send_file(file, download_name="artifact.tar.gz") - - -@v1.get("/result//output") -def output_get(job_id): - """Get latest output for a specified job ID - - :param job_id: - UUID as a string for the job - :return: - Output lines - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - response = database.mongo.db.output.find_one_and_delete( - {"job_id": job_id}, {"_id": False} - ) - output = response.get("output", []) if response else None - if output: - return "\n".join(output) - return "", 204 - - -@v1.post("/result//output") -def output_post(job_id): - """Post output for a specified job ID - - :param job_id: - UUID as a string for the job - :param data: - A string containing the latest lines of output to post - """ - if not check_valid_uuid(job_id): - abort(400, message="Invalid job_id specified") - data = request.get_data().decode("utf-8") - timestamp = datetime.utcnow() - database.mongo.db.output.update_one( - {"job_id": job_id}, - {"$set": {"updated_at": timestamp}, "$push": {"output": data}}, - upsert=True, - ) - return "OK" - - -@v1.post("/job//action") -@v1.input(schemas.ActionIn, location="json") -def action_post(job_id, json_data): - """Take action on the job status for a specified job ID - - :param job_id: - UUID as a string for the job - """ - if not check_valid_uuid(job_id): - return "Invalid job id\n", 400 - action = json_data["action"] - supported_actions = { - "cancel": cancel_job, - } - # Validation of actions happens in schemas.py:ActionIn - return supported_actions[action](job_id) - - -@v1.get("/agents/queues") -@v1.doc(responses=schemas.queues_out) -def queues_get(): - """Get all advertised queues from this server - - Returns a dict of queue names and descriptions, ex: - { - "some_queue": "A queue for testing", - "other_queue": "A 
queue for something else" - } - """ - all_queues = database.mongo.db.queues.find( - {}, projection={"_id": False, "name": True, "description": True} - ) - queue_dict = {} - # Create a dict of queues and descriptions - for queue in all_queues: - queue_dict[queue.get("name")] = queue.get("description", "") - return jsonify(queue_dict) - - -@v1.post("/agents/queues") -def queues_post(): - """Tell testflinger the queue names that are being serviced - - Some agents may want to advertise some of the queues they listen on so that - the user can check which queues are valid to use. - """ - queue_dict = request.get_json() - for queue, description in queue_dict.items(): - database.mongo.db.queues.update_one( - {"name": queue}, - {"$set": {"description": description}}, - upsert=True, - ) - return "OK" - - -@v1.get("/agents/images/") -@v1.doc(responses=schemas.images_out) -def images_get(queue): - """Get a dict of known images for a given queue""" - queue_data = database.mongo.db.queues.find_one( - {"name": queue}, {"_id": False, "images": True} - ) - if not queue_data: - return jsonify({}) - # It's ok for this to just return an empty result if there are none found - return jsonify(queue_data.get("images", {})) - - -@v1.post("/agents/images") -def images_post(): - """Tell testflinger about known images for a specified queue - images will be stored in a dict of key/value pairs as part of the queues - collection. That dict will contain image_name:provision_data mappings, ex: - { - "some_queue": { - "core22": "http://cdimage.ubuntu.com/.../core-22.tar.gz", - "jammy": "http://cdimage.ubuntu.com/.../ubuntu-22.04.tar.gz" - }, - "other_queue": { - ... 
- } - } - """ - image_dict = request.get_json() - # We need to delete and recreate the images in case some were removed - for queue, image_data in image_dict.items(): - database.mongo.db.queues.update_one( - {"name": queue}, - {"$set": {"images": image_data}}, - upsert=True, - ) - return "OK" - - -@v1.get("/agents/data") -@v1.output(schemas.AgentOut) -def agents_get_all(): - """Get all agent data""" - agents = database.mongo.db.agents.find({}, {"_id": False, "log": False}) - return jsonify(list(agents)) - - -@v1.post("/agents/data/") -@v1.input(schemas.AgentIn, location="json") -def agents_post(agent_name, json_data): - """Post information about the agent to the server - - The json sent to this endpoint may contain data such as the following: - { - "state": string, # State the device is in - "queues": array[string], # Queues the device is listening on - "location": string, # Location of the device - "job_id": string, # Job ID the device is running, if any - "log": array[string], # push and keep only the last 100 lines - } - """ - - json_data["name"] = agent_name - json_data["updated_at"] = datetime.utcnow() - # extract log from data so we can push it instead of setting it - log = json_data.pop("log", []) - - database.mongo.db.agents.update_one( - {"name": agent_name}, - {"$set": json_data, "$push": {"log": {"$each": log, "$slice": -100}}}, - upsert=True, - ) - return "OK" - - -@v1.post("/agents/provision_logs/") -@v1.input(schemas.ProvisionLogsIn, location="json") -def agents_provision_logs_post(agent_name, json_data): - """Post provision logs for the agent to the server""" - agent_record = {} - - # timestamp this agent record and provision log entry - timestamp = datetime.now(timezone.utc) - agent_record["updated_at"] = json_data["timestamp"] = timestamp - - update_operation = { - "$set": json_data, - "$push": { - "provision_log": {"$each": [json_data], "$slice": -100}, - }, - } - database.mongo.db.provision_logs.update_one( - {"name": agent_name}, - 
update_operation, - upsert=True, - ) - return "OK" - - -@v1.post("/agents/status") -@v1.input(schemas.StatusUpdate, location="json") -def agents_status_post(json_data): - """Posts status updates from the agent to the server to be forwarded - to TestObserver - - The json sent to this endpoint may contain data such as the following: - { - "agent_id": "", - "job_queue": "", - "job_status_webhook": "", - "events": [ - { - "event_name": "", - "timestamp": "", - "detail": "" - }, - ... - ] - } - - """ - request_json = json_data - webhook_url = request_json.pop("job_status_webhook") - try: - s = requests.Session() - s.mount( - "", - HTTPAdapter( - max_retries=Retry( - total=3, - allowed_methods=frozenset(["PUT"]), - backoff_factor=1, - ) - ), - ) - response = s.put(webhook_url, json=request_json, timeout=3) - return response.text, response.status_code - except requests.exceptions.Timeout: - return "Webhook Timeout", 504 - - -def check_valid_uuid(job_id): - """Check that the specified job_id is a valid UUID only - - :param job_id: - UUID as a string for the job - :return: - True if job_id is valid, False if not - """ - - try: - uuid.UUID(job_id) - except ValueError: - return False - return True - - -@v1.get("/job//position") -def job_position_get(job_id): - """Return the position of the specified jobid in the queue""" - job_data, status = job_get_id(job_id) - if status == 204: - return "Job not found or already started\n", 410 - if status != 200: - return job_data - try: - queue = job_data.json.get("job_queue") - except (AttributeError, TypeError): - return "Invalid json returned for id: {}\n".format(job_id), 400 - # Get all jobs with job_queue=queue and return only the _id - jobs = database.mongo.db.jobs.find( - {"job_data.job_queue": queue, "result_data.job_state": "waiting"}, - {"job_id": 1}, - ) - # Create a dict mapping job_id (as a string) to the position in the queue - jobs_id_position = {job.get("job_id"): pos for pos, job in enumerate(jobs)} - if job_id in 
jobs_id_position: - return str(jobs_id_position[job_id]) - return "Job not found or already started\n", 410 - - -def cancel_job(job_id): - """Cancellation for a specified job ID - - :param job_id: - UUID as a string for the job - """ - # Set the job status to cancelled - response = database.mongo.db.jobs.update_one( - { - "job_id": job_id, - "result_data.job_state": { - "$nin": ["cancelled", "complete", "completed"] - }, - }, - {"$set": {"result_data.job_state": "cancelled"}}, - ) - if response.modified_count == 0: - return "The job is already completed or cancelled", 400 - return "OK" +# Copyright (C) 2022 Canonical +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +""" +Testflinger v1 API +""" + +import uuid +from datetime import datetime + +import pkg_resources +from apiflask import APIBlueprint, abort +from flask import jsonify, request, send_file +from prometheus_client import Counter +from werkzeug.exceptions import BadRequest + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from src import database +from . 
import schemas + + +jobs_metric = Counter("jobs", "Number of jobs", ["queue"]) +reservations_metric = Counter( + "reservations", "Number of reservations", ["queue"] +) + + +v1 = APIBlueprint("v1", __name__) + + +@v1.get("/") +def home(): + """Identify ourselves""" + return get_version() + + +def get_version(): + """Return the Testflinger version""" + try: + version = pkg_resources.get_distribution("testflinger").version + except pkg_resources.DistributionNotFound: + version = "devel" + return "Testflinger Server v{}".format(version) + + +@v1.post("/job") +@v1.input(schemas.Job, location="json") +@v1.output(schemas.JobId) +def job_post(json_data: dict): + """Add a job to the queue""" + try: + job_queue = json_data.get("job_queue") + except (AttributeError, BadRequest): + # Set job_queue to None so we take the failure path below + job_queue = "" + if not job_queue: + abort(422, message="Invalid data or no job_queue specified") + + try: + job = job_builder(json_data) + except ValueError: + abort(400, message="Invalid job_id specified") + + jobs_metric.labels(queue=job_queue).inc() + if "reserve_data" in json_data: + reservations_metric.labels(queue=job_queue).inc() + + # CAUTION! If you ever move this line, you may need to pass data as a copy + # because it will get modified by submit_job and other things it calls + database.add_job(job) + return jsonify(job_id=job.get("job_id")) + + +def has_attachments(data: dict) -> bool: + """Predicate if the job described by `data` involves attachments""" + return any( + nested_field == "attachments" + for field, nested_dict in data.items() + if field.endswith("_data") + for nested_field in nested_dict + ) + + +def job_builder(data): + """Build a job from a dictionary of data""" + job = { + "created_at": datetime.utcnow(), + "result_data": { + "job_state": "waiting", + }, + } + # If the job_id is provided, keep it as long as the uuid is good. 
+ # This is for job resubmission + job_id = data.pop("job_id", None) + if job_id and isinstance(job_id, str): + # This job already came with a job_id, so it was resubmitted + if not check_valid_uuid(job_id): + raise ValueError + else: + # This is a new job, so generate a new job_id + job_id = str(uuid.uuid4()) + + # side effect: modify the job dict + if has_attachments(data): + data["attachments_status"] = "waiting" + + job["job_id"] = job_id + job["job_data"] = data + return job + + +@v1.get("/job") +@v1.output(schemas.Job) +@v1.doc(responses=schemas.job_empty) +def job_get(): + """Request a job to run from supported queues""" + queue_list = request.args.getlist("queue") + if not queue_list: + return "No queue(s) specified in request", 400 + job = database.pop_job(queue_list=queue_list) + if job: + return jsonify(job) + return {}, 204 + + +@v1.get("/job/") +@v1.output(schemas.Job) +def job_get_id(job_id): + """Request the json job definition for a specified job, even if it has + already run + + :param job_id: + UUID as a string for the job + :return: + JSON data for the job or error string and http error + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + response = database.mongo.db.jobs.find_one( + {"job_id": job_id}, projection={"job_data": True, "_id": False} + ) + if not response: + return {}, 204 + job_data = response.get("job_data") + job_data["job_id"] = job_id + return job_data + + +@v1.get("/job//attachments") +def attachment_get(job_id): + """Return the attachments bundle for a specified job_id + + :param job_id: + UUID as a string for the job + :return: + send_file stream of attachment tarball to download + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + try: + file = database.retrieve_file(filename=f"{job_id}.attachments") + except FileNotFoundError: + return "", 204 + return send_file(file, mimetype="application/gzip") + + +@v1.post("/job//attachments") +def attachments_post(job_id): + 
"""Post attachment bundle for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + try: + attachments_status = database.get_attachments_status(job_id) + except ValueError: + return f"Job {job_id} is not valid\n", 422 + if attachments_status is None: + return f"Job {job_id} not awaiting attachments\n", 422 + if attachments_status == "complete": + # attachments already submitted: successful, could be due to a retry + return "OK", 200 + + # save attachments archive in the database + database.save_file( + data=request.files["file"], + filename=f"{job_id}.attachments", + ) + + # now the job can be processed + database.attachments_received(job_id) + return "OK", 200 + + +@v1.get("/job/search") +@v1.input(schemas.JobSearchRequest, location="query") +@v1.output(schemas.JobSearchResponse) +def search_jobs(query_data): + """Search for jobs by tags""" + tags = query_data.get("tags") + match = request.args.get("match", "any") + states = request.args.getlist("state") + + query = {} + if tags and match == "all": + query["job_data.tags"] = {"$all": tags} + elif tags and match == "any": + query["job_data.tags"] = {"$in": tags} + + if "active" in states: + query["result_data.job_state"] = { + "$nin": ["cancelled", "complete", "completed"] + } + elif states: + query["result_data.job_state"] = {"$in": states} + + pipeline = [ + {"$match": query}, + { + "$project": { + "job_id": True, + "created_at": True, + "job_state": "$result_data.job_state", + "_id": False, + }, + }, + ] + + jobs = database.mongo.db.jobs.aggregate(pipeline) + + return jsonify(list(jobs)) + + +@v1.post("/result/") +@v1.input(schemas.Result, location="json") +def result_post(job_id, json_data): + """Post a result for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + + # First, we need to prepend "result_data" to 
each key in the result_data + for key in list(json_data): + json_data[f"result_data.{key}"] = json_data.pop(key) + + database.mongo.db.jobs.update_one({"job_id": job_id}, {"$set": json_data}) + return "OK" + + +@v1.get("/result/") +@v1.output(schemas.Result) +def result_get(job_id): + """Return results for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + response = database.mongo.db.jobs.find_one( + {"job_id": job_id}, {"result_data": True, "_id": False} + ) + + if not response or not (results := response.get("result_data")): + return "", 204 + results = response.get("result_data") + return results + + +@v1.post("/result//artifact") +def artifacts_post(job_id): + """Post artifact bundle for a specified job_id + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + database.save_file( + data=request.files["file"], + filename=f"{job_id}.artifact", + ) + return "OK" + + +@v1.get("/result//artifact") +def artifacts_get(job_id): + """Return artifact bundle for a specified job_id + + :param job_id: + UUID as a string for the job + :return: + send_file stream of artifact tarball to download + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + try: + file = database.retrieve_file(filename=f"{job_id}.artifact") + except FileNotFoundError: + return "", 204 + return send_file(file, download_name="artifact.tar.gz") + + +@v1.get("/result//output") +def output_get(job_id): + """Get latest output for a specified job ID + + :param job_id: + UUID as a string for the job + :return: + Output lines + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + response = database.mongo.db.output.find_one_and_delete( + {"job_id": job_id}, {"_id": False} + ) + output = response.get("output", []) if response else None + if output: + return "\n".join(output) + return "", 204 
+ + +@v1.post("/result//output") +def output_post(job_id): + """Post output for a specified job ID + + :param job_id: + UUID as a string for the job + :param data: + A string containing the latest lines of output to post + """ + if not check_valid_uuid(job_id): + abort(400, message="Invalid job_id specified") + data = request.get_data().decode("utf-8") + timestamp = datetime.utcnow() + database.mongo.db.output.update_one( + {"job_id": job_id}, + {"$set": {"updated_at": timestamp}, "$push": {"output": data}}, + upsert=True, + ) + return "OK" + + +@v1.post("/job//action") +@v1.input(schemas.ActionIn, location="json") +def action_post(job_id, json_data): + """Take action on the job status for a specified job ID + + :param job_id: + UUID as a string for the job + """ + if not check_valid_uuid(job_id): + return "Invalid job id\n", 400 + action = json_data["action"] + supported_actions = { + "cancel": cancel_job, + } + # Validation of actions happens in schemas.py:ActionIn + return supported_actions[action](job_id) + + +@v1.get("/agents/queues") +@v1.doc(responses=schemas.queues_out) +def queues_get(): + """Get all advertised queues from this server + + Returns a dict of queue names and descriptions, ex: + { + "some_queue": "A queue for testing", + "other_queue": "A queue for something else" + } + """ + all_queues = database.mongo.db.queues.find( + {}, projection={"_id": False, "name": True, "description": True} + ) + queue_dict = {} + # Create a dict of queues and descriptions + for queue in all_queues: + queue_dict[queue.get("name")] = queue.get("description", "") + return jsonify(queue_dict) + + +@v1.post("/agents/queues") +def queues_post(): + """Tell testflinger the queue names that are being serviced + + Some agents may want to advertise some of the queues they listen on so that + the user can check which queues are valid to use. 
+ """ + queue_dict = request.get_json() + for queue, description in queue_dict.items(): + database.mongo.db.queues.update_one( + {"name": queue}, + {"$set": {"description": description}}, + upsert=True, + ) + return "OK" + + +@v1.get("/agents/images/") +@v1.doc(responses=schemas.images_out) +def images_get(queue): + """Get a dict of known images for a given queue""" + queue_data = database.mongo.db.queues.find_one( + {"name": queue}, {"_id": False, "images": True} + ) + if not queue_data: + return jsonify({}) + # It's ok for this to just return an empty result if there are none found + return jsonify(queue_data.get("images", {})) + + +@v1.post("/agents/images") +def images_post(): + """Tell testflinger about known images for a specified queue + images will be stored in a dict of key/value pairs as part of the queues + collection. That dict will contain image_name:provision_data mappings, ex: + { + "some_queue": { + "core22": "http://cdimage.ubuntu.com/.../core-22.tar.gz", + "jammy": "http://cdimage.ubuntu.com/.../ubuntu-22.04.tar.gz" + }, + "other_queue": { + ... 
+        }
+    }
+    """
+    image_dict = request.get_json()
+    # We need to delete and recreate the images in case some were removed
+    for queue, image_data in image_dict.items():
+        database.mongo.db.queues.update_one(
+            {"name": queue},
+            {"$set": {"images": image_data}},
+            upsert=True,
+        )
+    return "OK"
+
+
+@v1.get("/agents/data")
+@v1.output(schemas.AgentOut)
+def agents_get_all():
+    """Get all agent data"""
+    agents = database.mongo.db.agents.find({}, {"_id": False, "log": False})
+    return jsonify(list(agents))
+
+
+@v1.post("/agents/data/<agent_name>")
+@v1.input(schemas.AgentIn, location="json")
+def agents_post(agent_name, json_data):
+    """Post information about the agent to the server
+
+    The json sent to this endpoint may contain data such as the following:
+    {
+        "state": string, # State the device is in
+        "queues": array[string], # Queues the device is listening on
+        "location": string, # Location of the device
+        "job_id": string, # Job ID the device is running, if any
+        "log": array[string], # push and keep only the last 100 lines
+    }
+    """
+
+    json_data["name"] = agent_name
+    json_data["updated_at"] = datetime.utcnow()
+    # extract log from data so we can push it instead of setting it
+    log = json_data.pop("log", [])
+
+    database.mongo.db.agents.update_one(
+        {"name": agent_name},
+        {"$set": json_data, "$push": {"log": {"$each": log, "$slice": -100}}},
+        upsert=True,
+    )
+    return "OK"
+
+
+@v1.post("/job/<job_id>/events")
+@v1.input(schemas.StatusUpdate, location="json")
+def agents_status_post(job_id, json_data):
+    """Posts status updates from the agent to the server to be forwarded
+    to TestObserver
+
+    The json sent to this endpoint may contain data such as the following:
+    {
+        "agent_id": "",
+        "job_queue": "",
+        "job_status_webhook": "",
+        "events": [
+            {
+                "event_name": "",
+                "timestamp": "",
+                "detail": ""
+            },
+            ...
+        ]
+    }
+
+    """
+    _ = job_id
+    request_json = json_data
+    webhook_url = request_json.pop("job_status_webhook")
+    try:
+        s = requests.Session()
+        s.mount(
+            "",
+            HTTPAdapter(
+                max_retries=Retry(
+                    total=3,
+                    allowed_methods=frozenset(["PUT"]),
+                    backoff_factor=1,
+                )
+            ),
+        )
+        response = s.put(webhook_url, json=request_json, timeout=3)
+        return response.text, response.status_code
+    except requests.exceptions.Timeout:
+        return "Webhook Timeout", 504
+
+
+def check_valid_uuid(job_id):
+    """Check that the specified job_id is a valid UUID only
+
+    :param job_id:
+        UUID as a string for the job
+    :return:
+        True if job_id is valid, False if not
+    """
+
+    try:
+        uuid.UUID(job_id)
+    except ValueError:
+        return False
+    return True
+
+
+@v1.get("/job/<job_id>/position")
+def job_position_get(job_id):
+    """Return the position of the specified jobid in the queue"""
+    job_data, status = job_get_id(job_id)
+    if status == 204:
+        return "Job not found or already started\n", 410
+    if status != 200:
+        return job_data
+    try:
+        queue = job_data.json.get("job_queue")
+    except (AttributeError, TypeError):
+        return "Invalid json returned for id: {}\n".format(job_id), 400
+    # Get all jobs with job_queue=queue and return only the _id
+    jobs = database.mongo.db.jobs.find(
+        {"job_data.job_queue": queue, "result_data.job_state": "waiting"},
+        {"job_id": 1},
+    )
+    # Create a dict mapping job_id (as a string) to the position in the queue
+    jobs_id_position = {job.get("job_id"): pos for pos, job in enumerate(jobs)}
+    if job_id in jobs_id_position:
+        return str(jobs_id_position[job_id])
+    return "Job not found or already started\n", 410
+
+
+def cancel_job(job_id):
+    """Cancellation for a specified job ID
+
+    :param job_id:
+        UUID as a string for the job
+    """
+    # Set the job status to cancelled
+    response = database.mongo.db.jobs.update_one(
+        {
+            "job_id": job_id,
+            "result_data.job_state": {
+                "$nin": ["cancelled", "complete", "completed"]
+            },
+        },
+        {"$set": {"result_data.job_state":
"cancelled"}}, + ) + if response.modified_count == 0: + return "The job is already completed or cancelled", 400 + return "OK" diff --git a/server/tests/test_v1.py b/server/tests/test_v1.py index 6500d971..16408029 100644 --- a/server/tests/test_v1.py +++ b/server/tests/test_v1.py @@ -531,6 +531,10 @@ def test_agents_provision_logs_post(mongo_app): def test_agents_status_put(mongo_app, requests_mock): """Test api to receive agent status requests""" app, _ = mongo_app + job_data = {"job_queue": "test"} + job_output = app.post("/v1/job", json=job_data) + job_id = job_output.json.get("job_id") + webhook = "http://mywebhook.com" requests_mock.put(webhook, status_code=200, text="webhook requested") status_update_data = { @@ -545,7 +549,7 @@ def test_agents_status_put(mongo_app, requests_mock): } ], } - output = app.post("/v1/agents/status", json=status_update_data) + output = app.post(f"/v1/job/{job_id}/events", json=status_update_data) assert 200 == output.status_code assert "webhook requested" == output.text From 8a469fc8ff65d03c32a053d5c3f861fe81c0aec2 Mon Sep 17 00:00:00 2001 From: Paul Larson Date: Fri, 12 Jul 2024 13:00:14 -0500 Subject: [PATCH 4/5] Fix some issues from the merge --- agent/testflinger_agent/agent.py | 8 +++---- agent/testflinger_agent/job.py | 4 +++- agent/testflinger_agent/tests/test_job.py | 16 +++++++++----- server/src/api/v1.py | 26 ++++++++++++++++++++++- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index 3a1e8e74..f273b239 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -264,16 +264,16 @@ def process_jobs(self): self.set_agent_state(phase) event_emitter.emit_event(TestEvent(phase + "_start")) - exitcode, exit_event, exit_reason = job.run_test_phase( + exit_code, exit_event, exit_reason = job.run_test_phase( phase, rundir ) - self.client.post_influx(phase, exitcode) + self.client.post_influx(phase, exit_code) 
event_emitter.emit_event(exit_event, exit_reason) - if exitcode: + if exit_code: # exit code 46 is our indication that recovery failed! # In this case, we need to mark the device offline - if exitcode == 46: + if exit_code == 46: self.mark_device_offline() exit_event = TestEvent.RECOVERY_FAIL else: diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index c3a932aa..6b7d9855 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -117,8 +117,10 @@ def run_test_phase(self, phase, rundir): ): runner.run(f"echo '{line}'") try: + # Set exit_event to fail for this phase in case of an exception + exit_event = f"{phase}_fail" exitcode, exit_event, exit_reason = runner.run(cmd) - except Exception as e: + except Exception as exc: logger.exception(exc) exitcode = 100 exit_reason = str(exc) # noqa: F841 - ignore this until it's used diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index 480bc046..83fb8e67 100644 --- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -4,7 +4,7 @@ import tempfile import requests_mock as rmock -from unittest.mock import patch, Mock +from unittest.mock import patch import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient @@ -161,11 +161,17 @@ def test_run_test_phase_with_run_exception( requests_mock.post(rmock.ANY, status_code=200) job = _TestflingerJob({}, client) job.phase = "setup" - mock_runner = Mock() - mock_runner.run.side_effect = Exception("failed") - with patch("testflinger_agent.job.CommandRunner", mock_runner): - exit_code = job.run_test_phase("setup", tmp_path) + # Don't raise the exception on the 3 banner lines + with patch( + "testflinger_agent.job.CommandRunner.run", + side_effect=[None, None, None, Exception("failed")], + ): + exit_code, exit_event, exit_reason = job.run_test_phase( + "setup", tmp_path + ) assert exit_code == 100 + assert 
exit_event == "setup_fail"
+        assert exit_reason == "failed"
 
     def test_set_truncate(self, client):
         """Test the _set_truncate method of TestflingerJob"""
diff --git a/server/src/api/v1.py b/server/src/api/v1.py
index 57eefd83..05e7ffef 100644
--- a/server/src/api/v1.py
+++ b/server/src/api/v1.py
@@ -18,7 +18,7 @@
 """
 
 import uuid
-from datetime import datetime
+from datetime import datetime, timezone
 
 import pkg_resources
 from apiflask import APIBlueprint, abort
@@ -493,6 +493,30 @@ def agents_post(agent_name, json_data):
     return "OK"
 
 
+@v1.post("/agents/provision_logs/<agent_name>")
+@v1.input(schemas.ProvisionLogsIn, location="json")
+def agents_provision_logs_post(agent_name, json_data):
+    """Post provision logs for the agent to the server"""
+    agent_record = {}
+
+    # timestamp this agent record and provision log entry
+    timestamp = datetime.now(timezone.utc)
+    agent_record["updated_at"] = json_data["timestamp"] = timestamp
+
+    update_operation = {
+        "$set": json_data,
+        "$push": {
+            "provision_log": {"$each": [json_data], "$slice": -100},
+        },
+    }
+    database.mongo.db.provision_logs.update_one(
+        {"name": agent_name},
+        update_operation,
+        upsert=True,
+    )
+    return "OK"
+
+
 @v1.post("/job/<job_id>/events")
 @v1.input(schemas.StatusUpdate, location="json")
 def agents_status_post(job_id, json_data):

From fd6c29d0c8a8d4cf326a9387defd5b2e39d1e04a Mon Sep 17 00:00:00 2001
From: Varun Valada
Date: Thu, 18 Jul 2024 09:08:24 -0500
Subject: [PATCH 5/5] Change resource url

---
 agent/testflinger_agent/agent.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py
index f273b239..57ba0512 100644
--- a/agent/testflinger_agent/agent.py
+++ b/agent/testflinger_agent/agent.py
@@ -220,7 +220,7 @@ def process_jobs(self):
             logger.info("Starting job %s", job.job_id)
             event_emitter.emit_event(
                 TestEvent.JOB_START,
-                f"{self.client.server}/job/{job.job_id}/events",
+                f"{self.client.server}/jobs/{job.job_id}",
             )
             rundir = os.path.join(
self.client.config.get("execution_basedir"), job.job_id