Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smooth thrusters #134

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ line-length = 100
flake8-quotes.inline-quotes='single'
extend-select = ["ALL"]
fixable = ["ALL"]
ignore = ["D100", "D101", "D102", "D103", "D104", "D107", "T201", "FIX002", "TD003", "TD002", "TRY003", "EM101", "EM102", "RET504", "D211", "COM812", "ISC001", "ERA001", "S602", "S603", "D205"]
ignore = ["D100", "D101", "D102", "D103", "D104", "D107", "T201", "FIX002", "TD003", "TD002", "TRY003", "EM101", "EM102", "RET504", "D211", "COM812", "ISC001", "ERA001", "S602", "S603", "D205", "UP040"]

[tool.ruff.lint.pydocstyle]
convention = "numpy"
Expand Down
165 changes: 121 additions & 44 deletions src/surface/flight_control/flight_control/multiplexer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections.abc import Callable
from typing import Final

import rclpy
Expand All @@ -7,14 +6,19 @@
from rclpy.node import Node
from rclpy.qos import QoSPresetProfiles

from flight_control.pixhawk_instruction_utils import (
apply_function,
pixhawk_instruction_to_tuple,
tuple_to_pixhawk_instruction,
)
from rov_msgs.msg import PixhawkInstruction
from rov_msgs.srv import AutonomousFlight

# Brown out protection
SPEED_THROTTLE = 0.85
SPEED_THROTTLE: Final = 0.65

# Joystick curve
JOYSTICK_EXPONENT = 3
JOYSTICK_EXPONENT: Final = 3

# Range of values Pixhawk takes
# In microseconds
Expand All @@ -27,25 +31,108 @@

EXTENSIONS_CODE: Final = 0b00000011

# Channels for RC command
MAX_CHANNEL = 8
MIN_CHANNEL = 1

FORWARD_CHANNEL = 4 # X
THROTTLE_CHANNEL = 2 # Z (vertical)
LATERAL_CHANNEL = 5 # Y (left & right)
PITCH_CHANNEL = 0 # Pitch
YAW_CHANNEL = 3 # Yaw
ROLL_CHANNEL = 1 # Roll
NEXT_INSTR_FRAC: Final = 0.05
PREV_INSTR_FRAC: Final = 1 - NEXT_INSTR_FRAC
INSTR_EPSILON: Final = 0.05


def joystick_map(raw: float) -> float:
"""
Convert the provided joystick position to a
float in [-1.0, 1.0] for use in a PixhawkInstruction.

Parameters
----------
raw : float
The joystick position to convert

Returns
-------
float
A float in [-1.0, 1.0] to act as a PixhawkInstruction dimension
"""
mapped = abs(raw) ** JOYSTICK_EXPONENT
if raw < 0:
mapped *= -1
return mapped


def manual_control_map(value: float) -> float:
"""
Convert the provided float in [-1.0, 1.0] to a ManualControl dimension.

Parameters
----------
value : float
The float in [-1.0, 1.0] to convert to a ManualControl dimension

Returns
-------
float
The resulting ManualControl dimension
"""
return RANGE_SPEED * value + ZERO_SPEED


def smooth_value(prev_value: float, next_value: float) -> float:
"""
Get a value that interpolates prev_value & next_value.

Parameters
----------
prev_value : float
The previous value, affects the result based on PREV_INSTR_FRAC
next_value : float
The next value, affects the result based on NEXT_INSTR_FRAC

Returns
-------
float
The resulting value between prev_value & next_value
"""
smoothed_value = PREV_INSTR_FRAC * prev_value + NEXT_INSTR_FRAC * next_value

# If close to target value, snap to it
# (we want to get there eventually, not approach in the limit)
if next_value - INSTR_EPSILON <= smoothed_value <= next_value + INSTR_EPSILON:
return next_value

return smoothed_value


def to_manual_control(msg: PixhawkInstruction) -> ManualControl:
"""
Convert the provided PixhawkInstruction to a ManualControl message.

Parameters
----------
msg : PixhawkInstruction
The PixhawkInstruction to convert

Returns
-------
ManualControl
The resulting ManualControl message
"""
mc_msg = ManualControl()

# Maps to PWM
mapped_msg = apply_function(msg, manual_control_map)

# To account for different z limits
mapped_msg.vertical = Z_RANGE_SPEED * msg.vertical + Z_ZERO_SPEED

mc_msg.x = mapped_msg.forward
mc_msg.z = mapped_msg.vertical
mc_msg.y = mapped_msg.lateral
mc_msg.r = mapped_msg.yaw
mc_msg.enabled_extensions = EXTENSIONS_CODE
mc_msg.s = mapped_msg.pitch
mc_msg.t = mapped_msg.roll

return mc_msg


class MultiplexerNode(Node):
def __init__(self) -> None:
super().__init__('multiplexer', parameter_overrides=[])
Expand All @@ -67,35 +154,24 @@ def __init__(self) -> None:
ManualControl, 'mavros/manual_control/send', QoSPresetProfiles.DEFAULT.value
)

@staticmethod
def apply(msg: PixhawkInstruction, function_to_apply: Callable[[float], float]) -> None:
"""Apply a function to each dimension of this PixhawkInstruction."""
msg.forward = function_to_apply(msg.forward)
msg.vertical = msg.vertical
msg.lateral = function_to_apply(msg.lateral)
msg.pitch = function_to_apply(msg.pitch)
msg.yaw = function_to_apply(msg.yaw)
msg.roll = function_to_apply(msg.roll)

