diff --git a/pyrobosim/pyrobosim/core/robot.py b/pyrobosim/pyrobosim/core/robot.py index fc37c750..cb2c4a4b 100644 --- a/pyrobosim/pyrobosim/core/robot.py +++ b/pyrobosim/pyrobosim/core/robot.py @@ -328,7 +328,11 @@ def follow_path( ) # Follow the path. - result = self.path_executor.execute(path, realtime_factor) + exec_options = self.action_execution_options.get("navigate") + battery_usage = exec_options.battery_usage if exec_options else 0.0 + result = self.path_executor.execute( + path, realtime_factor=realtime_factor, battery_usage=battery_usage + ) # Check that the robot made it to its goal pose at the end of execution. at_goal_pose = self.get_pose().is_approx(path.poses[-1]) @@ -339,14 +343,6 @@ def follow_path( ) # Update the robot state. - exec_options = self.action_execution_options.get("navigate") - if exec_options: - self.battery_level = max( - 0.0, - self.battery_level - - exec_options.battery_usage - * self.path_executor.current_distance_traveled, - ) if self.world: if ( isinstance(self.location, ObjectSpawn) diff --git a/pyrobosim/pyrobosim/navigation/execution.py b/pyrobosim/pyrobosim/navigation/execution.py index 22322441..9ccb4d80 100644 --- a/pyrobosim/pyrobosim/navigation/execution.py +++ b/pyrobosim/pyrobosim/navigation/execution.py @@ -53,11 +53,10 @@ def __init__( # Execution state self.current_traj_time = 0.0 - self.current_distance_traveled = 0.0 self.abort_execution = False # Flag to abort internally self.cancel_execution = False # Flag to cancel from user - def execute(self, path, realtime_factor=1.0): + def execute(self, path, realtime_factor=1.0, battery_usage=0.0): """ Generates and executes a trajectory on the robot. @@ -66,6 +65,8 @@ def execute(self, path, realtime_factor=1.0): :param realtime_factor: A multiplier on the execution time relative to real time, defaults to 1.0. :type realtime_factor: float, optional + :param battery_usage: Robot battery usage per unit distance. + :type battery_usage: float, optional :return: An object describing the execution result. :rtype: :class:`pyrobosim.planning.actions.ExecutionResult` """ @@ -86,7 +87,6 @@ def execute(self, path, realtime_factor=1.0): self.robot.executing_nav = True self.current_traj_time = 0.0 - self.current_distance_traveled = 0.0 self.abort_execution = False # Convert the path to an interpolated trajectory. @@ -132,7 +132,17 @@ def execute(self, path, realtime_factor=1.0): status = ExecutionStatus.CANCELED break - self.current_distance_traveled += cur_pose.get_linear_distance(prev_pose) + # Simulate battery usage and exit if the battery is fully depleted. + self.robot.battery_level -= battery_usage * cur_pose.get_linear_distance( + prev_pose + ) + if self.robot.battery_level <= 0.0: + self.robot.battery_level = 0.0 + message = f"[{self.robot.name}] Battery depleted while navigating." + warnings.warn(message) + status = ExecutionStatus.EXECUTION_FAILURE + break + prev_pose = cur_pose time.sleep(max(0, sleep_time - (time.time() - start_time))) diff --git a/pyrobosim/test/core/test_robot.py b/pyrobosim/test/core/test_robot.py index 0838460a..ed5c8c13 100644 --- a/pyrobosim/test/core/test_robot.py +++ b/pyrobosim/test/core/test_robot.py @@ -500,6 +500,7 @@ def test_execute_action(self): """Tests execution of a single action.""" init_pose = Pose(x=1.0, y=0.5, yaw=0.0) robot = Robot( + name="test_robot", pose=init_pose, path_planner=WorldGraphPlanner(world=self.test_world), path_executor=ConstantVelocityExecutor(linear_velocity=5.0, dt=0.1), @@ -507,16 +508,30 @@ def test_execute_action(self): ) robot.location = "kitchen" robot.world = self.test_world + + # Navigate to a location and check that the battery level decreased. action = TaskAction( "navigate", source_location="kitchen", target_location="my_desk", ) - result = robot.execute_action(action) assert result.is_success() assert robot.battery_level < 100.0 + # Modify the action execution options to deplete the battery extremely quickly. + robot.action_execution_options["navigate"].battery_usage = 100.0 + action = TaskAction( + "navigate", + source_location="my_desk", + target_location="bathroom", + ) + with pytest.warns(UserWarning): + result = robot.execute_action(action) + assert result.status == ExecutionStatus.EXECUTION_FAILURE + assert result.message == "[test_robot] Battery depleted while navigating." + assert robot.battery_level == 0.0 + def test_execute_invalid_action(self): """Tests execution of an action that is not recognized as a valid type.""" init_pose = Pose(x=1.0, y=0.5, yaw=0.0)