From 7cf43a62cabc3124721258f02c9257f451dd1971 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 31 Jul 2023 11:21:34 +0200 Subject: [PATCH] Improve SHM resilience to crashing participants (#3759) * Refs #19255. Always pop descriptor from SHM port. Signed-off-by: Miguel Company * Refs #19255. Catch lock timeout on DataSharing notifications. Signed-off-by: Miguel Company * Refs #19255. Catch lock timeout on SharedMemGlobal. Signed-off-by: Miguel Company * Refs #19255. Please linters. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company --- .../rtps/DataSharing/DataSharingListener.cpp | 20 +++++-- .../DataSharing/DataSharingNotification.hpp | 15 +++-- .../transport/shared_mem/SharedMemGlobal.hpp | 59 ++++++++++++++----- .../transport/shared_mem/SharedMemManager.hpp | 5 +- 4 files changed, 71 insertions(+), 28 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index f76aa96b5e2..0e4f0dbc955 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -52,13 +52,21 @@ void DataSharingListener::run() std::unique_lock lock(notification_->notification_->notification_mutex, std::defer_lock); while (is_running_.load()) { - lock.lock(); - notification_->notification_->notification_cv.wait(lock, [&] - { - return !is_running_.load() || notification_->notification_->new_data.load(); - }); + try + { + lock.lock(); + notification_->notification_->notification_cv.wait(lock, [&] + { + return !is_running_.load() || notification_->notification_->new_data.load(); + }); - lock.unlock(); + lock.unlock(); + } + catch (const boost::interprocess::interprocess_exception& /*e*/) + { + // Timeout when locking + continue; + } if (!is_running_.load()) { diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 167f4bcd9eb..5ebe0d242cc 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -53,10 +53,17 @@ class DataSharingNotification */ inline void notify() { - std::unique_lock lock(notification_->notification_mutex); - notification_->new_data.store(true); - lock.unlock(); - notification_->notification_cv.notify_all(); + try + { + std::unique_lock lock(notification_->notification_mutex); + notification_->new_data.store(true); + lock.unlock(); + notification_->notification_cv.notify_all(); + } + catch (const boost::interprocess::interprocess_exception& /*e*/) + { + // Timeout when locking + } } /** diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index 74788bcada4..4f5cf24363f 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -670,12 +670,19 @@ class SharedMemGlobal void close_listener( std::atomic* is_listener_closed) { + try { - std::lock_guard lock(node_->empty_cv_mutex); - is_listener_closed->exchange(true); + { + std::lock_guard lock(node_->empty_cv_mutex); + is_listener_closed->exchange(true); + } + node_->empty_cv.notify_all(); + } + catch (const boost::interprocess::interprocess_exception& /*e*/) + { + // Timeout when locking } - node_->empty_cv.notify_all(); } /** @@ -782,17 +789,25 @@ class SharedMemGlobal bool get_and_remove_blocked_processing( BufferDescriptor& buffer_descriptor) { - std::lock_guard lock(node_->empty_cv_mutex); - for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++) + try { - if (node_->listeners_status[i].is_in_use && - node_->listeners_status[i].is_processing) + std::lock_guard lock(node_->empty_cv_mutex); + for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++) { - buffer_descriptor = node_->listeners_status[i].descriptor; - listener_processing_stop(i); - return true; + if (node_->listeners_status[i].is_in_use && + node_->listeners_status[i].is_processing) + { + buffer_descriptor = node_->listeners_status[i].descriptor; + listener_processing_stop(i); + return true; + } } } + catch (const boost::interprocess::interprocess_exception& /*e*/) + { + // Timeout when locking + } + return false; } @@ -805,10 +820,17 @@ class SharedMemGlobal uint32_t listener_index, const BufferDescriptor& buffer_descriptor) { - std::lock_guard lock(node_->empty_cv_mutex); + try + { + std::lock_guard lock(node_->empty_cv_mutex); - node_->listeners_status[listener_index].descriptor = buffer_descriptor; - node_->listeners_status[listener_index].is_processing = true; + node_->listeners_status[listener_index].descriptor = buffer_descriptor; + node_->listeners_status[listener_index].is_processing = true; + } + catch (const boost::interprocess::interprocess_exception& /*e*/) + { + // Timeout when locking + } } /** @@ -818,9 +840,16 @@ class SharedMemGlobal void listener_processing_stop( uint32_t listener_index) { - std::lock_guard lock(node_->empty_cv_mutex); + try + { + std::lock_guard lock(node_->empty_cv_mutex); - node_->listeners_status[listener_index].is_processing = false; + node_->listeners_status[listener_index].is_processing = false; + } + catch (const boost::interprocess::interprocess_exception& /*e*/) + { + // Timeout when locking + } } /** diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp index 8410f975767..4dbd05171d1 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp @@ -718,7 +718,9 @@ class SharedMemManager : throw std::runtime_error(""); } + // Read and pop descriptor SharedMemGlobal::BufferDescriptor buffer_descriptor = head_cell->data(); + global_port_->pop(*global_listener_, was_cell_freed); auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id); auto buffer_node = @@ -730,9 +732,6 @@ class SharedMemManager : buffer_node, buffer_descriptor.validity_id); - // If the cell has been read by all listeners - global_port_->pop(*global_listener_, was_cell_freed); - if (buffer_ref) { global_port_->listener_processing_start(listener_index_, buffer_descriptor);