Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[18966] SHM sending improvements (backport #3642) #3711

Merged
merged 2 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,14 @@ class SharedMemGlobal
return node_->is_port_ok;
}

/**
* Checks if a port is OK and is opened for reading with listeners active
*/
inline bool port_has_listeners() const
{
return node_->is_port_ok && node_->is_opened_for_reading && node_->num_listeners > 0;
}

inline uint32_t port_id() const
{
return node_->port_id;
Expand Down
16 changes: 15 additions & 1 deletion src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,13 +841,26 @@ class SharedMemManager :
return *this;
}

/**
* Checks if a port is OK and opened for reading with listeners active
*/
bool has_listeners() const
{
return global_port_->port_has_listeners();
}

/**
* Try to enqueue a buffer in the port.
* @param[in, out] buffer reference to the SHM buffer to push to
* @param[out] is_port_ok true if the port is ok
* @returns false If the port's queue is full so buffer couldn't be enqueued.
*/
bool try_push(
const std::shared_ptr<Buffer>& buffer)
const std::shared_ptr<Buffer>& buffer,
bool& is_port_ok)
{
is_port_ok = true;

assert(std::dynamic_pointer_cast<SharedMemBuffer>(buffer));

SharedMemBuffer* shared_mem_buffer = std::static_pointer_cast<SharedMemBuffer>(buffer).get();
Expand Down Expand Up @@ -881,6 +894,7 @@ class SharedMemManager :
<< e.what());

