Skip to content

Commit

Permalink
Improve SHM resilience to crashing participants (#3759)
Browse files Browse the repository at this point in the history
* Refs #19255. Always pop descriptor from SHM port.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. Catch lock timeout on DataSharing notifications.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. Catch lock timeout on SharedMemGlobal.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19255. Please linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany authored Jul 31, 2023
1 parent 6e74c1e commit 7cf43a6
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 28 deletions.
20 changes: 14 additions & 6 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,21 @@ void DataSharingListener::run()
std::unique_lock<Segment::mutex> lock(notification_->notification_->notification_mutex, std::defer_lock);
while (is_running_.load())
{
lock.lock();
notification_->notification_->notification_cv.wait(lock, [&]
{
return !is_running_.load() || notification_->notification_->new_data.load();
});
try
{
lock.lock();
notification_->notification_->notification_cv.wait(lock, [&]
{
return !is_running_.load() || notification_->notification_->new_data.load();
});

lock.unlock();
lock.unlock();
}
catch (const boost::interprocess::interprocess_exception& /*e*/)
{
// Timeout when locking
continue;
}

if (!is_running_.load())
{
Expand Down
15 changes: 11 additions & 4 deletions src/cpp/rtps/DataSharing/DataSharingNotification.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,17 @@ class DataSharingNotification
*/
inline void notify()
{
std::unique_lock<Segment::mutex> lock(notification_->notification_mutex);
notification_->new_data.store(true);
lock.unlock();
notification_->notification_cv.notify_all();
try
{
std::unique_lock<Segment::mutex> lock(notification_->notification_mutex);
notification_->new_data.store(true);
lock.unlock();
notification_->notification_cv.notify_all();
}
catch (const boost::interprocess::interprocess_exception& /*e*/)
{
// Timeout when locking
}
}

/**
Expand Down
59 changes: 44 additions & 15 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -670,12 +670,19 @@ class SharedMemGlobal
void close_listener(
std::atomic<bool>* is_listener_closed)
{
try
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
is_listener_closed->exchange(true);
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
is_listener_closed->exchange(true);
}
node_->empty_cv.notify_all();
}
catch (const boost::interprocess::interprocess_exception& /*e*/)
{
// Timeout when locking
}

node_->empty_cv.notify_all();
}

/**
Expand Down Expand Up @@ -782,17 +789,25 @@ class SharedMemGlobal
bool get_and_remove_blocked_processing(
BufferDescriptor& buffer_descriptor)
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++)
try
{
if (node_->listeners_status[i].is_in_use &&
node_->listeners_status[i].is_processing)
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++)
{
buffer_descriptor = node_->listeners_status[i].descriptor;
listener_processing_stop(i);
return true;
if (node_->listeners_status[i].is_in_use &&
node_->listeners_status[i].is_processing)
{
buffer_descriptor = node_->listeners_status[i].descriptor;
listener_processing_stop(i);
return true;
}
}
}
catch (const boost::interprocess::interprocess_exception& /*e*/)
{
// Timeout when locking
}

return false;
}

Expand All @@ -805,10 +820,17 @@ class SharedMemGlobal
uint32_t listener_index,
const BufferDescriptor& buffer_descriptor)
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
try
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);

node_->listeners_status[listener_index].descriptor = buffer_descriptor;
node_->listeners_status[listener_index].is_processing = true;
node_->listeners_status[listener_index].descriptor = buffer_descriptor;
node_->listeners_status[listener_index].is_processing = true;
}
catch (const boost::interprocess::interprocess_exception& /*e*/)
{
// Timeout when locking
}
}

/**
Expand All @@ -818,9 +840,16 @@ class SharedMemGlobal
void listener_processing_stop(
uint32_t listener_index)
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
try
{
std::lock_guard<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);

node_->listeners_status[listener_index].is_processing = false;
node_->listeners_status[listener_index].is_processing = false;
}
catch (const boost::interprocess::interprocess_exception& /*e*/)
{
// Timeout when locking
}
}

/**
Expand Down
5 changes: 2 additions & 3 deletions src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ class SharedMemManager :
throw std::runtime_error("");
}

// Read and pop descriptor
SharedMemGlobal::BufferDescriptor buffer_descriptor = head_cell->data();
global_port_->pop(*global_listener_, was_cell_freed);

auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id);
auto buffer_node =
Expand All @@ -730,9 +732,6 @@ class SharedMemManager :
buffer_node,
buffer_descriptor.validity_id);

// If the cell has been read by all listeners
global_port_->pop(*global_listener_, was_cell_freed);

if (buffer_ref)
{
global_port_->listener_processing_start(listener_index_, buffer_descriptor);
Expand Down

0 comments on commit 7cf43a6

Please sign in to comment.