diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index 7116efc9d..c4c1a2340 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -5,8 +5,11 @@ """ import gc +import sys import time +import traceback import unittest +from threading import Thread from time import sleep from typing import List from unittest.mock import MagicMock @@ -87,6 +90,8 @@ def test_removing_bus_tasks(self): # Note calling task.stop will remove the task from the Bus's internal task management list task.stop() + self.join_threads([task.thread for task in tasks], 5.0) + assert len(bus._periodic_tasks) == 0 bus.shutdown() @@ -115,8 +120,7 @@ def test_managed_tasks(self): for task in tasks: task.stop() - for task in tasks: - assert task.thread.join(5.0) is None, "Task didn't stop before timeout" + self.join_threads([task.thread for task in tasks], 5.0) bus.shutdown() @@ -142,9 +146,7 @@ def test_stopping_perodic_tasks(self): # stop the other half using the bus api bus.stop_all_periodic_tasks(remove_tasks=False) - - for task in tasks: - assert task.thread.join(5.0) is None, "Task didn't stop before timeout" + self.join_threads([task.thread for task in tasks], 5.0) # Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should # still be associated with the bus (e.g. for restarting) @@ -161,7 +163,7 @@ def test_restart_perodic_tasks(self): is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7] ) - def _read_all_messages(_bus: can.interfaces.virtual.VirtualBus) -> None: + def _read_all_messages(_bus: "can.interfaces.virtual.VirtualBus") -> None: sleep(safe_timeout) while not _bus.queue.empty(): _bus.recv(timeout=period) @@ -207,9 +209,8 @@ def _read_all_messages(_bus: can.interfaces.virtual.VirtualBus) -> None: # Stop all tasks and wait for the thread to exit bus.stop_all_periodic_tasks() - if isinstance(task, can.broadcastmanager.ThreadBasedCyclicSendTask): - # Avoids issues where the thread is still running when the bus is shutdown - task.thread.join(safe_timeout) + # Avoids issues where the thread is still running when the bus is shutdown + self.join_threads([task.thread], 5.0) @unittest.skipIf(IS_CI, "fails randomly when run on CI server") def test_thread_based_cyclic_send_task(self): @@ -288,6 +289,27 @@ def increment_first_byte(msg: can.Message) -> None: self.assertEqual(b"\x06\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[5].data)) self.assertEqual(b"\x07\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[6].data)) + @staticmethod + def join_threads(threads: List[Thread], timeout: float) -> None: + stuck_threads: List[Thread] = [] + t0 = time.perf_counter() + for thread in threads: + time_left = timeout - (time.perf_counter() - t0) + if time_left > 0.0: + thread.join(time_left) + if thread.is_alive(): + if platform.python_implementation() == "CPython": + # print thread frame to help with debugging + frame = sys._current_frames()[thread.ident] + traceback.print_stack(frame, file=sys.stderr) + stuck_threads.append(thread) + if stuck_threads: + err_message = ( + f"Threads did not stop within {timeout:.1f} seconds: " + f"[{', '.join([str(t) for t in stuck_threads])}]" + ) + raise RuntimeError(err_message) + if __name__ == "__main__": unittest.main()