diff --git a/servo/logging.py b/servo/logging.py index f4d87641..2063894f 100644 --- a/servo/logging.py +++ b/servo/logging.py @@ -162,6 +162,11 @@ async def sink(self, message: loguru.Message) -> None: ) ) + def clear_progress_queue(self) -> None: + while not self._queue.empty(): + self._queue.get_nowait() + self._queue.task_done() + async def shutdown(self) -> None: """Shutdown the progress handler by emptying the queue and releasing the queue processor.""" await self._queue.join() diff --git a/servo/runner.py b/servo/runner.py index f7982e04..0bb5e841 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -42,6 +42,7 @@ class ServoRunner(pydantic.BaseModel, servo.logging.Mixin): interactive: bool = False + _assembly_runner: AssemblyRunner = pydantic.PrivateAttr(None) _servo: servo.Servo = pydantic.PrivateAttr(None) _connected: bool = pydantic.PrivateAttr(False) _running: bool = pydantic.PrivateAttr(False) @@ -52,9 +53,12 @@ class ServoRunner(pydantic.BaseModel, servo.logging.Mixin): class Config: arbitrary_types_allowed = True - def __init__(self, servo_: servo, **kwargs) -> None: # noqa: D10 + def __init__( + self, servo_: servo, _assembly_runner: AssemblyRunner = None, **kwargs + ) -> None: # noqa: D10 super().__init__(**kwargs) self._servo = servo_ + self._assembly_runner = _assembly_runner # initialize default servo options if not configured if self.config.settings is None: @@ -155,6 +159,7 @@ async def exec_command(self) -> servo.api.Status: descriptor=description.__opsani_repr__(), command_uid=cmd_response.command_uid, ) + self.clear_progress_queue() return await self.servo.post_event(servo.api.Events.describe, status.dict()) elif cmd_response.command == servo.api.Commands.measure: @@ -177,6 +182,7 @@ async def exec_command(self) -> servo.api.Status: self.logger.error(f"Responding with {status.dict()}") self.logger.opt(exception=error).debug("Measure failure details") + self.clear_progress_queue() return await self.servo.post_event(servo.api.Events.measure, status.dict()) elif cmd_response.command == servo.api.Commands.adjust: @@ -208,6 +214,7 @@ async def exec_command(self) -> servo.api.Status: self.logger.error(f"Responding with {status.dict()}") self.logger.opt(exception=error).debug("Adjust failure details") + self.clear_progress_queue() return await self.servo.post_event(servo.api.Events.adjust, status.dict()) elif cmd_response.command == servo.api.Commands.sleep: @@ -404,6 +411,11 @@ async def shutdown(self, *, reason: Optional[str] = None) -> None: except Exception: self.logger.exception(f"Exception occurred during GOODBYE request") + def clear_progress_queue(self) -> None: + if self._assembly_runner and self._assembly_runner.progress_handler: + self.logger.debug("Clearing progress handler queue") + self._assembly_runner.progress_handler.clear_progress_queue() + class AssemblyRunner(pydantic.BaseModel, servo.logging.Mixin): assembly: servo.Assembly @@ -535,7 +547,9 @@ async def handle_progress_exception( try: for servo_ in self.assembly.servos: - servo_runner = ServoRunner(servo_, interactive=interactive) + servo_runner = ServoRunner( + servo_, interactive=interactive, _assembly_runner=self + ) loop.create_task(servo_runner.run(poll=poll)) self.runners.append(servo_runner)