Skip to content

Commit

Permalink
feat(hesai_hw_interface): use new UDP socket implementation (#235)
Browse files Browse the repository at this point in the history
* feat(hesai): switch to new UDP socket implementation

Signed-off-by: Max SCHMELLER <max.schmeller@tier4.jp>

* fix(hesai): gracefully stop receiver thread on shutdown to prevent SEGFAULT

Signed-off-by: Max SCHMELLER <max.schmeller@tier4.jp>

* chore(hesai): disallow 255.255.255.255 in config

Signed-off-by: Max SCHMELLER <max.schmeller@tier4.jp>

* ci(pre-commit): autofix

---------

Signed-off-by: Max SCHMELLER <max.schmeller@tier4.jp>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
mojomex and pre-commit-ci[bot] authored Feb 4, 2025
1 parent bd8b507 commit 6eb72b5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
// Have to define macros to silence warnings about deprecated headers being used by
// boost/property_tree/ in some versions of boost.
// See: https://github.com/boostorg/property_tree/issues/51
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp"

#include <nebula_common/nebula_status.hpp>

#include <boost/version.hpp>
Expand All @@ -32,7 +34,6 @@

#include <boost_tcp_driver/http_client_driver.hpp>
#include <boost_tcp_driver/tcp_driver.hpp>
#include <boost_udp_driver/udp_driver.hpp>
#include <nebula_common/hesai/hesai_common.hpp>
#include <nebula_common/hesai/hesai_status.hpp>
#include <nebula_common/loggers/logger.hpp>
Expand Down Expand Up @@ -128,12 +129,11 @@ class HesaiHwInterface
using ptc_cmd_result_t = nebula::util::expected<std::vector<uint8_t>, ptc_error_t>;

std::shared_ptr<loggers::Logger> logger_;
std::unique_ptr<::drivers::common::IoContext> cloud_io_context_;
std::optional<connections::UdpSocket> udp_socket_;
std::shared_ptr<boost::asio::io_context> m_owned_ctx_;
std::unique_ptr<::drivers::udp_driver::UdpDriver> cloud_udp_driver_;
std::shared_ptr<::drivers::tcp_driver::TcpDriver> tcp_driver_;
std::shared_ptr<const HesaiSensorConfiguration> sensor_configuration_;
std::function<void(std::vector<uint8_t> & buffer)>
std::function<void(const std::vector<uint8_t> & buffer)>
cloud_packet_callback_; /**This function pointer is called when the scan is complete*/

std::mutex mtx_inflight_tcp_request_;
Expand Down Expand Up @@ -198,7 +198,7 @@ class HesaiHwInterface

/// @brief Callback function to receive the Cloud Packet data from the UDP Driver
/// @param buffer Buffer containing the data received from the UDP socket
void receive_sensor_packet_callback(std::vector<uint8_t> & buffer);
void receive_sensor_packet_callback(const std::vector<uint8_t> & buffer);
/// @brief Starting the interface that handles UDP streams
/// @return Resulting status
Status sensor_interface_start();
Expand All @@ -221,7 +221,7 @@ class HesaiHwInterface
/// @brief Registering callback for PandarScan
/// @param scan_callback Callback function
/// @return Resulting status
Status register_scan_callback(std::function<void(std::vector<uint8_t> &)> scan_callback);
Status register_scan_callback(std::function<void(const std::vector<uint8_t> &)> scan_callback);
/// @brief Getting data with PTC_COMMAND_GET_LIDAR_CALIBRATION
/// @return Resulting status
std::string get_lidar_calibration_string();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "nebula_common/loggers/logger.hpp"
#include "nebula_common/nebula_common.hpp"
#include "nebula_common/nebula_status.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp"

#include <nlohmann/json.hpp>
Expand All @@ -15,6 +16,7 @@
#include <boost/asio/socket_base.hpp>

#include <cassert>
#include <cstddef>
#include <iostream>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -42,9 +44,7 @@ using nlohmann::json;

HesaiHwInterface::HesaiHwInterface(const std::shared_ptr<loggers::Logger> & logger)
: logger_(logger),
cloud_io_context_{new ::drivers::common::IoContext(1)},
m_owned_ctx_{new boost::asio::io_context(1)},
cloud_udp_driver_{new ::drivers::udp_driver::UdpDriver(*cloud_io_context_)},
tcp_driver_{new ::drivers::tcp_driver::TcpDriver(m_owned_ctx_)},
target_model_no_(nebula_model_to_hesai_model_no(SensorModel::UNKNOWN))
{
Expand Down Expand Up @@ -122,7 +122,7 @@ HesaiHwInterface::ptc_cmd_result_t HesaiHwInterface::send_receive(

// Header had payload length 0 (thus, header callback processed request successfully already),
// but we still received a payload: invalid state
if (*response_complete == true) {
if (*response_complete) {
error_code->error_flags |= g_tcp_error_unexpected_payload;
return;
}
Expand Down Expand Up @@ -168,66 +168,51 @@ Status HesaiHwInterface::set_sensor_configuration(

Status HesaiHwInterface::sensor_interface_start()
{
auto builder = connections::UdpSocket::Builder(
sensor_configuration_->host_ip, sensor_configuration_->data_port);
if (!sensor_configuration_->multicast_ip.empty()) {
builder.join_multicast_group(sensor_configuration_->multicast_ip);
}

builder.set_mtu(g_mtu_size);

try {
logger_->info("Starting UDP receiver");
if (sensor_configuration_->multicast_ip.empty()) {
cloud_udp_driver_->init_receiver(
sensor_configuration_->host_ip, sensor_configuration_->data_port);
} else {
cloud_udp_driver_->init_receiver(
sensor_configuration_->multicast_ip, sensor_configuration_->data_port,
sensor_configuration_->host_ip, sensor_configuration_->data_port);
cloud_udp_driver_->receiver()->setMulticast(true);
}
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
logger_->error("init ok");
#endif
cloud_udp_driver_->receiver()->open();
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
logger_->error("open ok");
#endif
builder.set_socket_buffer_size(g_udp_socket_buffer_size);
} catch (const connections::SocketError & e) {
throw std::runtime_error(
"Could not set socket receive buffer size to " + std::to_string(g_udp_socket_buffer_size) +
". Try increasing net.core.rmem_max.");
}

bool success = cloud_udp_driver_->receiver()->setKernelBufferSize(g_udp_socket_buffer_size);
if (!success) {
logger_->error(
"Could not set receive buffer size. Try increasing net.core.rmem_max to " +
std::to_string(g_udp_socket_buffer_size) + " B.");
return Status::ERROR_1;
}
udp_socket_.emplace(std::move(builder).bind());

cloud_udp_driver_->receiver()->bind();
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
logger_->error("bind ok");
#endif
udp_socket_->subscribe([&](
const std::vector<uint8_t> & packet,
const connections::UdpSocket::RxMetadata & /* metadata */) {
receive_sensor_packet_callback(packet);
});

cloud_udp_driver_->receiver()->asyncReceive(
std::bind(&HesaiHwInterface::receive_sensor_packet_callback, this, std::placeholders::_1));
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
logger_->error("async receive set");
#endif
} catch (const std::exception & ex) {
Status status = Status::UDP_CONNECTION_ERROR;
std::cerr << status << " for " << sensor_configuration_->sensor_ip << ":"
<< sensor_configuration_->data_port << " - " << ex.what() << std::endl;
return status;
}
return Status::OK;
}

Status HesaiHwInterface::register_scan_callback(
std::function<void(std::vector<uint8_t> &)> scan_callback)
std::function<void(const std::vector<uint8_t> &)> scan_callback)
{
cloud_packet_callback_ = std::move(scan_callback);
return Status::OK;
}

void HesaiHwInterface::receive_sensor_packet_callback(std::vector<uint8_t> & buffer)
void HesaiHwInterface::receive_sensor_packet_callback(const std::vector<uint8_t> & buffer)
{
cloud_packet_callback_(buffer);
}

Status HesaiHwInterface::sensor_interface_stop()
{
return Status::ERROR_1;
if (udp_socket_) {
udp_socket_->unsubscribe();
}
return Status::OK;
}

Status HesaiHwInterface::get_sensor_configuration(
Expand Down
8 changes: 6 additions & 2 deletions nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ class HesaiRosWrapper final : public rclcpp::Node

public:
explicit HesaiRosWrapper(const rclcpp::NodeOptions & options);
~HesaiRosWrapper() noexcept override = default;
~HesaiRosWrapper() noexcept override
{
if (!hw_interface_wrapper_) return;
hw_interface_wrapper_->hw_interface()->sensor_interface_stop();
};

/// @brief Get current status of this driver
/// @return Current status
Expand All @@ -61,7 +65,7 @@ class HesaiRosWrapper final : public rclcpp::Node
Status stream_start();

private:
void receive_cloud_packet_callback(std::vector<uint8_t> & packet);
void receive_cloud_packet_callback(const std::vector<uint8_t> & packet);

void receive_scan_message_callback(std::unique_ptr<pandar_msgs::msg::PandarScan> scan_msg);

Expand Down
11 changes: 9 additions & 2 deletions nebula_ros/src/hesai/hesai_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ Status HesaiRosWrapper::validate_and_set_config(
if (new_config->frame_id.empty()) {
return Status::SENSOR_CONFIG_ERROR;
}
if (new_config->host_ip == "255.255.255.255") {
RCLCPP_ERROR(
get_logger(),
"Due to potential network performance issues when using IP broadcast for sensor data, Nebula "
"disallows use of the broadcast IP. Please specify the concrete host IP instead.");
return Status::SENSOR_CONFIG_ERROR;
}
if (new_config->ptp_profile == nebula::drivers::PtpProfile::UNKNOWN_PROFILE) {
RCLCPP_ERROR(
get_logger(), "Invalid PTP Profile Provided. Please use '1588v2', '802.1as' or 'automotive'");
Expand Down Expand Up @@ -414,7 +421,7 @@ rcl_interfaces::msg::SetParametersResult HesaiRosWrapper::on_parameter_change(
return rcl_interfaces::build<SetParametersResult>().successful(true).reason("");
}

void HesaiRosWrapper::receive_cloud_packet_callback(std::vector<uint8_t> & packet)
void HesaiRosWrapper::receive_cloud_packet_callback(const std::vector<uint8_t> & packet)
{
if (!decoder_wrapper_ || decoder_wrapper_->status() != Status::OK) {
return;
Expand All @@ -427,7 +434,7 @@ void HesaiRosWrapper::receive_cloud_packet_callback(std::vector<uint8_t> & packe
auto msg_ptr = std::make_unique<nebula_msgs::msg::NebulaPacket>();
msg_ptr->stamp.sec = static_cast<int>(timestamp_ns / 1'000'000'000);
msg_ptr->stamp.nanosec = static_cast<int>(timestamp_ns % 1'000'000'000);
msg_ptr->data.swap(packet);
msg_ptr->data = packet;

decoder_wrapper_->process_cloud_packet(std::move(msg_ptr));
}
Expand Down

0 comments on commit 6eb72b5

Please sign in to comment.