@staticmethod
def to_manual_control(msg: PixhawkInstruction) -> ManualControl:
"""Convert this PixhawkInstruction to an rc_msg with the appropriate channels array."""
mc_msg = ManualControl()

# Maps to PWM
MultiplexerNode.apply(msg, lambda value: (RANGE_SPEED * value) + ZERO_SPEED)

mc_msg.x = msg.forward
mc_msg.z = (
Z_RANGE_SPEED * msg.vertical
) + Z_ZERO_SPEED # To account for different z limits
mc_msg.y = msg.lateral
mc_msg.r = msg.yaw
mc_msg.enabled_extensions = EXTENSIONS_CODE
mc_msg.s = msg.pitch
mc_msg.t = msg.roll

return mc_msg
self.previous_instruction_tuple: tuple[float, ...] = pixhawk_instruction_to_tuple(
PixhawkInstruction()
)

def smooth_pixhawk_instruction(self, msg: PixhawkInstruction) -> PixhawkInstruction:
instruction_tuple = pixhawk_instruction_to_tuple(msg)

smoothed_tuple = tuple(
smooth_value(previous_value, value)
for (previous_value, value) in zip(
self.previous_instruction_tuple, instruction_tuple, strict=True
)
)
smoothed_instruction = tuple_to_pixhawk_instruction(smoothed_tuple, msg.author)

self.previous_instruction_tuple = smoothed_tuple

return smoothed_instruction

def state_control(
self, req: AutonomousFlight.Request, res: AutonomousFlight.Response
Expand All @@ -111,7 +187,7 @@ def control_callback(self, msg: PixhawkInstruction) -> None:
):
# Smooth out adjustments
# TODO: look into maybe doing inheritance on a PixhawkInstruction
MultiplexerNode.apply(msg, joystick_map)
msg = apply_function(msg, joystick_map)
elif (
msg.author == PixhawkInstruction.KEYBOARD_CONTROL
and self.state == AutonomousFlight.Request.STOP
Expand All @@ -123,7 +199,8 @@ def control_callback(self, msg: PixhawkInstruction) -> None:
else:
return

self.mc_pub.publish(self.to_manual_control(msg))
smoothed_instruction = self.smooth_pixhawk_instruction(msg)
self.mc_pub.publish(to_manual_control(smoothed_instruction))


def main() -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from collections.abc import Callable

from rov_msgs.msg import PixhawkInstruction


def pixhawk_instruction_to_tuple(
msg: PixhawkInstruction,
) -> tuple[float, float, float, float, float, float]:
"""
Convert PixhawkInstruction to tuple of dimensions.

Parameters
----------
msg : PixhawkInstruction
PixhawkInstruction to convert

Returns
-------
tuple[float, float, float, float, float, float]
Tuple of dimensions from the instruction
"""
return (msg.forward, msg.vertical, msg.lateral, msg.pitch, msg.yaw, msg.roll)


def tuple_to_pixhawk_instruction(
instruction_tuple: tuple[float, ...], author: int = PixhawkInstruction.MANUAL_CONTROL
) -> PixhawkInstruction:
"""
Convert tuple of dimensions and author to a PixhawkInstruction.

Parameters
----------
instruction_tuple : tuple[float, ...]
Tuple of dimensions
author : int, optional
Author of the PixhawkInstruction, by default PixhawkInstruction.MANUAL_CONTROL

Returns
-------
PixhawkInstruction
A new PixhawkInstruction with the provided dimensions and author
"""
return PixhawkInstruction(
forward=instruction_tuple[0],
vertical=instruction_tuple[1],
lateral=instruction_tuple[2],
pitch=instruction_tuple[3],
yaw=instruction_tuple[4],
roll=instruction_tuple[5],
author=author,
)


def apply_function(
msg: PixhawkInstruction, function_to_apply: Callable[[float], float]
) -> PixhawkInstruction:
"""
Run the provided function on each dimension of a PixhawkInstruction.
Does not modify the original PixhawkInstruction.

Parameters
----------
msg : PixhawkInstruction
The instruction to run the function on
function_to_apply : Callable[[float], float]
The function to apply to each dimension

Returns
-------
PixhawkInstruction
The new PixhawkInstruction made by applying the function to each dimension of msg
"""
instruction_tuple = pixhawk_instruction_to_tuple(msg)
modified_tuple = tuple(function_to_apply(value) for value in instruction_tuple)
return tuple_to_pixhawk_instruction(modified_tuple, msg.author)
10 changes: 5 additions & 5 deletions src/surface/flight_control/test/test_manual_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Z_RANGE_SPEED,
Z_ZERO_SPEED,
ZERO_SPEED,
MultiplexerNode,
to_manual_control,
)

from rov_msgs.msg import PixhawkInstruction
Expand All @@ -31,14 +31,14 @@ def test_joystick_profiles() -> None:
roll=0.92,
)

msg = MultiplexerNode.to_manual_control(instruction)
msg = to_manual_control(instruction)

assert msg.x == ZERO_SPEED
assert msg.z == (Z_ZERO_SPEED + Z_RANGE_SPEED)
assert msg.y == (ZERO_SPEED - RANGE_SPEED)

# 1539 1378

assert msg.s == ZERO_SPEED + int(RANGE_SPEED * 0.34)
assert msg.r == ZERO_SPEED + int(RANGE_SPEED * -0.6)
assert msg.t == ZERO_SPEED + int(RANGE_SPEED * 0.92)
assert msg.s == ZERO_SPEED + RANGE_SPEED * 0.34
assert msg.r == ZERO_SPEED + RANGE_SPEED * -0.6
assert msg.t == ZERO_SPEED + RANGE_SPEED * 0.92
Loading