Skip to content

Commit

Permalink
Refs #20953: Make a proxy copy in TLUManager
Browse files Browse the repository at this point in the history
Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>
  • Loading branch information
cferreiragonz committed May 10, 2024
1 parent 1e0ea4f commit 87b13ba
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 20 deletions.
24 changes: 19 additions & 5 deletions src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
const fastrtps::rtps::GUID_t& type_server,
const AsyncCallback& callback,
std::unordered_map<xtypes::TypeIdentfierWithSize,
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr,
std::vector<std::pair<ProxyType*,
AsyncCallback>>>& async_get_type_callbacks)
{
xtypes::TypeIdentfierWithSize type_identifier_with_size =
Expand All @@ -342,7 +342,7 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
is_type_identifier_known(type_identifier_with_size))
{
// The type is already known, invoke the callback
callback(temp_proxy_data);
callback(temp_proxy_data.get());
return RETCODE_OK;
}

Expand All @@ -353,7 +353,9 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
if (it != async_get_type_callbacks.end())
{
// TypeIdentfierWithSize exists, add the callback
it->second.push_back(std::make_pair(std::move(temp_proxy_data), callback));
// Make a copy of the proxy to free the EDP pool
ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data));
it->second.push_back(std::make_pair(temp_proxy_data_copy, callback));
// Return without sending new request
return RETCODE_NO_DATA;
}
Expand All @@ -366,8 +368,10 @@ ReturnCode_t TypeLookupManager::check_type_identifier_received(
{
// Store the sent requests and callback
add_async_get_type_request(get_type_dependencies_request, type_identifier_with_size);
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr, AsyncCallback>> types;
types.push_back(std::make_pair(std::move(temp_proxy_data), callback));
std::vector<std::pair<ProxyType*, AsyncCallback>> types;
// Make a copy of the proxy to free the EDP pool
ProxyType* temp_proxy_data_copy(new ProxyType(*temp_proxy_data));
types.push_back(std::make_pair(temp_proxy_data_copy, callback));
async_get_type_callbacks.emplace(type_identifier_with_size, std::move(types));

return RETCODE_NO_DATA;
Expand Down Expand Up @@ -443,13 +447,23 @@ bool TypeLookupManager::remove_async_get_type_callback(
auto writer_it = async_get_type_writer_callbacks_.find(type_identifier_with_size);
if (writer_it != async_get_type_writer_callbacks_.end())
{
// Delete the proxies and remove the entry
for (auto& proxy_callback_pair : writer_it->second)
{
delete proxy_callback_pair.first;
}
async_get_type_writer_callbacks_.erase(writer_it);
removed = true;
}
// Check if the key is in the reader map
auto reader_it = async_get_type_reader_callbacks_.find(type_identifier_with_size);
if (reader_it != async_get_type_reader_callbacks_.end())
{
// Delete the proxies and remove the entry
for (auto& proxy_callback_pair : reader_it->second)
{
delete proxy_callback_pair.first;
}
async_get_type_reader_callbacks_.erase(reader_it);
removed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ namespace builtin {
const SampleIdentity INVALID_SAMPLE_IDENTITY;

using AsyncGetTypeWriterCallback = std::function<
void (eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr&)>;
void (eprosima::fastrtps::rtps::WriterProxyData*)>;
using AsyncGetTypeReaderCallback = std::function<
void (eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr&)>;
void (eprosima::fastrtps::rtps::ReaderProxyData*)>;

/**
* Class TypeLookupManager that implements the TypeLookup Service described in the DDS-XTYPES 1.3 specification.
Expand Down Expand Up @@ -215,7 +215,7 @@ class TypeLookupManager
const fastrtps::rtps::GUID_t& type_server,
const AsyncCallback& callback,
std::unordered_map<xtypes::TypeIdentfierWithSize,
std::vector<std::pair<typename eprosima::ProxyPool<ProxyType>::smart_ptr,
std::vector<std::pair<ProxyType*,
AsyncCallback>>>& async_get_type_callbacks);

/**
Expand Down Expand Up @@ -431,12 +431,12 @@ class TypeLookupManager

//! Collection of all the WriterProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize.
std::unordered_map < xtypes::TypeIdentfierWithSize,
std::vector<std::pair<eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr,
std::vector<std::pair<eprosima::fastrtps::rtps::WriterProxyData*,
AsyncGetTypeWriterCallback>>> async_get_type_writer_callbacks_;

//! Collection of all the ReaderProxyData and their callbacks related to a TypeIdentfierWithSize, hashed by its TypeIdentfierWithSize.
std::unordered_map < xtypes::TypeIdentfierWithSize,
std::vector<std::pair<eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr,
std::vector<std::pair<eprosima::fastrtps::rtps::ReaderProxyData*,
AsyncGetTypeReaderCallback>>> async_get_type_reader_callbacks_;

//! Collection of all SampleIdentity and the TypeIdentfierWithSize it originated from, hashed by its SampleIdentity.
Expand Down
20 changes: 10 additions & 10 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void EDPBasePUBListener::add_writer_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeWriterCallback after_typelookup_callback =
[reader, change, edp, &network, writer_added_callback]
(eprosima::ProxyPool<eprosima::fastrtps::rtps::WriterProxyData>::smart_ptr& temp_writer_data)
(eprosima::fastrtps::rtps::WriterProxyData* temp_writer_data)
{
//LOAD INFORMATION IN DESTINATION WRITER PROXY DATA
auto copy_data_fun = [&temp_writer_data, &network](
Expand Down Expand Up @@ -115,9 +115,6 @@ void EDPBasePUBListener::add_writer_from_change(
WriterProxyData* writer_data =
edp->mp_PDP->addWriterProxyData(temp_writer_data->guid(), participant_guid, copy_data_fun);

// release temporary proxy
temp_writer_data.reset();

if (writer_data != nullptr)
{
edp->pairing_writer_proxy_with_any_local_reader(participant_guid, writer_data);
Expand Down Expand Up @@ -150,8 +147,11 @@ void EDPBasePUBListener::add_writer_from_change(
else
{
EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBasePUBListener: No TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_writer_data);
after_typelookup_callback(temp_writer_data.get());
}
// Release temporary proxy
temp_writer_data.reset();


// Take the reader lock again if needed.
reader->getMutex().lock();
Expand Down Expand Up @@ -229,8 +229,9 @@ void EDPBaseSUBListener::add_reader_from_change(
// Callback function to continue after typelookup is complete
fastdds::dds::builtin::AsyncGetTypeReaderCallback after_typelookup_callback =
[reader, change, edp, &network, reader_added_callback]
(eprosima::ProxyPool<eprosima::fastrtps::rtps::ReaderProxyData>::smart_ptr& temp_reader_data)
(eprosima::fastrtps::rtps::ReaderProxyData* temp_reader_data)
{
//LOAD INFORMATION IN DESTINATION READER PROXY DATA
auto copy_data_fun = [&temp_reader_data, &network](
ReaderProxyData* data,
bool updating,
Expand All @@ -257,9 +258,6 @@ void EDPBaseSUBListener::add_reader_from_change(
ReaderProxyData* reader_data =
edp->mp_PDP->addReaderProxyData(temp_reader_data->guid(), participant_guid, copy_data_fun);

// Release the temporary proxy
temp_reader_data.reset();

if (reader_data != nullptr) //ADDED NEW DATA
{
edp->pairing_reader_proxy_with_any_local_writer(participant_guid, reader_data);
Expand Down Expand Up @@ -292,8 +290,10 @@ void EDPBaseSUBListener::add_reader_from_change(
else
{
EPROSIMA_LOG_INFO(RTPS_EDP, "EDPBaseSUBListener: No TypeInformation. Trying fallback mechanism");
after_typelookup_callback(temp_reader_data);
after_typelookup_callback(temp_reader_data.get());
}
// Release the temporary proxy
temp_reader_data.reset();

// Take the reader lock again if needed.
reader->getMutex().lock();
Expand Down

0 comments on commit 87b13ba

Please sign in to comment.