Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue617 refactor service with history2 #689

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:
- BOPTEST_REDIS_HOST
- BOPTEST_REGION
- BOPTEST_TIMEOUT
- BOPTEST_MESSAGE_TIMEOUT
- BOPTEST_DASHBOARD_SERVER
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
Expand Down
19 changes: 10 additions & 9 deletions service/docs/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@
BOPTEST-Service currently uses Redis for two purposes.

1. Storing test metadata.
2. A pub/sub message bus to communicate with workers.
2. A pub/sub message bus to communicate with workers.

## Test Metadata:

A hash under the key `tests:${testid}` is used to store test metadata.

| Key | Field | Value
| ---------------------------------------- | ----------- | -----------------------------
| `tests:${testid}` | `status` | `Queued \| Running`
| `tests:${testid}` | `timestamp` | Epoch seconds when test is queued
| `tests:${testid}` | `user` | The OAuth user "sub", or undefined for anonymous tests
| Key | Field | Value |
| ----------------- | ----------- | ---------------------------------------------------------------------------------------- |
| `tests:${testid}` | `status` | `Queued \| Running` |
| `tests:${testid}` | `timestamp` | Epoch seconds when test is queued. |
| `tests:${testid}` | `user` | The OAuth user "sub", or undefined for anonymous tests. |
| `tests:${testid}` | `host` | The value of HOSTNAME for the worker running the test. On K8s this will be the pod name. |

Tests that are started by an authenticated user are stored in a set associated with the user.

| Key | Value
| ---------------------------------------- | -------------------------------------------
| `users:${userSub}:tests` | `[testid1, testid2, ...]`
| Key | Value |
| ------------------------ | ------------------------- |
| `users:${userSub}:tests` | `[testid1, testid2, ...]` |

## Pub/Sub Messages

Expand Down
3 changes: 3 additions & 0 deletions service/test/test_testcase_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def test_ibpsa_boptest_testcase():
response = requests.get(f"{host}/name/{testid}")
check.equal(response.status_code, 200)

response = requests.post(f"{host}/advance/{testid}")
check.equal(response.status_code, 200)

# Stop the test
response = requests.put(f"{host}/stop/{testid}")
check.equal(response.status_code, 200)
Expand Down
19 changes: 16 additions & 3 deletions service/web/server/src/lib/messaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,18 @@ import { pack, unpack } from 'msgpackr'

class Messaging {
constructor() {
this.subTimeoutTime = 600000
this.responseTimeoutTime = 480000
const timeout = Number(
process.env.BOPTEST_MESSAGE_TIMEOUT
? process.env.BOPTEST_MESSAGE_TIMEOUT
: "1200000", // Default 20 minute message timeout
);

// This is how long we wait for a response from the worker
this.responseTimeoutTime = timeout
// This is how long we keep the subscription for a particular test open
// Subscriptions time out 1 minute after the message timeout
// New API requests to an existing subscription will refresh the subscription's timeout
this.subTimeoutTime = timeout + 60000
this.subscriptionTimers = {}
this.messageHandlers = {}

Expand Down Expand Up @@ -39,7 +48,11 @@ class Messaging {
this.subscribe(responseChannel)
this.sendWorkerMessage(requestChannel, requestID, method, params)
responseTimeout = setTimeout(() => {
reject(new Error(`Timeout while sending command '${method}' to testid '${workerID}'`))
reject(
new Error(
`Timeout for request: '${requestID}', with method:'${method}', sent to testid: '${workerID}'`,
),
);
}, this.responseTimeoutTime)
})
}
Expand Down
21 changes: 19 additions & 2 deletions service/worker/jobs/boptest_run_test/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import redis
import numpy as np
import msgpack
import logging
from boptest.lib.testcase import TestCase

class Job:
Expand All @@ -19,6 +20,13 @@ def __init__(self, parameters):
self.abort = False
self.last_message_time = datetime.now()

log_level = os.environ.get("BOPTEST_LOGLEVEL", "INFO")
logging.getLogger().setLevel(log_level)
logging.getLogger('botocore').setLevel(log_level)
logging.getLogger('s3transfer').setLevel(log_level)
logging.getLogger('urllib3').setLevel(log_level)
self.logger = logging.getLogger('worker')

