From fea197c7ea5554c0d43b19150834e85e4216d8a2 Mon Sep 17 00:00:00 2001 From: Kyle Benne Date: Thu, 12 Sep 2024 10:00:07 -0500 Subject: [PATCH 1/3] Improve logging and diagnostic information * Within worker, add logs when a message is received and immediatly after a response has been sent. * Include HOSTNAME in the test metadata that is stored in Redis. For a K8s deployment, HOSTNAME will correspond to the name of the pod running the test. This will make it possible to retreive worker logs for a misbehaving test. * When a test is complete, the worker and the associated logs, may no longer exist, however the logs will still be available in the log file contained within the test payload that is pushed to long term storage. * These changes pertain to worker, however there is an existing log message within the web implementation that logs when a message is sent to the worker, but no response is received. Additionally, each message between web and worker is given a unique ID, therefore with all of this togethor there will be breadcrumbs if a message is dropped. --- service/docs/redis.md | 19 ++++++++++--------- service/worker/jobs/boptest_run_test/job.py | 21 +++++++++++++++++++-- service/worker/logger.py | 12 +++++++----- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/service/docs/redis.md b/service/docs/redis.md index 813704ab3..b8aa1317c 100644 --- a/service/docs/redis.md +++ b/service/docs/redis.md @@ -3,23 +3,24 @@ BOPTEST-Service currently uses Redis for two purposes. 1. Storing test metadata. -2. A pub/sub message bus to communicate with workers. +2. A pub/sub message bus to communicate with workers. ## Test Metadata: A hash under the key `tests:${testid}` is used to store test metadata. -| Key | Field | Value -| ---------------------------------------- | ----------- | ----------------------------- -| `tests:${testid}` | `status` | `Queued \| Running` -| `tests:${testid}` | `timestamp` | Epoch seconds when test is queued -| `tests:${testid}` | `user` | The OAuth user "sub", or undefined for anonymous tests +| Key | Field | Value | +| ----------------- | ----------- | ---------------------------------------------------------------------------------------- | +| `tests:${testid}` | `status` | `Queued \| Running` | +| `tests:${testid}` | `timestamp` | Epoch seconds when test is queued. | +| `tests:${testid}` | `user` | The OAuth user "sub", or undefined for anonymous tests. | +| `tests:${testid}` | `host` | The value of HOSTNAME for the worker running the test. On K8s this will be the pod name. | Tests that are started by an authenticated user are stored in a set associated with the user. -| Key | Value -| ---------------------------------------- | ------------------------------------------- -| `users:${userSub}:tests` | `[testid1, testid2, ...]` +| Key | Value | +| ------------------------ | ------------------------- | +| `users:${userSub}:tests` | `[testid1, testid2, ...]` | ## Pub/Sub Messages diff --git a/service/worker/jobs/boptest_run_test/job.py b/service/worker/jobs/boptest_run_test/job.py index 44315738c..91350f20d 100644 --- a/service/worker/jobs/boptest_run_test/job.py +++ b/service/worker/jobs/boptest_run_test/job.py @@ -7,6 +7,7 @@ import redis import numpy as np import msgpack +import logging from boptest.lib.testcase import TestCase class Job: @@ -19,6 +20,13 @@ def __init__(self, parameters): self.abort = False self.last_message_time = datetime.now() + log_level = os.environ.get("BOPTEST_LOGLEVEL", "INFO") + logging.getLogger().setLevel(log_level) + logging.getLogger('botocore').setLevel(log_level) + logging.getLogger('s3transfer').setLevel(log_level) + logging.getLogger('urllib3').setLevel(log_level) + self.logger = logging.getLogger('worker') + self.redis = redis.Redis(host=os.environ["BOPTEST_REDIS_HOST"]) self.redis_pubsub = self.redis.pubsub() if self.redis.hexists(self.testKey, "user"): @@ -91,7 +99,7 @@ def run(self): def check_idle_time(self): idle_time = datetime.now() - self.last_message_time if idle_time.total_seconds() > self.timeout: - print("Testid '%s' is terminating due to inactivity." % self.testid) + self.logger.info("Testid '%s' is terminating due to inactivity." % self.testid) self.keep_running = False # Begin methods for message passing between web and worker ### @@ -149,10 +157,14 @@ def process_messages(self): method = message_data.get("method") params = message_data.get("params") + self.logger.info("Request ID: '%s', with method '%s', was received" % (request_id, method)) + callback_result = self.call_message_handler(method, params) packed_result = self.pack({"requestID": request_id, "payload": callback_result}) self.redis.publish(response_channel, packed_result) + self.logger.info("Response for, '%s', was sent" % request_id) + self.last_message_time = datetime.now() except Job.InvalidRequestError as e: payload = {"status": 400, "message": "Bad Request", "payload": str(e)} @@ -161,7 +173,7 @@ def process_messages(self): except Exception: # Generic exceptions are an internal error error_message = str(sys.exc_info()[1]) - print(error_message) + self.logger.error(error_message) # End methods for message passing @@ -258,6 +270,8 @@ def reset(self, tarinfo): # cleanup after the simulation is stopped def cleanup(self): + self.logger.info("Test '%s' is complete" % self.testid) + self.redis.delete(self.testKey) if self.userTestsKey: self.redis.srem(self.userTestsKey, self.testid) @@ -294,3 +308,6 @@ def keys_to_camel_case(self, a_dict): def init_sim_status(self): self.redis.hset(self.testKey, "status", "Running") + host = os.getenv("HOSTNAME") + if host: + self.redis.hset(self.testKey, "host", host) diff --git a/service/worker/logger.py b/service/worker/logger.py index c2e310918..0c1d20adc 100644 --- a/service/worker/logger.py +++ b/service/worker/logger.py @@ -11,10 +11,12 @@ class Logger: """A logger specific for the tasks of the Worker""" def __init__(self): - logging.basicConfig(level=os.environ.get("BOPTEST_LOGLEVEL", "INFO")) self.logger = logging.getLogger('worker') - self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + self.logger.setLevel(os.environ.get("BOPTEST_LOGLEVEL", "INFO")) + fmt = '%(asctime)s UTC\t%(name)-20s%(levelname)s\t%(message)s' + datefmt = '%m/%d/%Y %I:%M:%S %p' + formatter = logging.Formatter(fmt,datefmt) - self.fh = logging.FileHandler('worker.log') - self.fh.setFormatter(self.formatter) - self.logger.addHandler(self.fh) + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + self.logger.addHandler(stream_handler) From c6ea785da598a380132be1128d2686ed8899c00f Mon Sep 17 00:00:00 2001 From: Kyle Benne Date: Mon, 16 Sep 2024 13:11:32 -0500 Subject: [PATCH 2/3] Adjust message timeout between web and worker * The default message timeout is now 20 minutes, and the value is configurable using the BOPTEST_MESSAGE_TIMEOUT environment variable. --- docker-compose.yml | 1 + service/test/test_testcase_api.py | 3 +++ service/web/server/src/lib/messaging.js | 12 ++++++++++-- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 9b39f12c7..a4d12e348 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,7 @@ services: - BOPTEST_REDIS_HOST - BOPTEST_REGION - BOPTEST_TIMEOUT + - BOPTEST_MESSAGE_TIMEOUT - BOPTEST_DASHBOARD_SERVER - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY diff --git a/service/test/test_testcase_api.py b/service/test/test_testcase_api.py index 561aa5efe..00855d044 100644 --- a/service/test/test_testcase_api.py +++ b/service/test/test_testcase_api.py @@ -56,6 +56,9 @@ def test_ibpsa_boptest_testcase(): response = requests.get(f"{host}/name/{testid}") check.equal(response.status_code, 200) + response = requests.post(f"{host}/advance/{testid}") + check.equal(response.status_code, 200) + # Stop the test response = requests.put(f"{host}/stop/{testid}") check.equal(response.status_code, 200) diff --git a/service/web/server/src/lib/messaging.js b/service/web/server/src/lib/messaging.js index 63b1eed39..fa318350e 100644 --- a/service/web/server/src/lib/messaging.js +++ b/service/web/server/src/lib/messaging.js @@ -5,7 +5,11 @@ import { pack, unpack } from 'msgpackr' class Messaging { constructor() { this.subTimeoutTime = 600000 - this.responseTimeoutTime = 480000 + this.responseTimeoutTime = Number( + process.env.BOPTEST_MESSAGE_TIMEOUT + ? process.env.BOPTEST_MESSAGE_TIMEOUT + : "1200000", // Default 20 minute message timeout + ); this.subscriptionTimers = {} this.messageHandlers = {} @@ -39,7 +43,11 @@ class Messaging { this.subscribe(responseChannel) this.sendWorkerMessage(requestChannel, requestID, method, params) responseTimeout = setTimeout(() => { - reject(new Error(`Timeout while sending command '${method}' to testid '${workerID}'`)) + reject( + new Error( + `Timeout for request: '${requestID}', with method:'${method}', sent to testid: '${workerID}'`, + ), + ); }, this.responseTimeoutTime) }) } From 38bd2b6f548624c2a6685c477e7b6e0d84b28ddf Mon Sep 17 00:00:00 2001 From: Kyle Benne Date: Tue, 17 Sep 2024 15:58:16 -0500 Subject: [PATCH 3/3] Further adjust messaging timeout The message subscription timeout is now configured as BOPTEST_MESSAGE_TIMEOUT + 60 seconds --- service/web/server/src/lib/messaging.js | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/service/web/server/src/lib/messaging.js b/service/web/server/src/lib/messaging.js index fa318350e..0857b0e71 100644 --- a/service/web/server/src/lib/messaging.js +++ b/service/web/server/src/lib/messaging.js @@ -4,13 +4,18 @@ import { pack, unpack } from 'msgpackr' class Messaging { constructor() { - this.subTimeoutTime = 600000 - this.responseTimeoutTime = Number( + const timeout = Number( process.env.BOPTEST_MESSAGE_TIMEOUT ? process.env.BOPTEST_MESSAGE_TIMEOUT : "1200000", // Default 20 minute message timeout ); + // This is how long we wait for a response from the worker + this.responseTimeoutTime = timeout + // This is how long we keep the subscription for a particular test open + // Subscriptions timeout 1 minutes after the message timeout + // New API requests to an existing subscription, will refresh the subscription's timeout + this.subTimeoutTime = timeout + 60000 this.subscriptionTimers = {} this.messageHandlers = {}