diff --git a/pyproject.toml b/pyproject.toml index ad0e6cc9..24894976 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ line-length = 100 flake8-quotes.inline-quotes='single' extend-select = ["ALL"] fixable = ["ALL"] -ignore = ["D100", "D101", "D102", "D103", "D104", "D107", "T201", "FIX002", "TD003", "TD002", "TRY003", "EM101", "EM102", "RET504", "D211", "COM812", "ISC001", "ERA001", "S602", "S603", "D205"] +ignore = ["D100", "D101", "D102", "D103", "D104", "D107", "T201", "FIX002", "TD003", "TD002", "TRY003", "EM101", "EM102", "RET504", "D211", "COM812", "ISC001", "ERA001", "S602", "S603", "D205", "UP040"] [tool.ruff.lint.pydocstyle] convention = "numpy" diff --git a/src/surface/flight_control/flight_control/multiplexer.py b/src/surface/flight_control/flight_control/multiplexer.py index b6719068..baae889d 100644 --- a/src/surface/flight_control/flight_control/multiplexer.py +++ b/src/surface/flight_control/flight_control/multiplexer.py @@ -1,4 +1,3 @@ -from collections.abc import Callable from typing import Final import rclpy @@ -7,14 +6,19 @@ from rclpy.node import Node from rclpy.qos import QoSPresetProfiles +from flight_control.pixhawk_instruction_utils import ( + apply_function, + pixhawk_instruction_to_tuple, + tuple_to_pixhawk_instruction, +) from rov_msgs.msg import PixhawkInstruction from rov_msgs.srv import AutonomousFlight # Brown out protection -SPEED_THROTTLE = 0.85 +SPEED_THROTTLE: Final = 0.65 # Joystick curve -JOYSTICK_EXPONENT = 3 +JOYSTICK_EXPONENT: Final = 3 # Range of values Pixhawk takes # In microseconds @@ -27,25 +31,100 @@ EXTENSIONS_CODE: Final = 0b00000011 -# Channels for RC command -MAX_CHANNEL = 8 -MIN_CHANNEL = 1 - -FORWARD_CHANNEL = 4 # X -THROTTLE_CHANNEL = 2 # Z (vertical) -LATERAL_CHANNEL = 5 # Y (left & right) -PITCH_CHANNEL = 0 # Pitch -YAW_CHANNEL = 3 # Yaw -ROLL_CHANNEL = 1 # Roll +NEXT_INSTR_FRAC: Final = 0.05 +PREV_INSTR_FRAC: Final = 1 - NEXT_INSTR_FRAC def joystick_map(raw: float) -> float: + """ + Convert the provided joystick position to a + float in [-1.0, 1.0] for use in a PixhawkInstruction. + + Parameters + ---------- + raw : float + The joystick position to convert + + Returns + ------- + float + A float in [-1.0, 1.0] to act as a PixhawkInstruction dimension + """ mapped = abs(raw) ** JOYSTICK_EXPONENT if raw < 0: mapped *= -1 return mapped +def manual_control_map(value: float) -> float: + """ + Convert the provided float in [-1.0, 1.0] to a ManualControl dimension. + + Parameters + ---------- + value : float + The float in [-1.0, 1.0] to convert to a ManualControl dimension + + Returns + ------- + float + The resulting ManualControl dimension + """ + return RANGE_SPEED * value + ZERO_SPEED + + +def smooth_value(prev_value: float, next_value: float) -> float: + """ + Get a value that interpolates prev_value & next_value. + + Parameters + ---------- + prev_value : float + The previous value, affects the result based on PREV_INSTR_FRAC + next_value : float + The next value, affects the result based on NEXT_INSTR_FRAC + + Returns + ------- + float + The resulting value between prev_value & next_value + """ + return PREV_INSTR_FRAC * prev_value + NEXT_INSTR_FRAC * next_value + + +def to_manual_control(msg: PixhawkInstruction) -> ManualControl: + """ + Convert the provided PixhawkInstruction to a ManualControl message. + + Parameters + ---------- + msg : PixhawkInstruction + The PixhawkInstruction to convert + + Returns + ------- + ManualControl + The resulting ManualControl message + """ + mc_msg = ManualControl() + + # Maps to PWM + mapped_msg = apply_function(msg, manual_control_map) + + # To account for different z limits + mapped_msg.vertical = Z_RANGE_SPEED * msg.vertical + Z_ZERO_SPEED + + mc_msg.x = mapped_msg.forward + mc_msg.z = mapped_msg.vertical + mc_msg.y = mapped_msg.lateral + mc_msg.r = mapped_msg.yaw + mc_msg.enabled_extensions = EXTENSIONS_CODE + mc_msg.s = mapped_msg.pitch + mc_msg.t = mapped_msg.roll + + return mc_msg + + class MultiplexerNode(Node): def __init__(self) -> None: super().__init__('multiplexer', parameter_overrides=[]) @@ -67,35 +146,24 @@ def __init__(self) -> None: ManualControl, 'mavros/manual_control/send', QoSPresetProfiles.DEFAULT.value ) - @staticmethod - def apply(msg: PixhawkInstruction, function_to_apply: Callable[[float], float]) -> None: - """Apply a function to each dimension of this PixhawkInstruction.""" - msg.forward = function_to_apply(msg.forward) - msg.vertical = msg.vertical - msg.lateral = function_to_apply(msg.lateral) - msg.pitch = function_to_apply(msg.pitch) - msg.yaw = function_to_apply(msg.yaw) - msg.roll = function_to_apply(msg.roll) - - @staticmethod - def to_manual_control(msg: PixhawkInstruction) -> ManualControl: - """Convert this PixhawkInstruction to an rc_msg with the appropriate channels array.""" - mc_msg = ManualControl() - - # Maps to PWM - MultiplexerNode.apply(msg, lambda value: (RANGE_SPEED * value) + ZERO_SPEED) - - mc_msg.x = msg.forward - mc_msg.z = ( - Z_RANGE_SPEED * msg.vertical - ) + Z_ZERO_SPEED # To account for different z limits - mc_msg.y = msg.lateral - mc_msg.r = msg.yaw - mc_msg.enabled_extensions = EXTENSIONS_CODE - mc_msg.s = msg.pitch - mc_msg.t = msg.roll - - return mc_msg + self.previous_instruction_tuple: tuple[float, ...] = pixhawk_instruction_to_tuple( + PixhawkInstruction() + ) + + def smooth_pixhawk_instruction(self, msg: PixhawkInstruction) -> PixhawkInstruction: + instruction_tuple = pixhawk_instruction_to_tuple(msg) + + smoothed_tuple = tuple( + smooth_value(previous_value, value) + for (previous_value, value) in zip( + self.previous_instruction_tuple, instruction_tuple, strict=True + ) + ) + smoothed_instruction = tuple_to_pixhawk_instruction(smoothed_tuple, msg.author) + + self.previous_instruction_tuple = smoothed_tuple + + return smoothed_instruction def state_control( self, req: AutonomousFlight.Request, res: AutonomousFlight.Response @@ -111,7 +179,7 @@ def control_callback(self, msg: PixhawkInstruction) -> None: ): # Smooth out adjustments # TODO: look into maybe doing inheritance on a PixhawkInstruction - MultiplexerNode.apply(msg, joystick_map) + msg = apply_function(msg, joystick_map) elif ( msg.author == PixhawkInstruction.KEYBOARD_CONTROL and self.state == AutonomousFlight.Request.STOP @@ -123,7 +191,8 @@ def control_callback(self, msg: PixhawkInstruction) -> None: else: return - self.mc_pub.publish(self.to_manual_control(msg)) + smoothed_instruction = self.smooth_pixhawk_instruction(msg) + self.mc_pub.publish(to_manual_control(smoothed_instruction)) def main() -> None: diff --git a/src/surface/flight_control/flight_control/pixhawk_instruction_utils.py b/src/surface/flight_control/flight_control/pixhawk_instruction_utils.py new file mode 100644 index 00000000..19a09803 --- /dev/null +++ b/src/surface/flight_control/flight_control/pixhawk_instruction_utils.py @@ -0,0 +1,75 @@ +from collections.abc import Callable + +from rov_msgs.msg import PixhawkInstruction + + +def pixhawk_instruction_to_tuple( + msg: PixhawkInstruction, +) -> tuple[float, float, float, float, float, float]: + """ + Convert PixhawkInstruction to tuple of dimensions. + + Parameters + ---------- + msg : PixhawkInstruction + PixhawkInstruction to convert + + Returns + ------- + tuple[float, float, float, float, float, float] + Tuple of dimensions from the instruction + """ + return (msg.forward, msg.vertical, msg.lateral, msg.pitch, msg.yaw, msg.roll) + + +def tuple_to_pixhawk_instruction( + instruction_tuple: tuple[float, ...], author: int = PixhawkInstruction.MANUAL_CONTROL +) -> PixhawkInstruction: + """ + Convert tuple of dimensions and author to a PixhawkInstruction. + + Parameters + ---------- + instruction_tuple : tuple[float, ...] + Tuple of dimensions + author : int, optional + Author of the PixhawkInstruction, by default PixhawkInstruction.MANUAL_CONTROL + + Returns + ------- + PixhawkInstruction + A new PixhawkInstruction with the provided dimensions and author + """ + return PixhawkInstruction( + forward=instruction_tuple[0], + vertical=instruction_tuple[1], + lateral=instruction_tuple[2], + pitch=instruction_tuple[3], + yaw=instruction_tuple[4], + roll=instruction_tuple[5], + author=author, + ) + + +def apply_function( + msg: PixhawkInstruction, function_to_apply: Callable[[float], float] +) -> PixhawkInstruction: + """ + Run the provided function on each dimension of a PixhawkInstruction. + Does not modify the original PixhawkInstruction. + + Parameters + ---------- + msg : PixhawkInstruction + The instruction to run the function on + function_to_apply : Callable[[float], float] + The function to apply to each dimension + + Returns + ------- + PixhawkInstruction + The new PixhawkInstruction made by applying the function to each dimension of msg + """ + instruction_tuple = pixhawk_instruction_to_tuple(msg) + modified_tuple = tuple(function_to_apply(value) for value in instruction_tuple) + return tuple_to_pixhawk_instruction(modified_tuple, msg.author) diff --git a/src/surface/flight_control/test/test_manual_control.py b/src/surface/flight_control/test/test_manual_control.py index c4666584..101aac0a 100644 --- a/src/surface/flight_control/test/test_manual_control.py +++ b/src/surface/flight_control/test/test_manual_control.py @@ -5,7 +5,7 @@ Z_RANGE_SPEED, Z_ZERO_SPEED, ZERO_SPEED, - MultiplexerNode, + to_manual_control, ) from rov_msgs.msg import PixhawkInstruction @@ -31,7 +31,7 @@ def test_joystick_profiles() -> None: roll=0.92, ) - msg = MultiplexerNode.to_manual_control(instruction) + msg = to_manual_control(instruction) assert msg.x == ZERO_SPEED assert msg.z == (Z_ZERO_SPEED + Z_RANGE_SPEED) @@ -39,6 +39,6 @@ def test_joystick_profiles() -> None: # 1539 1378 - assert msg.s == ZERO_SPEED + int(RANGE_SPEED * 0.34) - assert msg.r == ZERO_SPEED + int(RANGE_SPEED * -0.6) - assert msg.t == ZERO_SPEED + int(RANGE_SPEED * 0.92) + assert msg.s == ZERO_SPEED + RANGE_SPEED * 0.34 + assert msg.r == ZERO_SPEED + RANGE_SPEED * -0.6 + assert msg.t == ZERO_SPEED + RANGE_SPEED * 0.92