self.redis = redis.Redis(host=os.environ["BOPTEST_REDIS_HOST"])
self.redis_pubsub = self.redis.pubsub()
if self.redis.hexists(self.testKey, "user"):
Expand Down Expand Up @@ -91,7 +99,7 @@ def run(self):
def check_idle_time(self):
    """Flag the job to stop once it has been idle longer than its timeout.

    The idle period is the wall-clock time elapsed since
    ``self.last_message_time``. When it exceeds ``self.timeout`` (compared
    in seconds), the reason is logged and ``self.keep_running`` is set to
    ``False``. Nothing happens while the job is still within its timeout.
    """
    idle_time = datetime.now() - self.last_message_time
    if idle_time.total_seconds() > self.timeout:
        # Report once, through the worker logger only (no extra print), and
        # pass the test id as a lazy argument so formatting is left to logging.
        self.logger.info("Testid '%s' is terminating due to inactivity.", self.testid)
        self.keep_running = False

# Begin methods for message passing between web and worker ###
Expand Down Expand Up @@ -149,10 +157,14 @@ def process_messages(self):
method = message_data.get("method")
params = message_data.get("params")

self.logger.info("Request ID: '%s', with method '%s', was received" % (request_id, method))

callback_result = self.call_message_handler(method, params)
packed_result = self.pack({"requestID": request_id, "payload": callback_result})
self.redis.publish(response_channel, packed_result)

self.logger.info("Response for, '%s', was sent" % request_id)

self.last_message_time = datetime.now()
except Job.InvalidRequestError as e:
payload = {"status": 400, "message": "Bad Request", "payload": str(e)}
Expand All @@ -161,7 +173,7 @@ def process_messages(self):
except Exception:
# Generic exceptions are an internal error
error_message = str(sys.exc_info()[1])
print(error_message)
self.logger.error(error_message)

# End methods for message passing

Expand Down Expand Up @@ -258,6 +270,8 @@ def reset(self, tarinfo):

# cleanup after the simulation is stopped
def cleanup(self):
self.logger.info("Test '%s' is complete" % self.testid)

self.redis.delete(self.testKey)
if self.userTestsKey:
self.redis.srem(self.userTestsKey, self.testid)
Expand Down Expand Up @@ -294,3 +308,6 @@ def keys_to_camel_case(self, a_dict):

def init_sim_status(self):
    """Record in the test's Redis hash that the test is now running.

    Sets the ``status`` field of ``self.testKey`` to ``"Running"``. When the
    ``HOSTNAME`` environment variable is set to a non-empty value, it is
    additionally stored under the ``host`` field of the same hash.
    """
    self.redis.hset(self.testKey, "status", "Running")

    worker_host = os.getenv("HOSTNAME")
    if not worker_host:
        # No hostname available: leave the "host" field untouched.
        return
    self.redis.hset(self.testKey, "host", worker_host)
12 changes: 7 additions & 5 deletions service/worker/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ class Logger:
"""A logger specific for the tasks of the Worker"""

def __init__(self):
# Set up the logger named 'worker': its level comes from the BOPTEST_LOGLEVEL
# environment variable (default "INFO") and handlers are attached below.
# NOTE(review): this listing appears to interleave the removed and added
# sides of a diff hunk (two formatters, a file handler and a stream handler);
# confirm which statements are live in the actual file.

# Configure the root logger with the level taken from BOPTEST_LOGLEVEL.
logging.basicConfig(level=os.environ.get("BOPTEST_LOGLEVEL", "INFO"))
self.logger = logging.getLogger('worker')
# Dash-separated format; used only by the file handler below.
self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
self.logger.setLevel(os.environ.get("BOPTEST_LOGLEVEL", "INFO"))
# Tab-separated format; used only by the stream handler below.
# NOTE(review): the format string labels timestamps "UTC" literally, but no
# time converter is set here; confirm the process clock is actually UTC.
fmt = '%(asctime)s UTC\t%(name)-20s%(levelname)s\t%(message)s'
datefmt = '%m/%d/%Y %I:%M:%S %p'
formatter = logging.Formatter(fmt,datefmt)

# File handler writing to 'worker.log' (relative path), formatted with self.formatter.
self.fh = logging.FileHandler('worker.log')
self.fh.setFormatter(self.formatter)
self.logger.addHandler(self.fh)
# Stream handler (logging.StreamHandler with default stream), formatted with `formatter`.
# NOTE(review): handlers are added on every construction with no check for
# existing ones; verify Logger is instantiated only once per process.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
self.logger.addHandler(stream_handler)