Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support calling python scripts with arguments, by injecting them as args in scope #358

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions grizzly/listeners/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ def request(
else:
logger_method = self.logger.debug

message_to_log = f'{message_to_log} Timestamp: {datetime.now(timezone.utc).isoformat()}'

logger_method(message_to_log)
self._log_request(request_type, name, result, metrics, context, exception)
except Exception:
Expand All @@ -395,20 +397,20 @@ def _create_metrics(self, response_time: int, response_length: int) -> dict[str,
return metrics

def heartbeat_sent(self, client_id: str, timestamp: float) -> None:
return self._heartbeat(client_id=client_id, direction='sent', timestamp=timestamp)
return self._heartbeat(client_id=client_id, direction='sent', timestamp=timestamp, response_time=None)

def heartbeat_received(self, client_id: str, timestamp: float) -> None:
return self._heartbeat(client_id=client_id, direction='received', timestamp=timestamp)
def heartbeat_received(self, client_id: str, timestamp: float, response_time: float | None) -> None:
return self._heartbeat(client_id=client_id, direction='received', timestamp=timestamp, response_time=response_time)

def _heartbeat(self, client_id: str, direction: Literal['sent', 'received'], timestamp: float) -> None:
def _heartbeat(self, client_id: str, direction: Literal['sent', 'received'], timestamp: float, response_time: float | None) -> None:
_timestamp = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

tags: dict[str, str | None] = {
'client_id': client_id,
'direction': direction,
}

metrics: dict[str, int] = {'value': 1}
metrics: dict[str, int | float | None] = {'value': 1, 'response_time': response_time}

self._create_event(_timestamp, 'heartbeat', tags, metrics)

Expand Down
36 changes: 31 additions & 5 deletions grizzly/locust.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections import defaultdict
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from datetime import datetime, timezone
from math import ceil, floor
from operator import attrgetter, itemgetter
from os import environ
Expand Down Expand Up @@ -104,8 +105,12 @@ def __init__(self, worker_nodes: list[WorkerNode], user_classes: list[type[Grizz
self._user_classes = sorted(user_classes, key=attrgetter('__name__'))
self._original_user_classes = self._user_classes.copy()

assert len(user_classes) > 0
assert len(set(self._user_classes)) == len(self._user_classes)
try:
assert len(user_classes) > 0
assert len(set(self._user_classes)) == len(self._user_classes)
except AssertionError:
logger.exception('sanity check of configuration failed')
raise

self._initial_users_on_workers = {
worker_node.id: {user_class.__name__: 0 for user_class in self._user_classes}
Expand Down Expand Up @@ -234,6 +239,7 @@ def remove_worker(self, worker_node: WorkerNode) -> None:
"""
self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]
if len(self._worker_nodes) == 0:
logger.warning('worker %s was the last worker', worker_node.id)
return
self._prepare_rebalance()

Expand Down Expand Up @@ -275,7 +281,11 @@ def new_dispatch(
:param user_classes: The user classes to be used for the new dispatch
"""
# this dispatcher does not care about target_user_count
assert target_user_count == -1
try:
assert target_user_count == -1
except AssertionError:
logger.exception('invalid value for `target_user_count`')
raise

grizzly_user_classes = cast(Optional[list[type[GrizzlyUser]]], user_classes)

Expand All @@ -298,6 +308,8 @@ def new_dispatch(
else:
self.target_user_count = {user_class.__name__: user_class.fixed_count for user_class in self._user_classes}

logger.debug('creating new dispatcher: %r', self.target_user_count)

self._spawn_rate = spawn_rate

self._user_count_per_dispatch_iteration = max(1, floor(self._spawn_rate))
Expand Down Expand Up @@ -576,7 +588,8 @@ def _grizzly_distribute_users(
if target_user_count == {}:
target_user_count = {**self._grizzly_target_user_count}

self._spread_sticky_tags_on_workers()
# _grizzly_distribute_users is only called from _prepare_rebalance, which already has called _spread_sticky_tags_on_workers
# self._spread_sticky_tags_on_workers() # noqa: ERA001

user_gen = self._create_user_generator()

Expand Down Expand Up @@ -604,7 +617,12 @@ def _grizzly_distribute_users(

sticky_tag = self._users_to_sticky_tag[next_user_class_name]
worker_node = next(self._sticky_tag_to_workers[sticky_tag])
users_on_workers[worker_node.id][next_user_class_name] += 1
try:
users_on_workers[worker_node.id][next_user_class_name] += 1
except KeyError:
logger.error('worker %s is not available for tag %s', worker_node.id, sticky_tag) # noqa: TRY400
continue

user_count_total += 1
current_user_count[next_user_class_name] += 1
active_users.append((worker_node, next_user_class_name))
Expand Down Expand Up @@ -904,6 +922,12 @@ def run(context: Context) -> int: # noqa: C901, PLR0915, PLR0912
# And locust log level is
setup_logging(log_level, None)

# @TODO: remove this after troubleshooting <!--
for logger_name in ['locust.runners', 'grizzly.locust', 'grizzly_extras.async_message.utils', 'grizzly.scenarios']:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# -->

# make sure the user hasn't screwed up
is_both_master_and_worker = on_master(context) and on_worker(context)
is_spawn_rate_not_set = grizzly.setup.spawn_rate is None
Expand Down Expand Up @@ -1043,6 +1067,7 @@ def watch_running_external_processes() -> None:
logger.error('%s is not running, stop', dependency)
del processes[dependency]

logger.debug('waiting 10 seconds for next external process check')
gevent.sleep(10.0)

logger.info('stop watching external processes')
Expand Down Expand Up @@ -1308,6 +1333,7 @@ def grizzly_print_stats(stats: lstats.RequestStats, *, current: bool = True, gri
("%-" + str(lstats.STATS_TYPE_WIDTH) + "s %-" + str(name_column_width) + "s %7s %12s |%7s %7s %7s%7s | %7s %11s")
% ("Type", "Name", "# reqs", "# fails", "Avg", "Min", "Max", "Med", "req/s", "failures/s")
)
stats_logger.info(datetime.now(timezone.utc).isoformat())
stats_logger.info(row)
separator = f'{"-" * lstats.STATS_TYPE_WIDTH}|{"-" * (name_column_width)}|{"-" * 7}|{"-" * 13}|{"-" * 7}|{"-" * 7}|{"-" * 7}|{"-" * 7}|{"-" * 8}|{"-" * 11}'
stats_logger.info(separator)
Expand Down
11 changes: 11 additions & 0 deletions grizzly/scenarios/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import logging
from math import floor
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast

from gevent.event import Event
Expand All @@ -23,6 +24,9 @@
from grizzly.users import GrizzlyUser


debug_logger = logging.getLogger('grizzly.scenarios')


class GrizzlyScenario(SequentialTaskSet):
consumer: TestdataConsumer
logger: logging.Logger
Expand Down Expand Up @@ -52,6 +56,10 @@ def __init__(self, parent: GrizzlyUser) -> None:
def user(self) -> GrizzlyUser:
return cast('GrizzlyUser', self._user)

@property
def current_iteration(self) -> int:
return floor(self._task_index / len(self.tasks)) + 1

@classmethod
def populate(cls, task_factory: GrizzlyTask) -> None:
cls.tasks.append(task_factory())
Expand Down Expand Up @@ -93,6 +101,7 @@ def on_start(self) -> None:
self.prefetch()

def on_iteration(self) -> None:
debug_logger.debug('scenario %s calling on_iteration hook for iteration %d of %d', self.logger.name, self.current_iteration, self.user._scenario.iterations)
self.user.on_iteration()

for task in self.tasks:
Expand Down Expand Up @@ -136,6 +145,8 @@ def get_next_task(self) -> Union[TaskSet, Callable]:
raise LocustError(message)

task = self.tasks[self._task_index % len(self.tasks)]
if self._task_index % len(self.tasks) >= len(self.tasks) - 1:
debug_logger.debug('scenario %s starting from first task', self.logger.name)
self._task_index += 1

return task
Expand Down
12 changes: 9 additions & 3 deletions grizzly/scenarios/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from grizzly.types import RequestType, ScenarioState
from grizzly.types.locust import StopUser

from . import GrizzlyScenario
from . import GrizzlyScenario, debug_logger

if TYPE_CHECKING: # pragma: no cover
from locust.stats import StatsEntry
Expand Down Expand Up @@ -293,9 +293,12 @@ def iterator(self, *, prefetch: Optional[bool] = False) -> None:

# scenario timer
self.start = perf_counter()
debug_logger.debug('scenario %s iteration %d of %d starting', self.logger.name, self.current_iteration, self.user._scenario.iterations)

remote_context = self.consumer.testdata()

debug_logger.debug('scenario %s iteration %d of %d consumed testdata', self.logger.name, self.current_iteration, self.user._scenario.iterations)

if remote_context is None:
self.logger.debug('no iteration data available, stop scenario')
raise StopScenario
Expand All @@ -322,8 +325,10 @@ def iterator(self, *, prefetch: Optional[bool] = False) -> None:
@task
def pace(self) -> None:
"""Last task in this scenario, if self.pace_time is set.
This is ensured by `grizzly.scenarios.GrizzlyScenario.populate`.
This is ensured by `grizzly.scenarios.iterator.IteratorScenario.populate`.
"""
debug_logger.debug('scenario %s iteration %d of %d done', self.logger.name, self.current_iteration, self.user._scenario.iterations)

if self.pace_time is None:
return

Expand All @@ -344,9 +349,10 @@ def pace(self) -> None:
if (pace_correction * 1000) < value:
self.logger.debug('keeping pace by sleeping %d milliseconds', pace_correction * 1000)
gsleep((value / 1000) - pace_correction)
debug_logger.debug('scenario %s slept %d milliseconds to keep pace', self.logger.name, pace_correction * 1000)
response_length = 1
else:
self.logger.error('pace falling behind, currently at %d milliseconds expecting %f milliseconds', abs(pace_correction * 1000), value)
self.logger.error('pace falling behind, currently at %d milliseconds expecting %.2f milliseconds', abs(pace_correction * 1000), value)
message = f'pace falling behind, iteration takes longer than {value} milliseconds'
raise RuntimeError(message)
except Exception as e:
Expand Down
48 changes: 43 additions & 5 deletions grizzly/steps/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from os import environ
from pathlib import Path
from shlex import split as shlex_split
from typing import cast

from grizzly.context import GrizzlyContext
Expand Down Expand Up @@ -149,15 +150,52 @@ def step_setup_variable_value(context: Context, name: str, value: str) -> None:
raise


def _execute_python_script(context: Context, source: str) -> None:
def _execute_python_script(context: Context, source: str, args: str | None) -> None:
if on_worker(context):
return

scope = globals()
scope.update({'context': context})
scope_args: list[str] | None = None
if args is not None:
scope_args = shlex_split(args)

scope = {**globals()}
scope.update({'context': context, 'args': scope_args})

exec(source, scope, scope) # noqa: S102

@then('execute python script "{script_path}" with arguments "{arguments}"')
def step_setup_execute_python_script_with_args(context: Context, script_path: str, arguments: str) -> None:
"""Execute python script located in specified path, providing the specified arguments.

The script will not execute on workers, only on master (distributed mode) or local (local mode), and
it will only execute once before the test starts. Available in the scope is the current `context` object
and also `args` (list), which is `shlex.split` of specified `arguments`.

This can be useful for generating test data files.

Example:
```gherkin
Then execute python script "../bin/generate-testdata.py"
```

"""
grizzly = cast(GrizzlyContext, context.grizzly)

script_file = Path(script_path)
if not script_file.exists():
feature = cast(Feature, context.feature)
base_path = Path(feature.filename).parent if feature.filename not in [None, '<string>'] else Path.cwd()
script_file = (base_path / script_path).resolve()

assert script_file.exists(), f'script {script_path} does not exist'

if has_template(arguments):
grizzly.scenario.orphan_templates.append(arguments)

arguments = cast(str, resolve_variable(grizzly.scenario, arguments, guess_datatype=False, try_file=False))

_execute_python_script(context, script_file.read_text(), arguments)

@then('execute python script "{script_path}"')
def step_setup_execute_python_script(context: Context, script_path: str) -> None:
"""Execute python script located in specified path.
Expand All @@ -181,7 +219,7 @@ def step_setup_execute_python_script(context: Context, script_path: str) -> None

assert script_file.exists(), f'script {script_path} does not exist'

_execute_python_script(context, script_file.read_text())
_execute_python_script(context, script_file.read_text(), None)

@then('execute python script')
def step_setup_execute_python_script_inline(context: Context) -> None:
Expand All @@ -201,7 +239,7 @@ def step_setup_execute_python_script_inline(context: Context) -> None:
```

"""
_execute_python_script(context, context.text)
_execute_python_script(context, context.text, None)


@given('set context variable "{variable}" to "{value}"')
Expand Down
2 changes: 1 addition & 1 deletion grizzly/tasks/log_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ def __call__(self) -> grizzlytask:
@grizzlytask
def task(parent: GrizzlyScenario) -> Any:
message = parent.user.render(self.message)
parent.logger.info(message)
parent.user.logger.info(message)

return task
7 changes: 5 additions & 2 deletions grizzly/testdata/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
from time import perf_counter
from typing import TYPE_CHECKING, Any, ClassVar, Optional, cast
from uuid import uuid4

from gevent import sleep as gsleep
from gevent.event import AsyncResult
Expand Down Expand Up @@ -244,12 +245,13 @@ def _keystore_request(self, *, request: dict[str, Any]) -> dict[str, Any] | None
def _request(self, request: dict[str, str]) -> dict[str, Any] | None:
with self.semaphore:
uid = id(self.scenario.user)
request_id = str(uuid4())

if uid in self._responses:
self.logger.warning('greenlet %d is already waiting for testdata', uid)

self._responses.update({uid: AsyncResult()})
self.runner.send_message('produce_testdata', {'uid': uid, 'cid': self.runner.client_id, 'request': request})
self.runner.send_message('produce_testdata', {'uid': uid, 'cid': self.runner.client_id, 'id': request_id, 'request': request})

# waits for async result
try:
Expand Down Expand Up @@ -504,6 +506,7 @@ def handle_request(self, environment: Environment, msg: Message, **_kwargs: Any)
uid = msg.data['uid']
cid = msg.data['cid']
request = msg.data['request']
request_id = msg.data['id']

if request['message'] == 'keystore':
response = self._handle_request_keystore(request=request)
Expand All @@ -513,4 +516,4 @@ def handle_request(self, environment: Environment, msg: Message, **_kwargs: Any)
self.logger.error('received unknown message "%s"', request['message'])
response = {}

self.runner.send_message('consume_testdata', {'uid': uid, 'response': response}, client_id=cid)
self.runner.send_message('consume_testdata', {'uid': uid, 'id': request_id, 'response': response}, client_id=cid)
Loading
Loading