regenerate_port();
is_port_ok = false;
ret = false;
}
else
Expand Down
37 changes: 35 additions & 2 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ bool SharedMemTransport::send(
{
using namespace eprosima::fastdds::statistics::rtps;

#if !defined(_WIN32)
cleanup_output_ports();
#endif // if !defined(_WIN32)

fastrtps::rtps::LocatorsIterator& it = *destination_locators_begin;

bool ret = true;
Expand Down Expand Up @@ -465,6 +469,22 @@ bool SharedMemTransport::send(

}

void SharedMemTransport::cleanup_output_ports()
{
auto it = opened_ports_.begin();
while (it != opened_ports_.end())
{
if (it->second->has_listeners())
{
++it;
}
else
{
it = opened_ports_.erase(it);
}
}
}

std::shared_ptr<SharedMemManager::Port> SharedMemTransport::find_port(
uint32_t port_id)
{
Expand Down Expand Up @@ -492,9 +512,22 @@ bool SharedMemTransport::push_discard(
{
try
{
if (!find_port(remote_locator.port)->try_push(buffer))
bool is_port_ok = false;
const size_t num_retries = 2;
for (size_t i = 0; i < num_retries && !is_port_ok; ++i)
{
logInfo(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped");
if (!find_port(remote_locator.port)->try_push(buffer, is_port_ok))
{
if (is_port_ok)
{
logInfo(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped");
}
else
{
logWarning(RTPS_MSG_OUT, "Port " << remote_locator.port << " inconsistent. Port dropped");
opened_ports_.erase(remote_locator.port);
}
}
}
}
catch (const std::exception& error)
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class SharedMemTransport : public TransportInterface
const std::shared_ptr<SharedMemManager::Buffer>& buffer,
const Locator& remote_locator);

void cleanup_output_ports();

std::shared_ptr<SharedMemManager::Port> find_port(
uint32_t port_id);

Expand Down
77 changes: 63 additions & 14 deletions test/unittest/transport/SharedMemTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,11 @@ TEST_F(SHMTransportTests, port_listener_dead_recover)
ASSERT_TRUE(buf != nullptr);
memset(buf->data(), 0, buf->size());
*static_cast<uint8_t*>(buf->data()) = 1u;
ASSERT_TRUE(port_sender->try_push(buf));
{
bool is_port_ok = false;
ASSERT_TRUE(port_sender->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}

// Wait until message received
while (thread_listener2_state.load() < 1u)
Expand All @@ -1325,10 +1329,18 @@ TEST_F(SHMTransportTests, port_listener_dead_recover)

*static_cast<uint8_t*>(buf->data()) = 2u;
// This push must fail because port is not OK
ASSERT_FALSE(port_sender->try_push(buf));
{
bool is_port_ok = false;
ASSERT_FALSE(port_sender->try_push(buf, is_port_ok));
ASSERT_FALSE(is_port_ok);
}

// This push must success because port was regenerated in the last try_push call.
ASSERT_TRUE(port_sender->try_push(buf));
{
bool is_port_ok = false;
ASSERT_TRUE(port_sender->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}

// Wait until port is regenerated
while (thread_listener2_state.load() < 3u)
Expand Down Expand Up @@ -1465,8 +1477,16 @@ TEST_F(SHMTransportTests, port_not_ok_listener_recover)
auto buffer = data_segment->alloc_buffer(1, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
*static_cast<uint8_t*>(buffer->data()) = 6;
// Fail because port regeneration
ASSERT_FALSE(managed_port->try_push(buffer));
ASSERT_TRUE(managed_port->try_push(buffer));
{
bool is_port_ok = false;
ASSERT_FALSE(managed_port->try_push(buffer, is_port_ok));
ASSERT_FALSE(is_port_ok);
}
{
bool is_port_ok = false;
ASSERT_TRUE(managed_port->try_push(buffer, is_port_ok));
ASSERT_TRUE(is_port_ok);
}

thread_listener.join();
}
Expand Down Expand Up @@ -1538,14 +1558,25 @@ TEST_F(SHMTransportTests, buffer_recover)

// Test 1 (without port overflow)
uint32_t send_counter = 0u;

bool is_port_ok = false;

while (listener1_recv_count.load() < 16u)
{
{
// The segment should never overflow
auto buf = segment->alloc_buffer(1, std::chrono::steady_clock::time_point());

ASSERT_EQ(true, pub_sub1_write->try_push(buf));
ASSERT_EQ(true, pub_sub2_write->try_push(buf));
{
is_port_ok = false;
ASSERT_TRUE(pub_sub1_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
{
is_port_ok = false;
ASSERT_TRUE(pub_sub2_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
}

{
Expand Down Expand Up @@ -1580,14 +1611,22 @@ TEST_F(SHMTransportTests, buffer_recover)
// The segment should never overflow
auto buf = segment->alloc_buffer(1u, std::chrono::steady_clock::time_point());

if (!pub_sub1_write->try_push(buf))
{
port_overflows1++;
is_port_ok = false;
if (!pub_sub1_write->try_push(buf, is_port_ok))
{
EXPECT_TRUE(is_port_ok);
port_overflows1++;
}
}

if (!pub_sub2_write->try_push(buf))
{
port_overflows2++;
is_port_ok = false;
if (!pub_sub2_write->try_push(buf, is_port_ok))
{
EXPECT_TRUE(is_port_ok);
port_overflows2++;
}
}
}

Expand All @@ -1611,8 +1650,16 @@ TEST_F(SHMTransportTests, buffer_recover)

{
auto buf = segment->alloc_buffer(1u, std::chrono::steady_clock::time_point());
ASSERT_EQ(true, pub_sub1_write->try_push(buf));
ASSERT_EQ(true, pub_sub2_write->try_push(buf));
{
is_port_ok = false;
ASSERT_TRUE(pub_sub1_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
{
is_port_ok = false;
ASSERT_TRUE(pub_sub2_write->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
}
}

thread_listener1.join();
Expand Down Expand Up @@ -1655,7 +1702,9 @@ TEST_F(SHMTransportTests, remote_segments_free)
{
if (j != i)
{
ASSERT_TRUE(ports[j]->try_push(buf));
bool is_port_ok = false;
ASSERT_TRUE(ports[j]->try_push(buf, is_port_ok));
ASSERT_TRUE(is_port_ok);
ASSERT_TRUE(listeners[j]->pop() != nullptr);
}
}
Expand Down