Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better teardown logging #558

Merged
merged 7 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/mantarray_desktop_app/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ class SystemActionTransitionStates(Enum):
SERIAL_COMM_HANDSHAKE_PERIOD_SECONDS = 5
SERIAL_COMM_REGISTRATION_TIMEOUT_SECONDS = 8
# Tanner (3/22/22): The following values are probably much larger than they need to be. It is unclear what the best durations are now that a command might be sent right before or during a FW reboot initiated automatically by a FW error
SERIAL_COMM_STATUS_BEACON_TIMEOUT_SECONDS = SERIAL_COMM_STATUS_BEACON_PERIOD_SECONDS * 2
SERIAL_COMM_HANDSHAKE_TIMEOUT_SECONDS = SERIAL_COMM_HANDSHAKE_PERIOD_SECONDS * 2
SERIAL_COMM_RESPONSE_TIMEOUT_SECONDS = 10
SERIAL_COMM_HANDSHAKE_TIMEOUT_SECONDS = 10
SERIAL_COMM_STATUS_BEACON_TIMEOUT_SECONDS = 10

# general packet components
SERIAL_COMM_MAGIC_WORD_BYTES = b"CURI BIO"
Expand Down
3 changes: 2 additions & 1 deletion src/mantarray_desktop_app/simulators/mc_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,9 @@ def read_all(self) -> bytes:
"""Read all available bytes from the simulator."""
return self._read()

def write(self, input_item: bytes) -> None:
def write(self, input_item: bytes) -> int:
self._input_queue.put_nowait(input_item)
return len(input_item)

