troubleshooting improvements
locust.py:
- do not call _spread_sticky_tags_on_workers in _grizzly_distribute_users, since it has already been called in _prepare_rebalance
- handle KeyError when assigning a user to a worker, which can happen if that worker is missing

influxdb.py:
- include response_time in the metrics for heartbeat_received (requires a locust feature that is only available on the locust branch in use)

async_message/__init__.py:
- include request_id in logging

async_message/utils.py:
- start the timer outside of the while-loop so the reported time covers the whole polling for a response, not just the last attempt

async_message/mq/__init__.py:
- include request_id in logging
mgor committed Nov 15, 2024
1 parent 987efba commit f3f0211
Showing 5 changed files with 29 additions and 15 deletions.
10 changes: 5 additions & 5 deletions grizzly/listeners/influxdb.py
@@ -397,20 +397,20 @@ def _create_metrics(self, response_time: int, response_length: int) -> dict[str,
         return metrics
 
     def heartbeat_sent(self, client_id: str, timestamp: float) -> None:
-        return self._heartbeat(client_id=client_id, direction='sent', timestamp=timestamp)
+        return self._heartbeat(client_id=client_id, direction='sent', timestamp=timestamp, response_time=None)
 
-    def heartbeat_received(self, client_id: str, timestamp: float) -> None:
-        return self._heartbeat(client_id=client_id, direction='received', timestamp=timestamp)
+    def heartbeat_received(self, client_id: str, timestamp: float, response_time: float | None) -> None:
+        return self._heartbeat(client_id=client_id, direction='received', timestamp=timestamp, response_time=response_time)
 
-    def _heartbeat(self, client_id: str, direction: Literal['sent', 'received'], timestamp: float) -> None:
+    def _heartbeat(self, client_id: str, direction: Literal['sent', 'received'], timestamp: float, response_time: float | None) -> None:
         _timestamp = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
 
         tags: dict[str, str | None] = {
             'client_id': client_id,
             'direction': direction,
         }
 
-        metrics: dict[str, int] = {'value': 1}
+        metrics: dict[str, int | float | None] = {'value': 1, 'response_time': response_time}
 
         self._create_event(_timestamp, 'heartbeat', tags, metrics)
12 changes: 9 additions & 3 deletions grizzly/locust.py
@@ -239,7 +239,7 @@ def remove_worker(self, worker_node: WorkerNode) -> None:
         """
         self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]
         if len(self._worker_nodes) == 0:
-            logger.warning('cannot remove worker %s, since there are no workers registrered', worker_node.id)
+            logger.warning('worker %s was the last worker', worker_node.id)
             return
         self._prepare_rebalance()

@@ -588,7 +588,8 @@ def _grizzly_distribute_users(
         if target_user_count == {}:
             target_user_count = {**self._grizzly_target_user_count}
 
-        self._spread_sticky_tags_on_workers()
+        # _grizzly_distribute_users is only called from _prepare_rebalance, which already has called _spread_sticky_tags_on_workers
+        # self._spread_sticky_tags_on_workers()  # noqa: ERA001
 
         user_gen = self._create_user_generator()
@@ -616,7 +617,12 @@ def _grizzly_distribute_users(
 
             sticky_tag = self._users_to_sticky_tag[next_user_class_name]
             worker_node = next(self._sticky_tag_to_workers[sticky_tag])
-            users_on_workers[worker_node.id][next_user_class_name] += 1
+            try:
+                users_on_workers[worker_node.id][next_user_class_name] += 1
+            except KeyError:
+                logger.error('worker %s is not available for tag %s', worker_node.id, sticky_tag)  # noqa: TRY400
+                continue
+
             user_count_total += 1
             current_user_count[next_user_class_name] += 1
             active_users.append((worker_node, next_user_class_name))
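A minimal sketch of the failure mode the new try/except guards against, assuming the sticky-tag cycle can still yield the id of a worker that has already been removed; the dict and variable names below are simplified stand-ins for the real structures.

```python
from itertools import cycle

# Simplified stand-ins for users_on_workers and _sticky_tag_to_workers.
users_on_workers = {'worker-1': {'ExampleUser': 0}}                # 'worker-2' has dropped off
sticky_tag_to_workers = {'tag-1': cycle(['worker-1', 'worker-2'])}

for _ in range(4):
    worker_id = next(sticky_tag_to_workers['tag-1'])
    try:
        users_on_workers[worker_id]['ExampleUser'] += 1
    except KeyError:
        # without the guard, a stale worker id would abort the whole distribution
        print(f'worker {worker_id} is not available for tag tag-1, skipping')

print(users_on_workers)
```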
14 changes: 10 additions & 4 deletions grizzly_extras/async_message/__init__.py
@@ -41,7 +41,12 @@ def configure_logging() -> None:
     except ValueError:
         pass
 
-    level = logging.getLevelName(environ.get('GRIZZLY_EXTRAS_LOGLEVEL', 'INFO'))
+    env_level = environ.get('GRIZZLY_EXTRAS_LOGLEVEL', 'INFO')
+    try:
+        level = logging.getLevelNamesMapping()[env_level]
+    except KeyError:
+        level = logging.getLevelNamesMapping()['INFO']
+        logging.warning('level %s specified in environment variable GRIZZLY_EXTRAS_LOGLEVEL is not a valid logging level, defaulting to INFO', env_level)
 
     logging.basicConfig(
         level=level,
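A small sketch of why the lookup changed, assuming Python 3.11 or newer (logging.getLevelNamesMapping() was added in 3.11): getLevelName() returns a placeholder string for unknown names instead of failing, while the mapping lookup raises KeyError so an invalid value can be reported and a sane default used.

```python
import logging

print(logging.getLevelName('INFO'))            # 20 -- usable level
print(logging.getLevelName('VERBOSE'))         # 'Level VERBOSE' -- silently unusable
print(logging.getLevelNamesMapping()['INFO'])  # 20

try:
    logging.getLevelNamesMapping()['VERBOSE']
except KeyError:
    print('invalid level name, falling back to INFO')
```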
@@ -136,14 +141,15 @@ def close(self) -> None:
     def handle(self, request: AsyncMessageRequest) -> AsyncMessageResponse:
         start_time = time()
         action = request.get('action', None)
+        request_id = request.get('request_id', None)
 
         try:
             if action is None:
                 message = 'no action in request'
                 raise RuntimeError(message)
 
             request_handler = self.get_handler(action)
-            self.logger.debug('handling %s, request=\n%s', action, jsondumps(request, indent=2, cls=JsonBytesEncoder))
+            self.logger.debug('handling %s, request_id=%s, request=\n%s', action, request_id, jsondumps(request, indent=2, cls=JsonBytesEncoder))
 
             response: AsyncMessageResponse
 
@@ -160,7 +166,7 @@ def handle(self, request: AsyncMessageRequest) -> AsyncMessageResponse:
                 'success': False,
                 'message': f'{action or "UNKNOWN"}: {e.__class__.__name__}="{e!s}"',
             }
-            self.logger.exception('%s: %s="%s"', action or 'UNKNOWN', e.__class__.__name__, str(e))  # noqa: TRY401
+            self.logger.exception('%s: request_id=%s, %s="%s"', action or 'UNKNOWN', request_id, e.__class__.__name__, str(e))  # noqa: TRY401
         finally:
             total_time = int((time() - start_time) * 1000)
             response.update({
@@ -172,7 +178,7 @@ def handle(self, request: AsyncMessageRequest) -> AsyncMessageResponse:
         response.update({'action': str(action)})
 
         if not self._event.is_set():
-            self.logger.debug('handled %s, response=\n%s', action, jsondumps(response, indent=2, cls=JsonBytesEncoder))
+            self.logger.debug('handled %s for, request_id=%s, response=\n%s', action, request_id, jsondumps(response, indent=2, cls=JsonBytesEncoder))
 
         return response
 
6 changes: 4 additions & 2 deletions grizzly_extras/async_message/mq/__init__.py
@@ -258,6 +258,8 @@ def _request(self, request: AsyncMessageRequest) -> AsyncMessageResponse:  # noq
             msg = 'no endpoint specified'
             raise AsyncMessageError(msg)
 
+        request_id = request.get('request_id', None)
+
         try:
             arguments = parse_arguments(endpoint, separator=':')
             unsupported_arguments = get_unsupported_arguments(['queue', 'expression', 'max_message_size'], arguments)
@@ -305,7 +307,7 @@ def _request(self, request: AsyncMessageRequest) -> AsyncMessageResponse:  # noq
         with self.queue_context(endpoint=queue_name) as queue:
             do_retry: bool = False
 
-            self.logger.info('executing %s on %s', action, queue_name)
+            self.logger.info('executing %s on %s for request %s', action, queue_name, request_id)
             start = time()
 
             try:
@@ -387,7 +389,7 @@ def _request(self, request: AsyncMessageRequest) -> AsyncMessageResponse:  # noq
                     sleep(retries * retries * 0.5)
                 else:
                     delta = (time() - start) * 1000
-                    self.logger.info('%s on %s took %d ms, response_length=%d, retries=%d', action, queue_name, delta, response_length, retries)
+                    self.logger.info('%s on %s for request %s took %d ms, response_length=%d, retries=%d', action, queue_name, request_id, delta, response_length, retries)
                     return {
                         'payload': payload,
                         'metadata': self._get_safe_message_descriptor(md),
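As a usage illustration, a small sketch of how the request_id ties the 'executing ...' and '... took ... ms' lines together in the log output; the logger name, the uuid-based id and the numbers are assumed for the example.

```python
import logging
import uuid

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s %(message)s')
logger = logging.getLogger('async-messaged')  # assumed logger name

request_id = str(uuid.uuid4())  # assumed to be set by the requesting side
action, queue_name = 'GET', 'IN.QUEUE'

# both lines can now be correlated by grepping for the same request_id
logger.info('executing %s on %s for request %s', action, queue_name, request_id)
logger.info('%s on %s for request %s took %d ms, response_length=%d, retries=%d',
            action, queue_name, request_id, 42, 1024, 0)
```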
2 changes: 1 addition & 1 deletion grizzly_extras/async_message/utils.py
@@ -42,8 +42,8 @@ def async_message_request(client: ztypes.Socket, request: AsyncMessageRequest) -
     response: Optional[AsyncMessageResponse] = None
     count = 0
 
+    start = perf_counter()
     while True:
-        start = perf_counter()
         count += 1
         try:
             response = cast(Optional[AsyncMessageResponse], client.recv_json(flags=zmq.NOBLOCK))
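A minimal sketch of the timing difference the move fixes, assuming a loop that polls several times before a response arrives: with the timer reset inside the loop only the last attempt is measured, while starting it once before the loop measures the whole wait.

```python
from time import perf_counter, sleep

start_outside = perf_counter()     # new behaviour: started once, before the loop
start_inside = 0.0
for attempt in range(3):           # stand-in for the NOBLOCK recv/retry loop
    start_inside = perf_counter()  # old behaviour: reset on every iteration
    sleep(0.1)                     # stand-in for the back-off between polls

print(f'measured inside the loop : {(perf_counter() - start_inside) * 1000:.0f} ms (last attempt only)')
print(f'measured outside the loop: {(perf_counter() - start_outside) * 1000:.0f} ms (all attempts)')
```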
