From d3a93e883006a856d7c7a262b7a668cc4f3cba7b Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 13:58:53 +0200 Subject: [PATCH 01/11] Refs 12051. New boost headers for file mapping Signed-off-by: Iker Luengo --- .../interprocess/detail/file_wrapper.hpp | 213 +++++++++++++++ .../boost/interprocess/file_mapping.hpp | 199 ++++++++++++++ .../interprocess/managed_mapped_file.hpp | 250 ++++++++++++++++++ 3 files changed, 662 insertions(+) create mode 100644 thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp create mode 100644 thirdparty/boost/include/boost/interprocess/file_mapping.hpp create mode 100644 thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp diff --git a/thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp b/thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp new file mode 100644 index 00000000000..99e0fea570a --- /dev/null +++ b/thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp @@ -0,0 +1,213 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2006-2012. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_DETAIL_FILE_WRAPPER_HPP +#define BOOST_INTERPROCESS_DETAIL_FILE_WRAPPER_HPP + +#ifndef BOOST_CONFIG_HPP +# include +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace interprocess { +namespace ipcdetail{ + +class file_wrapper +{ + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + BOOST_MOVABLE_BUT_NOT_COPYABLE(file_wrapper) + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + public: + + //!Default constructor. + //!Represents an empty file_wrapper. + file_wrapper(); + + //!Creates a file object with name "name" and mode "mode", with the access mode "mode" + //!If the file previously exists, throws an error. + file_wrapper(create_only_t, const char *name, mode_t mode, const permissions &perm = permissions()) + { this->priv_open_or_create(ipcdetail::DoCreate, name, mode, perm); } + + //!Tries to create a file with name "name" and mode "mode", with the + //!access mode "mode". If the file previously exists, it tries to open it with mode "mode". + //!Otherwise throws an error. + file_wrapper(open_or_create_t, const char *name, mode_t mode, const permissions &perm = permissions()) + { this->priv_open_or_create(ipcdetail::DoOpenOrCreate, name, mode, perm); } + + //!Tries to open a file with name "name", with the access mode "mode". + //!If the file does not previously exist, it throws an error. + file_wrapper(open_only_t, const char *name, mode_t mode) + { this->priv_open_or_create(ipcdetail::DoOpen, name, mode, permissions()); } + + //!Moves the ownership of "moved"'s file to *this. + //!After the call, "moved" does not represent any file. + //!Does not throw + file_wrapper(BOOST_RV_REF(file_wrapper) moved) + : m_handle(file_handle_t(ipcdetail::invalid_file())) + { this->swap(moved); } + + //!Moves the ownership of "moved"'s file to *this. + //!After the call, "moved" does not represent any file. + //!Does not throw + file_wrapper &operator=(BOOST_RV_REF(file_wrapper) moved) + { + file_wrapper tmp(boost::move(moved)); + this->swap(tmp); + return *this; + } + + //!Swaps to file_wrappers. + //!Does not throw + void swap(file_wrapper &other); + + //!Erases a file from the system. + //!Returns false on error. Never throws + static bool remove(const char *name); + + //!Sets the size of the file + void truncate(offset_t length); + + //!Closes the + //!file + ~file_wrapper(); + + //!Returns the name of the file + //!used in the constructor + const char *get_name() const; + + //!Returns the name of the file + //!used in the constructor + bool get_size(offset_t &size) const; + + //!Returns access mode + //!used in the constructor + mode_t get_mode() const; + + //!Get mapping handle + //!to use with mapped_region + mapping_handle_t get_mapping_handle() const; + + private: + //!Closes a previously opened file mapping. Never throws. + void priv_close(); + //!Closes a previously opened file mapping. Never throws. + bool priv_open_or_create(ipcdetail::create_enum_t type, const char *filename, mode_t mode, const permissions &perm); + + file_handle_t m_handle; + mode_t m_mode; + std::string m_filename; +}; + +inline file_wrapper::file_wrapper() + : m_handle(file_handle_t(ipcdetail::invalid_file())) + , m_mode(read_only), m_filename() +{} + +inline file_wrapper::~file_wrapper() +{ this->priv_close(); } + +inline const char *file_wrapper::get_name() const +{ return m_filename.c_str(); } + +inline bool file_wrapper::get_size(offset_t &size) const +{ return get_file_size((file_handle_t)m_handle, size); } + +inline void file_wrapper::swap(file_wrapper &other) +{ + (simple_swap)(m_handle, other.m_handle); + (simple_swap)(m_mode, other.m_mode); + m_filename.swap(other.m_filename); +} + +inline mapping_handle_t file_wrapper::get_mapping_handle() const +{ return mapping_handle_from_file_handle(m_handle); } + +inline mode_t file_wrapper::get_mode() const +{ return m_mode; } + +inline bool file_wrapper::priv_open_or_create + (ipcdetail::create_enum_t type, + const char *filename, + mode_t mode, + const permissions &perm = permissions()) +{ + m_filename = filename; + + if(mode != read_only && mode != read_write){ + error_info err(mode_error); + throw interprocess_exception(err); + } + + //Open file existing native API to obtain the handle + switch(type){ + case ipcdetail::DoOpen: + m_handle = open_existing_file(filename, mode); + break; + case ipcdetail::DoCreate: + m_handle = create_new_file(filename, mode, perm); + break; + case ipcdetail::DoOpenOrCreate: + m_handle = create_or_open_file(filename, mode, perm); + break; + default: + { + error_info err = other_error; + throw interprocess_exception(err); + } + } + + //Check for error + if(m_handle == invalid_file()){ + error_info err = system_error_code(); + throw interprocess_exception(err); + } + + m_mode = mode; + return true; +} + +inline bool file_wrapper::remove(const char *filename) +{ return delete_file(filename); } + +inline void file_wrapper::truncate(offset_t length) +{ + if(!truncate_file(m_handle, length)){ + error_info err(system_error_code()); + throw interprocess_exception(err); + } +} + +inline void file_wrapper::priv_close() +{ + if(m_handle != invalid_file()){ + close_file(m_handle); + m_handle = invalid_file(); + } +} + +} //namespace ipcdetail{ +} //namespace interprocess { +} //namespace boost { + +#include + +#endif //BOOST_INTERPROCESS_DETAIL_FILE_WRAPPER_HPP diff --git a/thirdparty/boost/include/boost/interprocess/file_mapping.hpp b/thirdparty/boost/include/boost/interprocess/file_mapping.hpp new file mode 100644 index 00000000000..0d1cf1fe418 --- /dev/null +++ b/thirdparty/boost/include/boost/interprocess/file_mapping.hpp @@ -0,0 +1,199 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_FILE_MAPPING_HPP +#define BOOST_INTERPROCESS_FILE_MAPPING_HPP + +#ifndef BOOST_CONFIG_HPP +# include +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + +#include +#include + +#if !defined(BOOST_INTERPROCESS_MAPPED_FILES) +#error "Boost.Interprocess: This platform does not support memory mapped files!" +#endif + +#include +#include +#include +#include +#include +#include +#include +#include //std::string + +//!\file +//!Describes file_mapping and mapped region classes + +namespace boost { +namespace interprocess { + +//!A class that wraps a file-mapping that can be used to +//!create mapped regions from the mapped files +class file_mapping +{ + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + BOOST_MOVABLE_BUT_NOT_COPYABLE(file_mapping) + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + + public: + //!Constructs an empty file mapping. + //!Does not throw + file_mapping(); + + //!Opens a file mapping of file "filename", starting in offset + //!"file_offset", and the mapping's size will be "size". The mapping + //!can be opened for read-only "read_only" or read-write "read_write" + //!modes. Throws interprocess_exception on error. + file_mapping(const char *filename, mode_t mode); + + //!Moves the ownership of "moved"'s file mapping object to *this. + //!After the call, "moved" does not represent any file mapping object. + //!Does not throw + file_mapping(BOOST_RV_REF(file_mapping) moved) + : m_handle(file_handle_t(ipcdetail::invalid_file())) + , m_mode(read_only) + { this->swap(moved); } + + //!Moves the ownership of "moved"'s file mapping to *this. + //!After the call, "moved" does not represent any file mapping. + //!Does not throw + file_mapping &operator=(BOOST_RV_REF(file_mapping) moved) + { + file_mapping tmp(boost::move(moved)); + this->swap(tmp); + return *this; + } + + //!Swaps to file_mappings. + //!Does not throw. + void swap(file_mapping &other); + + //!Returns access mode + //!used in the constructor + mode_t get_mode() const; + + //!Obtains the mapping handle + //!to be used with mapped_region + mapping_handle_t get_mapping_handle() const; + + //!Destroys the file mapping. All mapped regions created from this are still + //!valid. Does not throw + ~file_mapping(); + + //!Returns the name of the file + //!used in the constructor. + const char *get_name() const; + + //!Removes the file named "filename" even if it's been memory mapped. + //!Returns true on success. + //!The function might fail in some operating systems if the file is + //!being used other processes and no deletion permission was shared. + static bool remove(const char *filename); + + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + private: + //!Closes a previously opened file mapping. Never throws. + void priv_close(); + file_handle_t m_handle; + mode_t m_mode; + std::string m_filename; + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED +}; + +inline file_mapping::file_mapping() + : m_handle(file_handle_t(ipcdetail::invalid_file())) + , m_mode(read_only) +{} + +inline file_mapping::~file_mapping() +{ this->priv_close(); } + +inline const char *file_mapping::get_name() const +{ return m_filename.c_str(); } + +inline void file_mapping::swap(file_mapping &other) +{ + (simple_swap)(m_handle, other.m_handle); + (simple_swap)(m_mode, other.m_mode); + m_filename.swap(other.m_filename); +} + +inline mapping_handle_t file_mapping::get_mapping_handle() const +{ return ipcdetail::mapping_handle_from_file_handle(m_handle); } + +inline mode_t file_mapping::get_mode() const +{ return m_mode; } + +inline file_mapping::file_mapping + (const char *filename, mode_t mode) + : m_filename(filename) +{ + //Check accesses + if (mode != read_write && mode != read_only){ + error_info err = other_error; + throw interprocess_exception(err); + } + + //Open file + m_handle = ipcdetail::open_existing_file(filename, mode); + + //Check for error + if(m_handle == ipcdetail::invalid_file()){ + error_info err = system_error_code(); + this->priv_close(); + throw interprocess_exception(err); + } + m_mode = mode; +} + +inline bool file_mapping::remove(const char *filename) +{ return ipcdetail::delete_file(filename); } + +#if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + +inline void file_mapping::priv_close() +{ + if(m_handle != ipcdetail::invalid_file()){ + ipcdetail::close_file(m_handle); + m_handle = ipcdetail::invalid_file(); + } +} + +//!A class that stores the name of a file +//!and tries to remove it in its destructor +//!Useful to remove temporary files in the presence +//!of exceptions +class remove_file_on_destroy +{ + const char * m_name; + public: + remove_file_on_destroy(const char *name) + : m_name(name) + {} + + ~remove_file_on_destroy() + { ipcdetail::delete_file(m_name); } +}; + +#endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + +} //namespace interprocess { +} //namespace boost { + +#include + +#endif //BOOST_INTERPROCESS_FILE_MAPPING_HPP diff --git a/thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp b/thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp new file mode 100644 index 00000000000..14ec160614b --- /dev/null +++ b/thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp @@ -0,0 +1,250 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_MANAGED_MAPPED_FILE_HPP +#define BOOST_INTERPROCESS_MANAGED_MAPPED_FILE_HPP + +#ifndef BOOST_CONFIG_HPP +# include +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +//These includes needed to fulfill default template parameters of +//predeclarations in interprocess_fwd.hpp +#include +#include +#include + +namespace boost { +namespace interprocess { +namespace ipcdetail { + +template +struct mfile_open_or_create +{ + typedef ipcdetail::managed_open_or_create_impl + < file_wrapper, AllocationAlgorithm::Alignment, true, false> type; +}; + +} //namespace ipcdetail { + +//!A basic mapped file named object creation class. Initializes the +//!mapped file. Inherits all basic functionality from +//!basic_managed_memory_impl +template + < + class CharType, + class AllocationAlgorithm, + template class IndexType + > +class basic_managed_mapped_file + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + : public ipcdetail::basic_managed_memory_impl + ::type::ManagedOpenOrCreateUserOffset> + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED +{ + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + public: + typedef ipcdetail::basic_managed_memory_impl + ::type::ManagedOpenOrCreateUserOffset> base_t; + typedef ipcdetail::file_wrapper device_type; + + private: + + typedef ipcdetail::create_open_func create_open_func_t; + + basic_managed_mapped_file *get_this_pointer() + { return this; } + + private: + typedef typename base_t::char_ptr_holder_t char_ptr_holder_t; + BOOST_MOVABLE_BUT_NOT_COPYABLE(basic_managed_mapped_file) + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + + public: //functions + + //!Unsigned integral type enough to represent + //!the size of a basic_managed_mapped_file. + typedef typename BOOST_INTERPROCESS_IMPDEF(base_t::size_type) size_type; + + //!Creates mapped file and creates and places the segment manager. + //!This can throw. + basic_managed_mapped_file() + {} + + //!Creates mapped file and creates and places the segment manager. + //!This can throw. + basic_managed_mapped_file(create_only_t, const char *name, + size_type size, const void *addr = 0, const permissions &perm = permissions()) + : m_mfile(create_only, name, size, read_write, addr, + create_open_func_t(get_this_pointer(), ipcdetail::DoCreate), perm) + {} + + //!Creates mapped file and creates and places the segment manager if + //!segment was not created. If segment was created it connects to the + //!segment. + //!This can throw. + basic_managed_mapped_file (open_or_create_t, + const char *name, size_type size, + const void *addr = 0, const permissions &perm = permissions()) + : m_mfile(open_or_create, name, size, read_write, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpenOrCreate), perm) + {} + + //!Connects to a created mapped file and its segment manager. + //!This can throw. + basic_managed_mapped_file (open_only_t, const char* name, + const void *addr = 0) + : m_mfile(open_only, name, read_write, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpen)) + {} + + //!Connects to a created mapped file and its segment manager + //!in copy_on_write mode. + //!This can throw. + basic_managed_mapped_file (open_copy_on_write_t, const char* name, + const void *addr = 0) + : m_mfile(open_only, name, copy_on_write, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpen)) + {} + + //!Connects to a created mapped file and its segment manager + //!in read-only mode. + //!This can throw. + basic_managed_mapped_file (open_read_only_t, const char* name, + const void *addr = 0) + : m_mfile(open_only, name, read_only, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpen)) + {} + + //!Moves the ownership of "moved"'s managed memory to *this. + //!Does not throw + basic_managed_mapped_file(BOOST_RV_REF(basic_managed_mapped_file) moved) + { + this->swap(moved); + } + + //!Moves the ownership of "moved"'s managed memory to *this. + //!Does not throw + basic_managed_mapped_file &operator=(BOOST_RV_REF(basic_managed_mapped_file) moved) + { + basic_managed_mapped_file tmp(boost::move(moved)); + this->swap(tmp); + return *this; + } + + //!Destroys *this and indicates that the calling process is finished using + //!the resource. The destructor function will deallocate + //!any system resources allocated by the system for use by this process for + //!this resource. The resource can still be opened again calling + //!the open constructor overload. To erase the resource from the system + //!use remove(). + ~basic_managed_mapped_file() + {} + + //!Swaps the ownership of the managed mapped memories managed by *this and other. + //!Never throws. + void swap(basic_managed_mapped_file &other) + { + base_t::swap(other); + m_mfile.swap(other.m_mfile); + } + + //!Flushes cached data to file. + //!Never throws + bool flush() + { return m_mfile.flush(); } + + //!Tries to resize mapped file so that we have room for + //!more objects. + //! + //!This function is not synchronized so no other thread or process should + //!be reading or writing the file + static bool grow(const char *filename, size_type extra_bytes) + { + return base_t::template grow + (filename, extra_bytes); + } + + //!Tries to resize mapped file to minimized the size of the file. + //! + //!This function is not synchronized so no other thread or process should + //!be reading or writing the file + static bool shrink_to_fit(const char *filename) + { + return base_t::template shrink_to_fit + (filename); + } + + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + + //!Tries to find a previous named allocation address. Returns a memory + //!buffer and the object count. If not found returned pointer is 0. + //!Never throws. + template + std::pair find (char_ptr_holder_t name) + { + if(m_mfile.get_mapped_region().get_mode() == read_only){ + return base_t::template find_no_lock(name); + } + else{ + return base_t::template find(name); + } + } + + private: + typename ipcdetail::mfile_open_or_create::type m_mfile; + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED +}; + +#ifdef BOOST_INTERPROCESS_DOXYGEN_INVOKED + +//!Typedef for a default basic_managed_mapped_file +//!of narrow characters +typedef basic_managed_mapped_file + + ,iset_index> +managed_mapped_file; + +//!Typedef for a default basic_managed_mapped_file +//!of wide characters +typedef basic_managed_mapped_file + + ,iset_index> +wmanaged_mapped_file; + +#endif //#ifdef BOOST_INTERPROCESS_DOXYGEN_INVOKED + +} //namespace interprocess { +} //namespace boost { + +#include + +#endif //BOOST_INTERPROCESS_MANAGED_MAPPED_FILE_HPP From 10c165f2bff73c62c3fac9e7ab03f94c9398e1f3 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 13:59:42 +0200 Subject: [PATCH 02/11] Refs 12051. Testing for custom directories Signed-off-by: Iker Luengo --- test/blackbox/api/dds-pim/PubSubReader.hpp | 15 ++ test/blackbox/api/dds-pim/PubSubWriter.hpp | 8 + .../common/DDSBlackboxTestsDataSharing.cpp | 243 ++++++++++++++++-- 3 files changed, 242 insertions(+), 24 deletions(-) diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 53257f0b7d4..a0c46b2ee51 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -399,6 +399,7 @@ class PubSubReader std::cout << "Created datareader " << datareader_->guid() << " for topic " << topic_name_ << std::endl; initialized_ = true; + datareader_guid_ = datareader_->guid(); } } @@ -1328,6 +1329,14 @@ class PubSubReader return *this; } + PubSubReader& datasharing_auto( + const std::string directory, + std::vector domain_id = std::vector()) + { + datareader_qos_.data_sharing().automatic(directory, domain_id); + return *this; + } + PubSubReader& datasharing_on( const std::string directory, std::vector domain_id = std::vector()) @@ -1527,6 +1536,11 @@ class PubSubReader return datareader_->get_statuscondition(); } + const eprosima::fastrtps::rtps::GUID_t& datareader_guid() const + { + return datareader_guid_; + } + protected: const eprosima::fastrtps::rtps::GUID_t& participant_guid() const @@ -1627,6 +1641,7 @@ class PubSubReader eprosima::fastdds::dds::StatusMask status_mask_; std::string topic_name_; eprosima::fastrtps::rtps::GUID_t participant_guid_; + eprosima::fastrtps::rtps::GUID_t datareader_guid_; bool initialized_; std::list total_msgs_; std::mutex mutex_; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 0624b0e9b2f..5df2bc430bc 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1288,6 +1288,14 @@ class PubSubWriter return *this; } + PubSubWriter& datasharing_auto( + const std::string directory, + std::vector domain_id = std::vector()) + { + datawriter_qos_.data_sharing().automatic(directory, domain_id); + return *this; + } + PubSubWriter& datasharing_on( const std::string directory, std::vector domain_id = std::vector()) diff --git a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp index d283e9f5cd4..f9fe5e94bd9 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp @@ -21,11 +21,27 @@ #include #include +#include +#include #include using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; +bool check_shared_file ( + const char* shared_dir, + const eprosima::fastrtps::rtps::GUID_t& guid) +{ + bool result; + std::stringstream file_name; + std::fstream file_stream; + + file_name << shared_dir << "/fast_datasharing_" << guid.guidPrefix << "_" << guid.entityId; + file_stream.open(file_name.str(), std::ios::in); + result = file_stream.is_open(); + file_stream.close(); + return result; +} TEST(DDSDataSharing, BasicCommunication) { @@ -38,14 +54,14 @@ TEST(DDSDataSharing, BasicCommunication) reader.history_depth(100) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(reader.isInitialized()); writer.history_depth(100) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(writer.isInitialized()); @@ -55,8 +71,11 @@ TEST(DDSDataSharing, BasicCommunication) writer.wait_discovery(); reader.wait_discovery(); - auto data = default_fixed_sized_data_generator(); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + auto data = default_fixed_sized_data_generator(); reader.startReception(data); // Send data @@ -65,6 +84,14 @@ TEST(DDSDataSharing, BasicCommunication) ASSERT_TRUE(data.empty()); // Block reader until reception finished or timeout. reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } @@ -82,11 +109,14 @@ TEST(DDSDataSharing, TransientReader) writer.history_depth(writer_history_depth) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(writer.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + // Send the data to fill the history and overwrite old changes // The reader only receives the last changes std::list data = default_fixed_sized_data_generator(writer_sent_data); @@ -100,17 +130,28 @@ TEST(DDSDataSharing, TransientReader) reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS) .durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + writer.wait_discovery(); reader.wait_discovery(); reader.startReception(received_data); reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } @@ -131,7 +172,7 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) writer.history_depth(writer_history_depth) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS) .resource_limits_extra_samples(1).init(); @@ -139,11 +180,15 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) read_reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(read_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(); read_reader.wait_discovery(); @@ -160,6 +205,14 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) // The reader has overridden payloads in the history. Only the valid ones are returned to the user read_reader.startReception(valid_data); read_reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + read_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, ReliableDirtyPayloads) @@ -179,7 +232,7 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) writer.history_depth(writer_history_depth) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(RELIABLE_RELIABILITY_QOS) .resource_limits_extra_samples(1).init(); @@ -187,11 +240,15 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) read_reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(RELIABLE_RELIABILITY_QOS).init(); ASSERT_TRUE(read_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(); read_reader.wait_discovery(); @@ -209,6 +266,14 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) // but will keep them in the history. read_reader.startReception(valid_data); read_reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + read_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) @@ -229,13 +294,13 @@ TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(writer.isInitialized()); datasharing_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(datasharing_reader.isInitialized()); non_datasharing_reader.disable_builtin_transport() @@ -245,9 +310,15 @@ TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) auto_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(reader_ids).init(); + .datasharing_auto(".", reader_ids).init(); ASSERT_TRUE(auto_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(3); datasharing_reader.wait_discovery(); non_datasharing_reader.wait_discovery(); @@ -264,6 +335,18 @@ TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) ASSERT_EQ(datasharing_reader.block_for_all(std::chrono::seconds(2)), 0u); ASSERT_EQ(non_datasharing_reader.block_for_all(std::chrono::seconds(2)), 0u); ASSERT_EQ(auto_reader.block_for_all(std::chrono::seconds(2)), 0u); + + // Destroy reader and writer and see if there are dangling files + datasharing_reader.destroy(); + non_datasharing_reader.destroy(); + auto_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) @@ -286,13 +369,13 @@ TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(writer.isInitialized()); datasharing_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(datasharing_reader.isInitialized()); non_datasharing_reader.disable_builtin_transport() @@ -302,9 +385,15 @@ TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) auto_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(reader_ids).init(); + .datasharing_auto(".", reader_ids).init(); ASSERT_TRUE(auto_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(3); datasharing_reader.wait_discovery(); non_datasharing_reader.wait_discovery(); @@ -321,6 +410,18 @@ TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) ASSERT_EQ(non_datasharing_reader.block_for_all(std::chrono::seconds(2)), 0u); auto_reader.block_for_all(); auto_reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + datasharing_reader.destroy(); + non_datasharing_reader.destroy(); + auto_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) @@ -341,7 +442,7 @@ TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) datasharing_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); @@ -352,15 +453,21 @@ TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) auto_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(writer_ids) + .datasharing_auto(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", auto_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + reader.wait_discovery(std::chrono::seconds::zero(), 3); datasharing_writer.wait_discovery(); non_datasharing_writer.wait_discovery(); @@ -386,6 +493,18 @@ TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) auto_writer.send(data); ASSERT_TRUE(data.empty()); ASSERT_EQ(reader.block_for_all(std::chrono::seconds(2)), 0u); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + datasharing_writer.destroy(); + non_datasharing_writer.destroy(); + auto_writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", auto_writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) @@ -408,7 +527,7 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) datasharing_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); @@ -419,15 +538,21 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) auto_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(writer_ids) + .datasharing_auto(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", auto_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + reader.wait_discovery(std::chrono::seconds::zero(), 3); datasharing_writer.wait_discovery(); non_datasharing_writer.wait_discovery(); @@ -453,6 +578,18 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) auto_writer.send(data); ASSERT_TRUE(data.empty()); reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + datasharing_writer.destroy(); + non_datasharing_writer.destroy(); + auto_writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", auto_writer.datawriter_guid())); } @@ -465,20 +602,25 @@ TEST(DDSDataSharing, DataSharingPoolError) writer_datasharing.resource_limits_max_samples(100000) .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) - .datasharing_on("Unused. change when ready").init(); + .datasharing_on(".").init(); ASSERT_FALSE(writer_datasharing.isInitialized()); writer_auto.resource_limits_max_samples(100000) .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) - .datasharing_auto().init(); + .datasharing_auto(".").init(); ASSERT_TRUE(writer_auto.isInitialized()); - reader.datasharing_on("Unused. change when ready") + reader.datasharing_on(".") .history_depth(10) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", writer_datasharing.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", writer_auto.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + reader.wait_discovery(); writer_auto.wait_discovery(); @@ -488,4 +630,57 @@ TEST(DDSDataSharing, DataSharingPoolError) writer_auto.send(data); ASSERT_TRUE(data.empty()); reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + writer_datasharing.destroy(); + writer_auto.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer_datasharing.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", writer_auto.datawriter_guid())); } + + +TEST(DDSDataSharing, DataSharingDefaultDirectory) +{ + // Since the default directory heavily depends on the system, + // we are not checking the creation of the files in this case, + // only that it is working. + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + // Disable transports to ensure we are using datasharing + auto testTransport = std::make_shared(); + testTransport->dropDataMessagesPercentage = 100; + + reader.history_depth(100) + .add_user_transport_to_pparams(testTransport) + .datasharing_auto() + .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); + + ASSERT_TRUE(reader.isInitialized()); + + writer.history_depth(100) + .add_user_transport_to_pparams(testTransport) + .datasharing_auto() + .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_fixed_sized_data_generator(); + reader.startReception(data); + + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); +} \ No newline at end of file From 134f6945b8a1d4a009652c693a1e23cceb2f5814 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 14:06:09 +0200 Subject: [PATCH 03/11] Refs 12051. Define file and memory shared segments Define two new classes: - SharedMemSegment is the same as before, using the BOOST shared memory. - SharedFileSegment uses BOOST memory mapped files. They both are specializations of a template class SharedSegment, which has a base class SharedSegmentBase. The idea is for DataSharing objects to hold a pointer to SharedSegmentBase that will be initialized as SharedMemSegment or SharedFileSegment depending on the configuration. At this point no changes have been done to the users of these classes, as they are already using SharedMemSegment Signed-off-by: Iker Luengo --- .../utils/shared_memory/SharedMemSegment.hpp | 309 ++++++++++-------- 1 file changed, 172 insertions(+), 137 deletions(-) diff --git a/src/cpp/utils/shared_memory/SharedMemSegment.hpp b/src/cpp/utils/shared_memory/SharedMemSegment.hpp index 3a302b0613c..81b1703fae8 100644 --- a/src/cpp/utils/shared_memory/SharedMemSegment.hpp +++ b/src/cpp/utils/shared_memory/SharedMemSegment.hpp @@ -26,6 +26,7 @@ #endif // if defined(__GNUC__) && ( __GNUC__ >= 9) #include +#include #if defined (__GNUC__) && ( __GNUC__ >= 9) #pragma GCC diagnostic pop @@ -52,7 +53,7 @@ using Log = fastdds::dds::Log; * Provides shared memory functionallity abstrating from * lower level layers */ -class SharedMemSegment +class SharedSegmentBase { public: @@ -60,19 +61,13 @@ class SharedMemSegment using sharable_lock = boost::interprocess::sharable_lock; using sharable_mutex = boost::interprocess::interprocess_sharable_mutex; - typedef RobustInterprocessCondition condition_variable; - typedef boost::interprocess::interprocess_mutex mutex; - typedef boost::interprocess::named_mutex named_mutex; - typedef boost::interprocess::spin_wait spin_wait; + using condition_variable = RobustInterprocessCondition; + using mutex = boost::interprocess::interprocess_mutex; + using named_mutex = boost::interprocess::named_mutex; + using spin_wait = boost::interprocess::spin_wait; // Offset must be the same size for 32/64-bit versions, so no size_t used here. - typedef std::uint32_t Offset; - typedef boost::interprocess::offset_ptr VoidPointerT; - typedef boost::interprocess::basic_managed_shared_memory< - char, - boost::interprocess::rbtree_best_fit, - boost::interprocess::iset_index> managed_shared_memory_type; + using Offset = std::uint32_t; static constexpr boost::interprocess::open_only_t open_only = boost::interprocess::open_only_t(); static constexpr boost::interprocess::create_only_t create_only = boost::interprocess::create_only_t(); @@ -83,57 +78,14 @@ class SharedMemSegment // TODO(Adolfo): Further analysis to determine the perfect value for this extra segment size static constexpr uint32_t EXTRA_SEGMENT_SIZE = 512; - SharedMemSegment( - boost::interprocess::create_only_t, - const std::string& name, - size_t size) - : name_(name) - { - segment_ = std::unique_ptr( - new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), - static_cast(size + EXTRA_SEGMENT_SIZE))); - } - - SharedMemSegment( - boost::interprocess::open_only_t, + explicit SharedSegmentBase( const std::string& name) : name_(name) { - segment_ = std::unique_ptr( - new managed_shared_memory_type(boost::interprocess::open_only, name.c_str())); - } - - SharedMemSegment( - boost::interprocess::open_or_create_t, - const std::string& name, - size_t size) - : name_(name) - { - segment_ = std::unique_ptr( - new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), static_cast(size))); - } - - ~SharedMemSegment() - { - // no need of exception handling cause never throws - segment_.reset(); - } - - void* get_address_from_offset( - SharedMemSegment::Offset offset) const - { - return segment_->get_address_from_handle(offset); - } - - SharedMemSegment::Offset get_offset_from_address( - void* address) const - { - return segment_->get_handle_from_address(address); } - managed_shared_memory_type& get() + virtual ~SharedSegmentBase() { - return *segment_; } static void remove( @@ -147,60 +99,19 @@ class SharedMemSegment return name_; } - /** - * Estimates the extra segment space required for an allocation - */ - static uint32_t compute_per_allocation_extra_size( - size_t allocation_alignment, - const std::string& domain_name) - { - Id uuid; - - try - { - static uint32_t extra_size = 0; + virtual void* get_address_from_offset( + SharedSegmentBase::Offset offset) const = 0; - if (extra_size == 0) - { - uuid.generate(); + virtual SharedSegmentBase::Offset get_offset_from_address( + void* address) const = 0; - auto name = domain_name + "_" + uuid.to_string(); - - SharedMemEnvironment::get().init(); - - { - boost::interprocess::managed_shared_memory - test_segment(boost::interprocess::create_only, name.c_str(), - (std::max)((size_t)1024, allocation_alignment * 4)); - - auto m1 = test_segment.get_free_memory(); - test_segment.allocate_aligned(1, allocation_alignment); - auto m2 = test_segment.get_free_memory(); - extra_size = static_cast(m1 - m2); - } - - boost::interprocess::shared_memory_object::remove(name.c_str()); - } - - return extra_size; - - } - catch (const std::exception& e) - { - logError(RTPS_TRANSPORT_SHM, "Failed to create segment " << uuid.to_string() - << ": " << e.what()); - - throw; - } - } - - static std::unique_ptr open_or_create_and_lock_named_mutex( + static std::unique_ptr open_or_create_and_lock_named_mutex( const std::string& mutex_name) { - std::unique_ptr named_mutex; + std::unique_ptr named_mutex; - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); boost::posix_time::ptime wait_time = boost::posix_time::microsec_clock::universal_time() @@ -209,10 +120,10 @@ class SharedMemSegment { // Interprocess mutex timeout when locking. Possible deadlock: owner died without unlocking? // try to remove and create again - SharedMemSegment::named_mutex::remove(mutex_name.c_str()); + SharedSegmentBase::named_mutex::remove(mutex_name.c_str()); - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); if (!named_mutex->try_lock()) { @@ -223,13 +134,13 @@ class SharedMemSegment return named_mutex; } - static std::unique_ptr try_open_and_lock_named_mutex( + static std::unique_ptr try_open_and_lock_named_mutex( const std::string& mutex_name) { - std::unique_ptr named_mutex; + std::unique_ptr named_mutex; - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); boost::posix_time::ptime wait_time = boost::posix_time::microsec_clock::universal_time() @@ -242,38 +153,21 @@ class SharedMemSegment return named_mutex; } - static std::unique_ptr open_named_mutex( + static std::unique_ptr open_named_mutex( const std::string& mutex_name) { - std::unique_ptr named_mutex; + std::unique_ptr named_mutex; // Todo(Adolfo) : Dataraces could occur, this algorithm has to be improved - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); return named_mutex; } /** - * Check the allocator internal structures - * @return true if structures are ok, false otherwise - */ - bool check_sanity() - { - return segment_->check_sanity(); - } - - /** - * @return The segment's size in bytes, including internal structures overhead. - */ - Offset mem_size() const - { - return segment_->get_size(); - } - - /** - * Unique ID of the memory segment + * Unique ID of the segment */ class Id { @@ -350,11 +244,152 @@ class SharedMemSegment } shared_mem_environment_initializer_; - std::unique_ptr segment_; - std::string name_; }; +template +class SharedSegment : public SharedSegmentBase +{ +public: + + typedef T managed_shared_memory_type; + + SharedSegment( + boost::interprocess::create_only_t, + const std::string& name, + size_t size) + : SharedSegmentBase(name) + { + segment_ = std::unique_ptr( + new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), + static_cast(size + EXTRA_SEGMENT_SIZE))); + } + + SharedSegment( + boost::interprocess::open_only_t, + const std::string& name) + : SharedSegmentBase(name) + { + segment_ = std::unique_ptr( + new managed_shared_memory_type(boost::interprocess::open_only, name.c_str())); + } + + SharedSegment( + boost::interprocess::open_or_create_t, + const std::string& name, + size_t size) + : SharedSegmentBase(name) + { + segment_ = std::unique_ptr( + new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), static_cast(size))); + } + + ~SharedSegment() + { + // no need of exception handling cause never throws + segment_.reset(); + } + + void* get_address_from_offset( + SharedSegment::Offset offset) const override + { + return segment_->get_address_from_handle(offset); + } + + SharedSegment::Offset get_offset_from_address( + void* address) const override + { + return segment_->get_handle_from_address(address); + } + + managed_shared_memory_type& get() + { + return *segment_; + } + + /** + * Estimates the extra segment space required for an allocation + */ + static uint32_t compute_per_allocation_extra_size( + size_t allocation_alignment, + const std::string& domain_name) + { + Id uuid; + + try + { + static uint32_t extra_size = 0; + + if (extra_size == 0) + { + uuid.generate(); + + auto name = domain_name + "_" + uuid.to_string(); + + SharedMemEnvironment::get().init(); + + { + managed_shared_memory_type + test_segment(boost::interprocess::create_only, name.c_str(), + (std::max)((size_t)1024, allocation_alignment * 4)); + + auto m1 = test_segment.get_free_memory(); + test_segment.allocate_aligned(1, allocation_alignment); + auto m2 = test_segment.get_free_memory(); + extra_size = static_cast(m1 - m2); + } + + remove(name.c_str()); + } + + return extra_size; + + } + catch (const std::exception& e) + { + logError(RTPS_TRANSPORT_SHM, "Failed to create segment " << uuid.to_string() + << ": " << e.what()); + + throw; + } + } + + /** + * Check the allocator internal structures + * @return true if structures are ok, false otherwise + */ + bool check_sanity() + { + return segment_->check_sanity(); + } + + /** + * @return The segment's size in bytes, including internal structures overhead. + */ + Offset mem_size() const + { + return segment_->get_size(); + } + +private: + + std::unique_ptr segment_; +}; + +using SharedMemSegment = SharedSegment< + boost::interprocess::basic_managed_shared_memory< + char, + boost::interprocess::rbtree_best_fit>, + boost::interprocess::iset_index>>; + +using SharedFileSegment = SharedSegment< + boost::interprocess::basic_managed_mapped_file< + char, + boost::interprocess::rbtree_best_fit>, + boost::interprocess::iset_index>>; + } // namespace rtps } // namespace fastdds } // namespace eprosima From a4698ced448f9206f8d12c0b6c25e618e6006133 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 14:18:53 +0200 Subject: [PATCH 04/11] Refs 12051. Datasharing to use files if path configured Signed-off-by: Iker Luengo --- .../DataSharing/DataSharingNotification.cpp | 105 ++++-------------- .../DataSharing/DataSharingNotification.hpp | 92 ++++++++++++++- .../DataSharing/DataSharingPayloadPool.hpp | 9 +- src/cpp/rtps/DataSharing/ReaderPool.hpp | 33 ++++-- src/cpp/rtps/DataSharing/WriterPool.hpp | 37 ++++-- 5 files changed, 168 insertions(+), 108 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index 7a0023f5883..79bd242d162 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -31,7 +31,17 @@ std::shared_ptr DataSharingNotification::create_notific const std::string& shared_dir) { std::shared_ptr notification = std::make_shared(); - if (!notification->create_and_init_notification(reader_guid, shared_dir)) + bool create_result = false; + if (shared_dir.empty()) + { + create_result = notification->create_and_init_notification(reader_guid, shared_dir); + } + else + { + create_result = notification->create_and_init_notification(reader_guid, shared_dir); + } + + if (!create_result) { notification.reset(); } @@ -43,7 +53,17 @@ std::shared_ptr DataSharingNotification::open_notificat const std::string& shared_dir) { std::shared_ptr notification = std::make_shared(); - if (!notification->open_and_init_notification(writer_guid, shared_dir)) + bool open_result = false; + if (shared_dir.empty()) + { + open_result = notification->open_and_init_notification(writer_guid, shared_dir); + } + else + { + open_result = notification->open_and_init_notification(writer_guid, shared_dir); + } + + if (!open_result) { notification.reset(); } @@ -66,87 +86,6 @@ void DataSharingNotification::destroy() } } -bool DataSharingNotification::create_and_init_notification( - const GUID_t& reader_guid, - const std::string& shared_dir) -{ - segment_id_ = reader_guid; - segment_name_ = generate_segment_name(shared_dir, reader_guid); - - uint32_t per_allocation_extra_size = Segment::compute_per_allocation_extra_size( - alignof(Notification), DataSharingNotification::domain_name()); - uint32_t segment_size = static_cast(sizeof(Notification)) + per_allocation_extra_size; - - //Open the segment - Segment::remove(segment_name_); - try - { - segment_ = std::unique_ptr( - new Segment(boost::interprocess::create_only, - segment_name_, - segment_size + fastdds::rtps::SharedMemSegment::EXTRA_SEGMENT_SIZE)); - } - catch (const std::exception& e) - { - logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ - << ": " << e.what()); - return false; - } - - try - { - // Alloc and initialize the Node - notification_ = segment_->get().construct("notification_node")(); - notification_->new_data.store(false); - } - catch (std::exception& e) - { - Segment::remove(segment_name_); - - logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ - << ": " << e.what()); - return false; - } - - owned_ = true; - return true; -} - -bool DataSharingNotification::open_and_init_notification( - const GUID_t& reader_guid, - const std::string& shared_dir) -{ - segment_id_ = reader_guid; - segment_name_ = generate_segment_name(shared_dir, reader_guid); - - //Open the segment - try - { - segment_ = std::unique_ptr( - new Segment(boost::interprocess::open_only, - segment_name_.c_str())); - } - catch (const std::exception& e) - { - logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ - << ": " << e.what()); - return false; - } - - // Initialize values from the segment - notification_ = segment_->get().find( - "notification_node").first; - if (!notification_) - { - segment_.reset(); - - logError(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_); - return false; - } - - return true; -} - } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 78af4fde31b..4a9a7f33b39 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -42,7 +42,7 @@ class DataSharingNotification public: - typedef fastdds::rtps::SharedMemSegment Segment; + typedef fastdds::rtps::SharedSegmentBase Segment; DataSharingNotification() = default; @@ -107,22 +107,104 @@ class DataSharingNotification #pragma warning(pop) static std::string generate_segment_name( - const std::string& /*shared_dir*/, + const std::string& shared_dir, const GUID_t& reader_guid) { std::stringstream ss; + if (!shared_dir.empty()) + { + ss << shared_dir << "/"; + } ss << DataSharingNotification::domain_name() << "_" << reader_guid.guidPrefix << "_" << reader_guid.entityId; return ss.str(); } + template bool create_and_init_notification( const GUID_t& reader_guid, - const std::string& shared_dir = std::string()); - + const std::string& shared_dir) + { + segment_id_ = reader_guid; + segment_name_ = generate_segment_name(shared_dir, reader_guid); + + size_t per_allocation_extra_size = T::compute_per_allocation_extra_size( + alignof(Notification), DataSharingNotification::domain_name()); + uint32_t segment_size = static_cast(sizeof(Notification)) + per_allocation_extra_size; + + //Open the segment + remove(segment_name_); + std::unique_ptr local_segment; + try + { + local_segment = std::unique_ptr( + new T(boost::interprocess::create_only, + segment_name_, + segment_size + T::EXTRA_SEGMENT_SIZE)); + } + catch (const std::exception& e) + { + logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ + << ": " << e.what()); + return false; + } + + try + { + // Alloc and initialize the Node + notification_ = local_segment->get().template construct("notification_node")(); + notification_->new_data.store(false); + } + catch (std::exception& e) + { + remove(segment_name_); + + logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ + << ": " << e.what()); + return false; + } + + segment_ = std::move(local_segment); + owned_ = true; + return true; + } + template bool open_and_init_notification( const GUID_t& reader_guid, - const std::string& shared_dir = std::string()); + const std::string& shared_dir) + { + segment_id_ = reader_guid; + segment_name_ = generate_segment_name(shared_dir, reader_guid); + + //Open the segment + std::unique_ptr local_segment; + try + { + local_segment = std::unique_ptr( + new T(boost::interprocess::open_only, + segment_name_.c_str())); + } + catch (const std::exception& e) + { + logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ + << ": " << e.what()); + return false; + } + + // Initialize values from the segment + notification_ = (local_segment->get().template find( + "notification_node")).first; + if (!notification_) + { + local_segment.reset(); + + logError(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_); + return false; + } + + segment_ = std::move(local_segment); + return true; + } GUID_t segment_id_; //< The ID of the segment is the GUID of the reader diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp index ec912ee8040..5b3dc66c7e3 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp @@ -43,7 +43,7 @@ class DataSharingPayloadPool : public IPayloadPool public: - using Segment = fastdds::rtps::SharedMemSegment; + using Segment = fastdds::rtps::SharedSegmentBase; using sharable_mutex = Segment::sharable_mutex; template using sharable_lock = Segment::sharable_lock; @@ -348,10 +348,15 @@ class DataSharingPayloadPool : public IPayloadPool #pragma warning(pop) static std::string generate_segment_name( - const std::string& /*shared_dir*/, + const std::string& shared_dir, const GUID_t& writer_guid) { std::stringstream ss; + if (!shared_dir.empty()) + { + ss << shared_dir << "/"; + } + ss << DataSharingPayloadPool::domain_name() << "_" << writer_guid.guidPrefix << "_" << writer_guid.entityId; return ss.str(); } diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index d15b3e7bfd4..f779c565c37 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -81,18 +81,20 @@ class ReaderPool : public DataSharingPayloadPool return DataSharingPayloadPool::release_payload(cache_change); } - bool init_shared_memory( + template + bool init_shared_segment( const GUID_t& writer_guid, - const std::string& shared_dir) override + const std::string& shared_dir) { segment_id_ = writer_guid; segment_name_ = generate_segment_name(shared_dir, writer_guid); + std::unique_ptr local_segment; // Open the segment try { - segment_ = std::unique_ptr( - new fastdds::rtps::SharedMemSegment(boost::interprocess::open_only, + local_segment = std::unique_ptr( + new T(boost::interprocess::open_only, segment_name_.c_str())); } catch (const std::exception& e) @@ -103,20 +105,20 @@ class ReaderPool : public DataSharingPayloadPool } // Get the pool description - descriptor_ = segment_->get().find(descriptor_chunk_name()).first; + descriptor_ = local_segment->get().template find(descriptor_chunk_name()).first; if (!descriptor_) { - segment_.reset(); + local_segment.reset(); logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload pool descriptor " << segment_name_); return false; } // Get the history - history_ = segment_->get().find(history_chunk_name()).first; + history_ = local_segment->get().template find(history_chunk_name()).first; if (!history_) { - segment_.reset(); + local_segment.reset(); logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload history " << segment_name_); return false; @@ -132,9 +134,24 @@ class ReaderPool : public DataSharingPayloadPool next_payload_ = begin(); } + segment_ = std::move(local_segment); return true; } + bool init_shared_memory( + const GUID_t& writer_guid, + const std::string& shared_dir) override + { + if (shared_dir.empty()) + { + return init_shared_segment(writer_guid, shared_dir); + } + else + { + return init_shared_segment(writer_guid, shared_dir); + } + } + void get_next_unread_payload( CacheChange_t& cache_change, SequenceNumber_t& last_sequence_number) diff --git a/src/cpp/rtps/DataSharing/WriterPool.hpp b/src/cpp/rtps/DataSharing/WriterPool.hpp index ec11ce9ec11..5c1f698a7ca 100644 --- a/src/cpp/rtps/DataSharing/WriterPool.hpp +++ b/src/cpp/rtps/DataSharing/WriterPool.hpp @@ -136,9 +136,11 @@ class WriterPool : public DataSharingPayloadPool return DataSharingPayloadPool::release_payload(cache_change); } - bool init_shared_memory( + + template + bool init_shared_segment( const RTPSWriter* writer, - const std::string& shared_dir) override + const std::string& shared_dir) { writer_ = writer; segment_id_ = writer_->getGuid(); @@ -147,8 +149,7 @@ class WriterPool : public DataSharingPayloadPool // We need to reserve the whole segment at once, and the underlying classes use uint32_t as size type. // In order to avoid overflows, we will calculate using uint64 and check the casting bool overflow = false; - - size_t per_allocation_extra_size = fastdds::rtps::SharedMemSegment::compute_per_allocation_extra_size( + size_t per_allocation_extra_size = T::compute_per_allocation_extra_size( alignof(PayloadNode), DataSharingPayloadPool::domain_name()); size_t payload_size = DataSharingPayloadPool::node_size(max_data_size_); @@ -179,12 +180,13 @@ class WriterPool : public DataSharingPayloadPool //Open the segment fastdds::rtps::SharedMemSegment::remove(segment_name_); + std::unique_ptr local_segment; try { - segment_ = std::unique_ptr( - new Segment(boost::interprocess::create_only, + local_segment = std::unique_ptr( + new T(boost::interprocess::create_only, segment_name_, - segment_size + fastdds::rtps::SharedMemSegment::EXTRA_SEGMENT_SIZE)); + segment_size + T::EXTRA_SEGMENT_SIZE)); } catch (const std::exception& e) { @@ -198,7 +200,7 @@ class WriterPool : public DataSharingPayloadPool // Alloc the memory for the pool // Cannot use 'construct' because we need to reserve extra space for the data, // which is not considered in sizeof(PayloadNode). - payloads_pool_ = static_cast(segment_->get().allocate(size_for_payloads_pool)); + payloads_pool_ = static_cast(local_segment->get().allocate(size_for_payloads_pool)); // Initialize each node in the pool free_payloads_.init(pool_size_); @@ -214,10 +216,10 @@ class WriterPool : public DataSharingPayloadPool } //Alloc the memory for the history - history_ = segment_->get().construct(history_chunk_name())[pool_size_ + 1](); + history_ = local_segment->get().template construct(history_chunk_name())[pool_size_ + 1](); //Alloc the memory for the descriptor - descriptor_ = segment_->get().construct(descriptor_chunk_name())(); + descriptor_ = local_segment->get().template construct(descriptor_chunk_name())(); // Initialize the data in the descriptor descriptor_->history_size = pool_size_ + 1; @@ -236,9 +238,24 @@ class WriterPool : public DataSharingPayloadPool return false; } + segment_ = std::move(local_segment); is_initialized_ = true; return true; } + + bool init_shared_memory( + const RTPSWriter* writer, + const std::string& shared_dir) override + { + if (shared_dir.empty()) + { + return init_shared_segment(writer, shared_dir); + } + else + { + return init_shared_segment(writer, shared_dir); + } + } /** * Fills the metadata of the shared payload from the cache change information From 8f16279ea43911ba171ad13b7814ba6ae403d918 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 14:19:39 +0200 Subject: [PATCH 05/11] Correct the removal of the shared object Signed-off-by: Iker Luengo --- .../DataSharing/DataSharingNotification.cpp | 2 +- .../DataSharing/DataSharingNotification.hpp | 4 +-- src/cpp/rtps/DataSharing/WriterPool.hpp | 9 ++++-- .../utils/shared_memory/SharedMemSegment.hpp | 28 +++++++++++++------ 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index 79bd242d162..4e8a25307a1 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -76,7 +76,7 @@ void DataSharingNotification::destroy() { // We cannot destroy the objects in the SHM, as the Writer may still be using them. // We just remove the segment, and when the Writer closes it, it will be removed from the system. - segment_->remove(segment_name_); + segment_->remove(); owned_ = false; } diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 4a9a7f33b39..254d10f93e4 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -132,7 +132,7 @@ class DataSharingNotification uint32_t segment_size = static_cast(sizeof(Notification)) + per_allocation_extra_size; //Open the segment - remove(segment_name_); + T::remove(segment_name_); std::unique_ptr local_segment; try { @@ -156,7 +156,7 @@ class DataSharingNotification } catch (std::exception& e) { - remove(segment_name_); + T::remove(segment_name_); logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ << ": " << e.what()); diff --git a/src/cpp/rtps/DataSharing/WriterPool.hpp b/src/cpp/rtps/DataSharing/WriterPool.hpp index 5c1f698a7ca..7d55bd04fd7 100644 --- a/src/cpp/rtps/DataSharing/WriterPool.hpp +++ b/src/cpp/rtps/DataSharing/WriterPool.hpp @@ -53,7 +53,10 @@ class WriterPool : public DataSharingPayloadPool // We cannot destroy the objects in the SHM, as the Reader may still be using them. // We just remove the segment, and when the Reader closes it, it will be removed from the system. - segment_->remove(segment_name_); + if (segment_) + { + segment_->remove(); + } } bool get_payload( @@ -179,7 +182,7 @@ class WriterPool : public DataSharingPayloadPool } //Open the segment - fastdds::rtps::SharedMemSegment::remove(segment_name_); + T::remove(segment_name_); std::unique_ptr local_segment; try { @@ -231,7 +234,7 @@ class WriterPool : public DataSharingPayloadPool } catch (std::exception& e) { - Segment::remove(segment_name_); + T::remove(segment_name_); logError(DATASHARING_PAYLOADPOOL, "Failed to initialize segment " << segment_name_ << ": " << e.what()); diff --git a/src/cpp/utils/shared_memory/SharedMemSegment.hpp b/src/cpp/utils/shared_memory/SharedMemSegment.hpp index 81b1703fae8..62a5d61fe0f 100644 --- a/src/cpp/utils/shared_memory/SharedMemSegment.hpp +++ b/src/cpp/utils/shared_memory/SharedMemSegment.hpp @@ -88,11 +88,7 @@ class SharedSegmentBase { } - static void remove( - const std::string& name) - { - boost::interprocess::shared_memory_object::remove(name.c_str()); - } + virtual void remove() = 0; std::string name() { @@ -247,12 +243,13 @@ class SharedSegmentBase std::string name_; }; -template +template class SharedSegment : public SharedSegmentBase { public: typedef T managed_shared_memory_type; + typedef U managed_shared_object_type; SharedSegment( boost::interprocess::create_only_t, @@ -290,6 +287,17 @@ class SharedSegment : public SharedSegmentBase segment_.reset(); } + static void remove( + const std::string& name) + { + managed_shared_object_type::remove(name.c_str()); + } + + void remove() override + { + managed_shared_object_type::remove(name().c_str()); + } + void* get_address_from_offset( SharedSegment::Offset offset) const override { @@ -339,7 +347,7 @@ class SharedSegment : public SharedSegmentBase extra_size = static_cast(m1 - m2); } - remove(name.c_str()); + managed_shared_object_type::remove(name.c_str()); } return extra_size; @@ -381,14 +389,16 @@ using SharedMemSegment = SharedSegment< char, boost::interprocess::rbtree_best_fit>, - boost::interprocess::iset_index>>; + boost::interprocess::iset_index>, + boost::interprocess::shared_memory_object>; using SharedFileSegment = SharedSegment< boost::interprocess::basic_managed_mapped_file< char, boost::interprocess::rbtree_best_fit>, - boost::interprocess::iset_index>>; + boost::interprocess::iset_index>, + boost::interprocess::file_mapping>; } // namespace rtps } // namespace fastdds From 2a05b961869310389175757789a735c3866a9184 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 14:21:08 +0200 Subject: [PATCH 06/11] Refs 12051. linters Signed-off-by: Iker Luengo --- .../DataSharing/DataSharingNotification.cpp | 13 ++++++---- .../DataSharing/DataSharingNotification.hpp | 9 ++++--- src/cpp/rtps/DataSharing/WriterPool.hpp | 3 +-- .../utils/shared_memory/SharedMemSegment.hpp | 24 +++++++++---------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index 4e8a25307a1..11e53f1b68e 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -34,11 +34,13 @@ std::shared_ptr DataSharingNotification::create_notific bool create_result = false; if (shared_dir.empty()) { - create_result = notification->create_and_init_notification(reader_guid, shared_dir); + create_result = notification->create_and_init_notification(reader_guid, + shared_dir); } else { - create_result = notification->create_and_init_notification(reader_guid, shared_dir); + create_result = notification->create_and_init_notification(reader_guid, + shared_dir); } if (!create_result) @@ -56,11 +58,14 @@ std::shared_ptr DataSharingNotification::open_notificat bool open_result = false; if (shared_dir.empty()) { - open_result = notification->open_and_init_notification(writer_guid, shared_dir); + open_result = + notification->open_and_init_notification(writer_guid, shared_dir); } else { - open_result = notification->open_and_init_notification(writer_guid, shared_dir); + open_result = + notification->open_and_init_notification(writer_guid, + shared_dir); } if (!open_result) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 254d10f93e4..0086d837917 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -144,7 +144,7 @@ class DataSharingNotification catch (const std::exception& e) { logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } @@ -159,7 +159,7 @@ class DataSharingNotification T::remove(segment_name_); logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } @@ -187,13 +187,13 @@ class DataSharingNotification catch (const std::exception& e) { logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } // Initialize values from the segment notification_ = (local_segment->get().template find( - "notification_node")).first; + "notification_node")).first; if (!notification_) { local_segment.reset(); @@ -206,7 +206,6 @@ class DataSharingNotification return true; } - GUID_t segment_id_; //< The ID of the segment is the GUID of the reader std::string segment_name_; //< Segment name diff --git a/src/cpp/rtps/DataSharing/WriterPool.hpp b/src/cpp/rtps/DataSharing/WriterPool.hpp index 7d55bd04fd7..1a78c1242dd 100644 --- a/src/cpp/rtps/DataSharing/WriterPool.hpp +++ b/src/cpp/rtps/DataSharing/WriterPool.hpp @@ -139,7 +139,6 @@ class WriterPool : public DataSharingPayloadPool return DataSharingPayloadPool::release_payload(cache_change); } - template bool init_shared_segment( const RTPSWriter* writer, @@ -245,7 +244,7 @@ class WriterPool : public DataSharingPayloadPool is_initialized_ = true; return true; } - + bool init_shared_memory( const RTPSWriter* writer, const std::string& shared_dir) override diff --git a/src/cpp/utils/shared_memory/SharedMemSegment.hpp b/src/cpp/utils/shared_memory/SharedMemSegment.hpp index 62a5d61fe0f..f84fec2b59e 100644 --- a/src/cpp/utils/shared_memory/SharedMemSegment.hpp +++ b/src/cpp/utils/shared_memory/SharedMemSegment.hpp @@ -385,20 +385,20 @@ class SharedSegment : public SharedSegmentBase }; using SharedMemSegment = SharedSegment< - boost::interprocess::basic_managed_shared_memory< - char, - boost::interprocess::rbtree_best_fit>, - boost::interprocess::iset_index>, - boost::interprocess::shared_memory_object>; + boost::interprocess::basic_managed_shared_memory< + char, + boost::interprocess::rbtree_best_fit>, + boost::interprocess::iset_index>, + boost::interprocess::shared_memory_object>; using SharedFileSegment = SharedSegment< - boost::interprocess::basic_managed_mapped_file< - char, - boost::interprocess::rbtree_best_fit>, - boost::interprocess::iset_index>, - boost::interprocess::file_mapping>; + boost::interprocess::basic_managed_mapped_file< + char, + boost::interprocess::rbtree_best_fit>, + boost::interprocess::iset_index>, + boost::interprocess::file_mapping>; } // namespace rtps } // namespace fastdds From cb4d2801383cd1ed5d838a900f2b71a218b7d972 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Mon, 26 Jul 2021 15:29:04 +0200 Subject: [PATCH 07/11] Refs 12051. Correct unit test Signed-off-by: Iker Luengo --- test/unittest/dds/publisher/DataWriterTests.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unittest/dds/publisher/DataWriterTests.cpp b/test/unittest/dds/publisher/DataWriterTests.cpp index 8a8a02e65e6..ed02dc8b3cd 100644 --- a/test/unittest/dds/publisher/DataWriterTests.cpp +++ b/test/unittest/dds/publisher/DataWriterTests.cpp @@ -277,7 +277,7 @@ TEST(DataWriterTests, ForcedDataSharing) // DataSharing enabled, unbounded topic data type qos = DATAWRITER_QOS_DEFAULT; qos.endpoint().history_memory_policy = fastrtps::rtps::PREALLOCATED_MEMORY_MODE; - qos.data_sharing().on("path"); + qos.data_sharing().on("."); datawriter = publisher->create_datawriter(topic, qos); ASSERT_EQ(datawriter, nullptr); @@ -288,7 +288,7 @@ TEST(DataWriterTests, ForcedDataSharing) // DataSharing enabled, bounded topic data type, Dynamic memory policy qos = DATAWRITER_QOS_DEFAULT; - qos.data_sharing().on("path"); + qos.data_sharing().on("."); qos.endpoint().history_memory_policy = fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE; datawriter = publisher->create_datawriter(bounded_topic, qos); ASSERT_EQ(datawriter, nullptr); @@ -338,7 +338,7 @@ TEST(DataWriterTests, ForcedDataSharing) ASSERT_NE(bounded_topic, nullptr); qos = DATAWRITER_QOS_DEFAULT; - qos.data_sharing().on("path"); + qos.data_sharing().on("."); qos.endpoint().history_memory_policy = fastrtps::rtps::PREALLOCATED_MEMORY_MODE; From d915a76f3bbe8486c8baa8307c65332556503469 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Wed, 28 Jul 2021 09:01:24 +0200 Subject: [PATCH 08/11] Refs 12051. Fix warnings on Windows Signed-off-by: Iker Luengo --- src/cpp/rtps/DataSharing/DataSharingNotification.hpp | 2 +- src/cpp/utils/shared_memory/SharedMemSegment.hpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 0086d837917..529c7adf14e 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -127,7 +127,7 @@ class DataSharingNotification segment_id_ = reader_guid; segment_name_ = generate_segment_name(shared_dir, reader_guid); - size_t per_allocation_extra_size = T::compute_per_allocation_extra_size( + uint32_t per_allocation_extra_size = T::compute_per_allocation_extra_size( alignof(Notification), DataSharingNotification::domain_name()); uint32_t segment_size = static_cast(sizeof(Notification)) + per_allocation_extra_size; diff --git a/src/cpp/utils/shared_memory/SharedMemSegment.hpp b/src/cpp/utils/shared_memory/SharedMemSegment.hpp index f84fec2b59e..f0462b35820 100644 --- a/src/cpp/utils/shared_memory/SharedMemSegment.hpp +++ b/src/cpp/utils/shared_memory/SharedMemSegment.hpp @@ -339,10 +339,10 @@ class SharedSegment : public SharedSegmentBase { managed_shared_memory_type test_segment(boost::interprocess::create_only, name.c_str(), - (std::max)((size_t)1024, allocation_alignment * 4)); + (std::max)((uint32_t)1024, static_cast(allocation_alignment * 4))); auto m1 = test_segment.get_free_memory(); - test_segment.allocate_aligned(1, allocation_alignment); + test_segment.allocate_aligned(1, static_cast(allocation_alignment)); auto m2 = test_segment.get_free_memory(); extra_size = static_cast(m1 - m2); } From f9d57b26031c12984f86593f10963a6cc8b71ba9 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Thu, 29 Jul 2021 09:31:15 +0200 Subject: [PATCH 09/11] Refs 12051. Keep old API and move templatized init Signed-off-by: Iker Luengo --- .../DataSharing/DataSharingNotification.cpp | 45 ++++++++----------- .../DataSharing/DataSharingNotification.hpp | 18 +++++--- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index 11e53f1b68e..c594f15254d 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -31,19 +31,7 @@ std::shared_ptr DataSharingNotification::create_notific const std::string& shared_dir) { std::shared_ptr notification = std::make_shared(); - bool create_result = false; - if (shared_dir.empty()) - { - create_result = notification->create_and_init_notification(reader_guid, - shared_dir); - } - else - { - create_result = notification->create_and_init_notification(reader_guid, - shared_dir); - } - - if (!create_result) + if (!notification->create_and_init_notification(reader_guid, shared_dir)); { notification.reset(); } @@ -55,26 +43,29 @@ std::shared_ptr DataSharingNotification::open_notificat const std::string& shared_dir) { std::shared_ptr notification = std::make_shared(); - bool open_result = false; - if (shared_dir.empty()) - { - open_result = - notification->open_and_init_notification(writer_guid, shared_dir); - } - else - { - open_result = - notification->open_and_init_notification(writer_guid, - shared_dir); - } - - if (!open_result) + if (!notification->open_and_init_notification(writer_guid, shared_dir)) { notification.reset(); } return notification; } +bool DataSharingNotification::create_and_init_notification( + const GUID_t& reader_guid, + const std::string& shared_dir) +{ + return create_and_init_shared_segment_notification(reader_guid, + shared_dir); +} + +bool DataSharingNotification::open_and_init_notification( + const GUID_t& reader_guid, + const std::string& shared_dir) +{ + return open_and_init_shared_segment_notification(reader_guid, + shared_dir); +} + void DataSharingNotification::destroy() { if (owned_) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 529c7adf14e..fe5c6e38309 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -119,8 +119,16 @@ class DataSharingNotification return ss.str(); } - template bool create_and_init_notification( + const GUID_t& reader_guid, + const std::string& shared_dir = std::string()); + + bool open_and_init_notification( + const GUID_t& reader_guid, + const std::string& shared_dir = std::string()); + + template + bool create_and_init_shared_segment_notification( const GUID_t& reader_guid, const std::string& shared_dir) { @@ -144,7 +152,7 @@ class DataSharingNotification catch (const std::exception& e) { logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } @@ -159,7 +167,7 @@ class DataSharingNotification T::remove(segment_name_); logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } @@ -169,7 +177,7 @@ class DataSharingNotification } template - bool open_and_init_notification( + bool open_and_init_shared_segment_notification( const GUID_t& reader_guid, const std::string& shared_dir) { @@ -187,7 +195,7 @@ class DataSharingNotification catch (const std::exception& e) { logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } From 2800cb2fcb9b98561162cb11a7991a4c69666b3a Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 2 Aug 2021 12:52:40 +0200 Subject: [PATCH 10/11] Removed semicolon Signed-off-by: Miguel Company --- src/cpp/rtps/DataSharing/DataSharingNotification.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index c594f15254d..be7bbfaee8b 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -31,7 +31,7 @@ std::shared_ptr DataSharingNotification::create_notific const std::string& shared_dir) { std::shared_ptr notification = std::make_shared(); - if (!notification->create_and_init_notification(reader_guid, shared_dir)); + if (!notification->create_and_init_notification(reader_guid, shared_dir)) { notification.reset(); } From add60679e8138ff8d70f7245231f2ff2b8e7af01 Mon Sep 17 00:00:00 2001 From: Iker Luengo Date: Thu, 19 Aug 2021 15:10:20 +0200 Subject: [PATCH 11/11] Refs 12051. Notification can be file shared too Signed-off-by: Iker Luengo --- .../DataSharing/DataSharingNotification.cpp | 24 +++++++++++++++---- .../DataSharing/DataSharingNotification.hpp | 6 ++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index be7bbfaee8b..7bbdd841d0b 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -54,16 +54,32 @@ bool DataSharingNotification::create_and_init_notification( const GUID_t& reader_guid, const std::string& shared_dir) { - return create_and_init_shared_segment_notification(reader_guid, - shared_dir); + if (shared_dir.empty()) + { + return create_and_init_shared_segment_notification(reader_guid, + shared_dir); + } + else + { + return create_and_init_shared_segment_notification(reader_guid, + shared_dir); + } } bool DataSharingNotification::open_and_init_notification( const GUID_t& reader_guid, const std::string& shared_dir) { - return open_and_init_shared_segment_notification(reader_guid, - shared_dir); + if (shared_dir.empty()) + { + return open_and_init_shared_segment_notification(reader_guid, + shared_dir); + } + else + { + return open_and_init_shared_segment_notification(reader_guid, + shared_dir); + } } void DataSharingNotification::destroy() diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index fe5c6e38309..e8b94ebd585 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -152,7 +152,7 @@ class DataSharingNotification catch (const std::exception& e) { logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } @@ -167,7 +167,7 @@ class DataSharingNotification T::remove(segment_name_); logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; } @@ -195,7 +195,7 @@ class DataSharingNotification catch (const std::exception& e) { logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ - << ": " << e.what()); + << ": " << e.what()); return false; }