def _drain_all_queues(self) -> Dict[str, Any]:
queue_items = {
Expand Down
97 changes: 81 additions & 16 deletions src/mantarray_desktop_app/sub_processes/mc_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def __init__(self, *args: Any, hardware_test_mode: bool = False, **kwargs: Any):
self._auto_get_metadata = False
self._time_of_last_handshake_secs: Optional[float] = None
self._time_of_last_beacon_secs: Optional[float] = None
self._handshake_sent_after_beacon_missed = False
self._commands_awaiting_response = CommandTracker()
self._hardware_test_mode = hardware_test_mode
# reboot values
Expand Down Expand Up @@ -325,7 +326,9 @@ def _teardown_after_loop(self) -> None:
# log any data in cache, flush and log remaining serial data
put_log_message_into_queue(
logging.INFO,
f"Remaining serial data in cache: {list(self._serial_packet_cache)}, in buffer: {list(board.read_all())}",
f"Duration (seconds) since events: {self._get_dur_since_events()}. "
f"Remaining serial data in cache: {list(self._serial_packet_cache)}, "
f"in buffer: {list(board.read_all())}",
self._board_queues[board_idx][1],
self.get_logging_level(),
)
Expand All @@ -338,10 +341,22 @@ def _teardown_after_loop(self) -> None:
unset_keepawake()

if self._error and not isinstance(self._error, InstrumentFirmwareError):
self._send_data_packet(board_idx, SERIAL_COMM_REBOOT_PACKET_TYPE)
self._send_data_packet(board_idx, SERIAL_COMM_REBOOT_PACKET_TYPE, track_command=False)

super()._teardown_after_loop()

def _get_dur_since_events(self) -> Dict[str, float | str]:
event_timepoints = {
"status_beacon_received": self._time_of_last_beacon_secs,
"handshake_sent": self._time_of_last_handshake_secs,
**self._timepoints_of_prev_actions,
}
current_timepoint = perf_counter()
return {
event_name: ("No occurrence" if event_timepoint is None else current_timepoint - event_timepoint)
for event_name, event_timepoint in event_timepoints.items()
}

def _report_fatal_error(self, err: Exception, formatted_stack_trace: Optional[str] = None) -> None:
self._error = (
err if not isinstance(err, SerialCommCommandProcessingError) else err.__cause__ # type: ignore
Expand Down Expand Up @@ -378,7 +393,15 @@ def _reset_performance_tracking_values(self) -> None:

def _reset_timepoints_of_prev_actions(self) -> None:
self._timepoints_of_prev_actions = {
key: None for key in ("data_read", "packet_sort", "mag_data_parse")
key: None
for key in (
"data_read",
"packet_sort",
"mag_data_parse",
"stim_data_parse",
"command_sent",
"command_response_received",
)
}

def is_registered_with_serial_comm(self, board_idx: int) -> bool:
Expand Down Expand Up @@ -429,16 +452,24 @@ def set_board_connection(self, board_idx: int, board: Union[MantarrayMcSimulator
self._simulator_error_queues[board_idx] = board.get_fatal_error_reporter()

def _send_data_packet(
self,
board_idx: int,
packet_type: int,
data_to_send: bytes = bytes(0),
self, board_idx: int, packet_type: int, data_to_send: bytes = bytes(0), track_command: bool = True
) -> None:
if track_command:
self._timepoints_of_prev_actions["command_sent"] = perf_counter()

data_packet = create_data_packet(get_serial_comm_timestamp(), packet_type, data_to_send)
board = self._board_connections[board_idx]
if board is None:
raise NotImplementedError("Board should not be None when sending a command to it")
board.write(data_packet)

write_len = board.write(data_packet)
if write_len == 0:
put_log_message_into_queue(
logging.INFO,
"Serial data write reporting no bytes written",
self._board_queues[board_idx][1],
self.get_logging_level(),
)

def _commands_for_each_run_iteration(self) -> None:
"""Ordered actions to perform each iteration.
Expand Down Expand Up @@ -741,7 +772,7 @@ def _has_initial_handshake_been_sent(self) -> bool:

def _send_handshake(self, board_idx: int) -> None:
self._time_of_last_handshake_secs = perf_counter()
self._send_data_packet(board_idx, SERIAL_COMM_HANDSHAKE_PACKET_TYPE)
self._send_data_packet(board_idx, SERIAL_COMM_HANDSHAKE_PACKET_TYPE, track_command=False)

def _process_comm_from_instrument(self, packet_type: int, packet_payload: bytes) -> None:
if packet_type == SERIAL_COMM_CHECKSUM_FAILURE_PACKET_TYPE:
Expand Down Expand Up @@ -921,6 +952,14 @@ def _process_comm_from_instrument(self, packet_type: int, packet_payload: bytes)
else:
raise UnrecognizedSerialCommPacketTypeError(f"Packet Type ID: {packet_type} is not defined")

if packet_type not in (
# beacons + handshakes tracked separately, going dormant packet will raise an error and does not need to be tracked
SERIAL_COMM_STATUS_BEACON_PACKET_TYPE,
SERIAL_COMM_HANDSHAKE_PACKET_TYPE,
SERIAL_COMM_GOING_DORMANT_PACKET_TYPE,
):
self._timepoints_of_prev_actions["command_response_received"] = perf_counter()

def _process_status_beacon(self, packet_payload: bytes) -> None:
board_idx = 0
status_codes_dict = convert_status_code_bytes_to_dict(
Expand Down Expand Up @@ -993,7 +1032,17 @@ def _handle_data_stream(self) -> None:
)
self._timepoints_of_prev_actions["data_read"] = perf_counter()
# read bytes from serial buffer
data_read_bytes = board.read_all()
try:
data_read_bytes = board.read_all()
except serial.SerialException as e:
put_log_message_into_queue(
logging.INFO,
f"Serial data read failed: {e}. Trying one more time",
self._board_queues[board_idx][1],
self.get_logging_level(),
)
data_read_bytes = board.read_all()

new_performance_tracking_values["data_read_duration"] = _get_dur_of_data_read_secs(
self._timepoints_of_prev_actions["data_read"] # type: ignore
)
Expand Down Expand Up @@ -1121,6 +1170,8 @@ def _handle_stim_packets(self, stim_stream_info: Dict[str, Union[bytes, int]]) -
if not stim_stream_info["num_packets"]:
return

self._timepoints_of_prev_actions["stim_data_parse"] = perf_counter()

protocol_statuses: Dict[int, Any] = parse_stim_data(*stim_stream_info.values())

well_statuses: Dict[int, Any] = {}
Expand Down Expand Up @@ -1170,15 +1221,28 @@ def _dump_stim_packet(self, well_statuses: NDArray[(2, Any), int]) -> None:
self._has_stim_packet_been_sent = True

def _handle_beacon_tracking(self) -> None:
if self._time_of_last_beacon_secs is None:
if (
self._time_of_last_beacon_secs is None
or self._is_waiting_for_reboot
or self._is_updating_firmware
or self._is_setting_nickname
):
return
secs_since_last_beacon_received = _get_secs_since_last_beacon(self._time_of_last_beacon_secs)
if (
secs_since_last_beacon_received >= SERIAL_COMM_STATUS_BEACON_TIMEOUT_SECONDS
and not self._is_waiting_for_reboot
and not self._is_updating_firmware
and not self._is_setting_nickname
secs_since_last_beacon_received >= SERIAL_COMM_STATUS_BEACON_PERIOD_SECONDS + 1
and not self._handshake_sent_after_beacon_missed
):
board_idx = 0
put_log_message_into_queue(
logging.INFO,
"Status Beacon overdue. Sending handshake now to prompt a response.",
self._board_queues[board_idx][1],
self.get_logging_level(),
)
self._send_handshake(board_idx)
self._handshake_sent_after_beacon_missed = True
elif secs_since_last_beacon_received >= SERIAL_COMM_STATUS_BEACON_TIMEOUT_SECONDS:
raise SerialCommStatusBeaconTimeoutError()

def _handle_command_tracking(self) -> None:
Expand Down Expand Up @@ -1212,6 +1276,7 @@ def _check_firmware_update_status(self) -> None:

def _handle_status_codes(self, status_codes_dict: Dict[str, int], comm_type: str) -> None:
self._time_of_last_beacon_secs = perf_counter()
self._handshake_sent_after_beacon_missed = False

board_idx = 0
if (
Expand All @@ -1229,7 +1294,7 @@ def _handle_status_codes(self, status_codes_dict: Dict[str, int], comm_type: str

status_codes_msg = f"{comm_type} received from instrument. Status Codes: {status_codes_dict}"
if any(status_codes_dict.values()):
self._send_data_packet(board_idx, SERIAL_COMM_ERROR_ACK_PACKET_TYPE)
self._send_data_packet(board_idx, SERIAL_COMM_ERROR_ACK_PACKET_TYPE, track_command=False)
raise InstrumentFirmwareError(status_codes_msg)
put_log_message_into_queue(
logging.DEBUG,
Expand Down
4 changes: 2 additions & 2 deletions tests/mc_comm/test_board_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ def test_McCommunicationProcess__does_not_check_for_overdue_status_beacons_after
# run mc_process to send reboot command and simulator to start reboot
invoke_process_run_and_check_errors(mc_process)
invoke_process_run_and_check_errors(simulator)
# run mc_process again to make sure status beacon time is checked but no error is raised
assert mocked_get_secs.call_count == 1
# run mc_process again to make sure status beacon time is not checked
assert mocked_get_secs.call_count == 0


def test_McCommunicationProcess__raises_error_if_reboot_takes_longer_than_maximum_reboot_period(
Expand Down
45 changes: 45 additions & 0 deletions tests/mc_comm/test_data_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from mantarray_desktop_app.sub_processes import mc_comm
import numpy as np
import pytest
import serial
from stdlib_utils import create_metrics_stats
from stdlib_utils import drain_queue
from stdlib_utils import invoke_process_run_and_check_errors
Expand Down Expand Up @@ -259,6 +260,50 @@ def test_McCommunicationProcess__reads_all_bytes_from_instrument__and_does_not_s
confirm_queue_is_eventually_empty(to_fw_queue)


def test_McCommunicationProcess__tries_to_read_one_more_time_if_first_read_fails(
    four_board_mc_comm_process_no_handshake,
    mantarray_mc_simulator_no_beacon,
    mocker,
):
    """When the first read_all call raises SerialException, a second read is attempted and a message is logged."""
    process_fixture = four_board_mc_comm_process_no_handshake
    mc_process = process_fixture["mc_process"]
    board_0_queues = process_fixture["board_queues"][0]
    log_queue = board_0_queues[1]
    simulator = mantarray_mc_simulator_no_beacon["simulator"]

    # arbitrary sampling period
    sampling_period_us = 25000
    # patch the elapsed-time helper so that only one data packet is sent
    mocker.patch.object(
        mc_simulator,
        "_get_us_since_last_data_packet",
        autospec=True,
        side_effect=[0, sampling_period_us],
    )

    set_connection_and_register_simulator(process_fixture, mantarray_mc_simulator_no_beacon)
    set_sampling_period_and_start_streaming(process_fixture, simulator, sampling_period=sampling_period_us)

    # first read raises, second read returns no bytes
    read_failure = serial.SerialException("test msg")
    read_all_mock = mocker.patch.object(
        simulator, "read_all", autospec=True, side_effect=[read_failure, b""]
    )

    # simulator produces data, then mc_process tries to read it
    invoke_process_run_and_check_errors(simulator)
    invoke_process_run_and_check_errors(mc_process)

    assert read_all_mock.call_count == 2

    confirm_queue_is_eventually_of_size(log_queue, 1)
    logged_item = log_queue.get(timeout=QUEUE_CHECK_TIMEOUT_SECONDS)
    expected_message = f"Serial data read failed: {read_failure}. Trying one more time"
    assert logged_item["message"] == expected_message


def test_McCommunicationProcess__processes_non_stream_packet_immediately(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
Expand Down
42 changes: 39 additions & 3 deletions tests/mc_comm/test_serial_comm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import copy
import logging
from random import randint

from mantarray_desktop_app import create_data_packet
Expand Down Expand Up @@ -31,6 +32,7 @@
from mantarray_desktop_app.exceptions import SerialCommCommandProcessingError
from mantarray_desktop_app.sub_processes import mc_comm
import pytest
from src.mantarray_desktop_app.constants import SERIAL_COMM_STATUS_BEACON_PERIOD_SECONDS
from stdlib_utils import invoke_process_run_and_check_errors

from ..fixtures import fixture_patch_print
Expand Down Expand Up @@ -212,6 +214,31 @@ def test_McCommunicationProcess__includes_correct_timestamp_in_packets_sent_to_i
spied_write.assert_called_with(expected_data_packet)


def test_McCommunicationProcess__logs_message_if_write_fails(
    four_board_mc_comm_process, mantarray_mc_simulator_no_beacon, mocker
):
    """When the board's write call returns 0, a message reporting that no bytes were written is put in the output queue."""
    mc_process = four_board_mc_comm_process["mc_process"]
    from_main_queue, to_main_queue = four_board_mc_comm_process["board_queues"][0][:2]
    simulator = mantarray_mc_simulator_no_beacon["simulator"]

    set_connection_and_register_simulator(four_board_mc_comm_process, mantarray_mc_simulator_no_beacon)
    put_object_into_queue_and_raise_error_if_eventually_still_empty(
        {"communication_type": "metadata_comm", "command": "get_metadata"}, from_main_queue
    )

    # make the write report that zero bytes were written
    mocker.patch.object(simulator, "write", autospec=True, return_value=0)

    # a single iteration of mc_process is enough to send the command
    invoke_process_run_and_check_errors(mc_process)

    confirm_queue_is_eventually_of_size(to_main_queue, 1)
    logged_item = to_main_queue.get(timeout=QUEUE_CHECK_TIMEOUT_SECONDS)
    assert logged_item["message"] == "Serial data write reporting no bytes written"


def test_McCommunicationProcess__sends_handshake_every_5_seconds__and_includes_correct_timestamp__and_processes_response(
four_board_mc_comm_process,
mantarray_mc_simulator_no_beacon,
Expand Down Expand Up @@ -349,25 +376,34 @@ def test_McCommunicationProcess__raises_error_if_command_response_not_received_w
invoke_process_run_and_check_errors(mc_process)


def test_McCommunicationProcess__raises_error_if_status_beacon_not_received_in_allowed_period_of_time(
def test_McCommunicationProcess__handles_missed_beacons_correctly(
four_board_mc_comm_process_no_handshake,
mantarray_mc_simulator_no_beacon,
mocker,
patch_print,
):
mc_process = four_board_mc_comm_process_no_handshake["mc_process"]
to_main_queue = four_board_mc_comm_process_no_handshake["board_queues"][0][1]

set_connection_and_register_simulator(
four_board_mc_comm_process_no_handshake, mantarray_mc_simulator_no_beacon
)

# patch so next iteration of mc_process will hit beacon timeout
mocker.patch.object(
mc_comm,
"_get_secs_since_last_beacon",
autospec=True,
return_value=SERIAL_COMM_STATUS_BEACON_TIMEOUT_SECONDS,
side_effect=[SERIAL_COMM_STATUS_BEACON_PERIOD_SECONDS + 1, SERIAL_COMM_STATUS_BEACON_TIMEOUT_SECONDS],
)

invoke_process_run_and_check_errors(mc_process)
confirm_queue_is_eventually_of_size(to_main_queue, 1)
assert to_main_queue.get(timeout=QUEUE_CHECK_TIMEOUT_SECONDS) == {
"communication_type": "log",
"log_level": logging.INFO,
"message": "Status Beacon overdue. Sending handshake now to prompt a response.",
}

with pytest.raises(SerialCommStatusBeaconTimeoutError):
invoke_process_run_and_check_errors(mc_process)

Expand Down