diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index 7a0023f5883..7bbdd841d0b 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -50,101 +50,52 @@ std::shared_ptr DataSharingNotification::open_notificat return notification; } -void DataSharingNotification::destroy() -{ - if (owned_) - { - // We cannot destroy the objects in the SHM, as the Writer may still be using them. - // We just remove the segment, and when the Writer closes it, it will be removed from the system. - segment_->remove(segment_name_); - - owned_ = false; - } - else - { - logError(HISTORY_DATASHARING_LISTENER, "Trying to destroy non-owned notification segment " << segment_name_); - } -} - bool DataSharingNotification::create_and_init_notification( const GUID_t& reader_guid, const std::string& shared_dir) { - segment_id_ = reader_guid; - segment_name_ = generate_segment_name(shared_dir, reader_guid); - - uint32_t per_allocation_extra_size = Segment::compute_per_allocation_extra_size( - alignof(Notification), DataSharingNotification::domain_name()); - uint32_t segment_size = static_cast(sizeof(Notification)) + per_allocation_extra_size; - - //Open the segment - Segment::remove(segment_name_); - try + if (shared_dir.empty()) { - segment_ = std::unique_ptr( - new Segment(boost::interprocess::create_only, - segment_name_, - segment_size + fastdds::rtps::SharedMemSegment::EXTRA_SEGMENT_SIZE)); + return create_and_init_shared_segment_notification(reader_guid, + shared_dir); } - catch (const std::exception& e) - { - logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ - << ": " << e.what()); - return false; - } - - try + else { - // Alloc and initialize the Node - notification_ = segment_->get().construct("notification_node")(); - notification_->new_data.store(false); + return create_and_init_shared_segment_notification(reader_guid, + shared_dir); } - catch (std::exception& e) - { - Segment::remove(segment_name_); - - logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ - << ": " << e.what()); - return false; - } - - owned_ = true; - return true; } bool DataSharingNotification::open_and_init_notification( const GUID_t& reader_guid, const std::string& shared_dir) { - segment_id_ = reader_guid; - segment_name_ = generate_segment_name(shared_dir, reader_guid); - - //Open the segment - try + if (shared_dir.empty()) { - segment_ = std::unique_ptr( - new Segment(boost::interprocess::open_only, - segment_name_.c_str())); + return open_and_init_shared_segment_notification(reader_guid, + shared_dir); } - catch (const std::exception& e) + else { - logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ - << ": " << e.what()); - return false; + return open_and_init_shared_segment_notification(reader_guid, + shared_dir); } +} - // Initialize values from the segment - notification_ = segment_->get().find( - "notification_node").first; - if (!notification_) +void DataSharingNotification::destroy() +{ + if (owned_) { - segment_.reset(); + // We cannot destroy the objects in the SHM, as the Writer may still be using them. + // We just remove the segment, and when the Writer closes it, it will be removed from the system. + segment_->remove(); - logError(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_); - return false; + owned_ = false; + } + else + { + logError(HISTORY_DATASHARING_LISTENER, "Trying to destroy non-owned notification segment " << segment_name_); } - - return true; } } // namespace rtps diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index 78af4fde31b..e8b94ebd585 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -42,7 +42,7 @@ class DataSharingNotification public: - typedef fastdds::rtps::SharedMemSegment Segment; + typedef fastdds::rtps::SharedSegmentBase Segment; DataSharingNotification() = default; @@ -107,10 +107,14 @@ class DataSharingNotification #pragma warning(pop) static std::string generate_segment_name( - const std::string& /*shared_dir*/, + const std::string& shared_dir, const GUID_t& reader_guid) { std::stringstream ss; + if (!shared_dir.empty()) + { + ss << shared_dir << "/"; + } ss << DataSharingNotification::domain_name() << "_" << reader_guid.guidPrefix << "_" << reader_guid.entityId; return ss.str(); } @@ -119,11 +123,96 @@ class DataSharingNotification const GUID_t& reader_guid, const std::string& shared_dir = std::string()); - bool open_and_init_notification( const GUID_t& reader_guid, const std::string& shared_dir = std::string()); + template + bool create_and_init_shared_segment_notification( + const GUID_t& reader_guid, + const std::string& shared_dir) + { + segment_id_ = reader_guid; + segment_name_ = generate_segment_name(shared_dir, reader_guid); + + uint32_t per_allocation_extra_size = T::compute_per_allocation_extra_size( + alignof(Notification), DataSharingNotification::domain_name()); + uint32_t segment_size = static_cast(sizeof(Notification)) + per_allocation_extra_size; + + //Open the segment + T::remove(segment_name_); + std::unique_ptr local_segment; + try + { + local_segment = std::unique_ptr( + new T(boost::interprocess::create_only, + segment_name_, + segment_size + T::EXTRA_SEGMENT_SIZE)); + } + catch (const std::exception& e) + { + logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_ + << ": " << e.what()); + return false; + } + + try + { + // Alloc and initialize the Node + notification_ = local_segment->get().template construct("notification_node")(); + notification_->new_data.store(false); + } + catch (std::exception& e) + { + T::remove(segment_name_); + + logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_ + << ": " << e.what()); + return false; + } + + segment_ = std::move(local_segment); + owned_ = true; + return true; + } + + template + bool open_and_init_shared_segment_notification( + const GUID_t& reader_guid, + const std::string& shared_dir) + { + segment_id_ = reader_guid; + segment_name_ = generate_segment_name(shared_dir, reader_guid); + + //Open the segment + std::unique_ptr local_segment; + try + { + local_segment = std::unique_ptr( + new T(boost::interprocess::open_only, + segment_name_.c_str())); + } + catch (const std::exception& e) + { + logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_ + << ": " << e.what()); + return false; + } + + // Initialize values from the segment + notification_ = (local_segment->get().template find( + "notification_node")).first; + if (!notification_) + { + local_segment.reset(); + + logError(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_); + return false; + } + + segment_ = std::move(local_segment); + return true; + } GUID_t segment_id_; //< The ID of the segment is the GUID of the reader std::string segment_name_; //< Segment name diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp index ec912ee8040..5b3dc66c7e3 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp @@ -43,7 +43,7 @@ class DataSharingPayloadPool : public IPayloadPool public: - using Segment = fastdds::rtps::SharedMemSegment; + using Segment = fastdds::rtps::SharedSegmentBase; using sharable_mutex = Segment::sharable_mutex; template using sharable_lock = Segment::sharable_lock; @@ -348,10 +348,15 @@ class DataSharingPayloadPool : public IPayloadPool #pragma warning(pop) static std::string generate_segment_name( - const std::string& /*shared_dir*/, + const std::string& shared_dir, const GUID_t& writer_guid) { std::stringstream ss; + if (!shared_dir.empty()) + { + ss << shared_dir << "/"; + } + ss << DataSharingPayloadPool::domain_name() << "_" << writer_guid.guidPrefix << "_" << writer_guid.entityId; return ss.str(); } diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index d15b3e7bfd4..f779c565c37 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -81,18 +81,20 @@ class ReaderPool : public DataSharingPayloadPool return DataSharingPayloadPool::release_payload(cache_change); } - bool init_shared_memory( + template + bool init_shared_segment( const GUID_t& writer_guid, - const std::string& shared_dir) override + const std::string& shared_dir) { segment_id_ = writer_guid; segment_name_ = generate_segment_name(shared_dir, writer_guid); + std::unique_ptr local_segment; // Open the segment try { - segment_ = std::unique_ptr( - new fastdds::rtps::SharedMemSegment(boost::interprocess::open_only, + local_segment = std::unique_ptr( + new T(boost::interprocess::open_only, segment_name_.c_str())); } catch (const std::exception& e) @@ -103,20 +105,20 @@ class ReaderPool : public DataSharingPayloadPool } // Get the pool description - descriptor_ = segment_->get().find(descriptor_chunk_name()).first; + descriptor_ = local_segment->get().template find(descriptor_chunk_name()).first; if (!descriptor_) { - segment_.reset(); + local_segment.reset(); logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload pool descriptor " << segment_name_); return false; } // Get the history - history_ = segment_->get().find(history_chunk_name()).first; + history_ = local_segment->get().template find(history_chunk_name()).first; if (!history_) { - segment_.reset(); + local_segment.reset(); logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload history " << segment_name_); return false; @@ -132,9 +134,24 @@ class ReaderPool : public DataSharingPayloadPool next_payload_ = begin(); } + segment_ = std::move(local_segment); return true; } + bool init_shared_memory( + const GUID_t& writer_guid, + const std::string& shared_dir) override + { + if (shared_dir.empty()) + { + return init_shared_segment(writer_guid, shared_dir); + } + else + { + return init_shared_segment(writer_guid, shared_dir); + } + } + void get_next_unread_payload( CacheChange_t& cache_change, SequenceNumber_t& last_sequence_number) diff --git a/src/cpp/rtps/DataSharing/WriterPool.hpp b/src/cpp/rtps/DataSharing/WriterPool.hpp index ec11ce9ec11..1a78c1242dd 100644 --- a/src/cpp/rtps/DataSharing/WriterPool.hpp +++ b/src/cpp/rtps/DataSharing/WriterPool.hpp @@ -53,7 +53,10 @@ class WriterPool : public DataSharingPayloadPool // We cannot destroy the objects in the SHM, as the Reader may still be using them. // We just remove the segment, and when the Reader closes it, it will be removed from the system. - segment_->remove(segment_name_); + if (segment_) + { + segment_->remove(); + } } bool get_payload( @@ -136,9 +139,10 @@ class WriterPool : public DataSharingPayloadPool return DataSharingPayloadPool::release_payload(cache_change); } - bool init_shared_memory( + template + bool init_shared_segment( const RTPSWriter* writer, - const std::string& shared_dir) override + const std::string& shared_dir) { writer_ = writer; segment_id_ = writer_->getGuid(); @@ -147,8 +151,7 @@ class WriterPool : public DataSharingPayloadPool // We need to reserve the whole segment at once, and the underlying classes use uint32_t as size type. // In order to avoid overflows, we will calculate using uint64 and check the casting bool overflow = false; - - size_t per_allocation_extra_size = fastdds::rtps::SharedMemSegment::compute_per_allocation_extra_size( + size_t per_allocation_extra_size = T::compute_per_allocation_extra_size( alignof(PayloadNode), DataSharingPayloadPool::domain_name()); size_t payload_size = DataSharingPayloadPool::node_size(max_data_size_); @@ -178,13 +181,14 @@ class WriterPool : public DataSharingPayloadPool } //Open the segment - fastdds::rtps::SharedMemSegment::remove(segment_name_); + T::remove(segment_name_); + std::unique_ptr local_segment; try { - segment_ = std::unique_ptr( - new Segment(boost::interprocess::create_only, + local_segment = std::unique_ptr( + new T(boost::interprocess::create_only, segment_name_, - segment_size + fastdds::rtps::SharedMemSegment::EXTRA_SEGMENT_SIZE)); + segment_size + T::EXTRA_SEGMENT_SIZE)); } catch (const std::exception& e) { @@ -198,7 +202,7 @@ class WriterPool : public DataSharingPayloadPool // Alloc the memory for the pool // Cannot use 'construct' because we need to reserve extra space for the data, // which is not considered in sizeof(PayloadNode). - payloads_pool_ = static_cast(segment_->get().allocate(size_for_payloads_pool)); + payloads_pool_ = static_cast(local_segment->get().allocate(size_for_payloads_pool)); // Initialize each node in the pool free_payloads_.init(pool_size_); @@ -214,10 +218,10 @@ class WriterPool : public DataSharingPayloadPool } //Alloc the memory for the history - history_ = segment_->get().construct(history_chunk_name())[pool_size_ + 1](); + history_ = local_segment->get().template construct(history_chunk_name())[pool_size_ + 1](); //Alloc the memory for the descriptor - descriptor_ = segment_->get().construct(descriptor_chunk_name())(); + descriptor_ = local_segment->get().template construct(descriptor_chunk_name())(); // Initialize the data in the descriptor descriptor_->history_size = pool_size_ + 1; @@ -229,17 +233,32 @@ class WriterPool : public DataSharingPayloadPool } catch (std::exception& e) { - Segment::remove(segment_name_); + T::remove(segment_name_); logError(DATASHARING_PAYLOADPOOL, "Failed to initialize segment " << segment_name_ << ": " << e.what()); return false; } + segment_ = std::move(local_segment); is_initialized_ = true; return true; } + bool init_shared_memory( + const RTPSWriter* writer, + const std::string& shared_dir) override + { + if (shared_dir.empty()) + { + return init_shared_segment(writer, shared_dir); + } + else + { + return init_shared_segment(writer, shared_dir); + } + } + /** * Fills the metadata of the shared payload from the cache change information * and adds the payload's offset to the shared history diff --git a/src/cpp/utils/shared_memory/SharedMemSegment.hpp b/src/cpp/utils/shared_memory/SharedMemSegment.hpp index 3a302b0613c..f0462b35820 100644 --- a/src/cpp/utils/shared_memory/SharedMemSegment.hpp +++ b/src/cpp/utils/shared_memory/SharedMemSegment.hpp @@ -26,6 +26,7 @@ #endif // if defined(__GNUC__) && ( __GNUC__ >= 9) #include +#include #if defined (__GNUC__) && ( __GNUC__ >= 9) #pragma GCC diagnostic pop @@ -52,7 +53,7 @@ using Log = fastdds::dds::Log; * Provides shared memory functionallity abstrating from * lower level layers */ -class SharedMemSegment +class SharedSegmentBase { public: @@ -60,19 +61,13 @@ class SharedMemSegment using sharable_lock = boost::interprocess::sharable_lock; using sharable_mutex = boost::interprocess::interprocess_sharable_mutex; - typedef RobustInterprocessCondition condition_variable; - typedef boost::interprocess::interprocess_mutex mutex; - typedef boost::interprocess::named_mutex named_mutex; - typedef boost::interprocess::spin_wait spin_wait; + using condition_variable = RobustInterprocessCondition; + using mutex = boost::interprocess::interprocess_mutex; + using named_mutex = boost::interprocess::named_mutex; + using spin_wait = boost::interprocess::spin_wait; // Offset must be the same size for 32/64-bit versions, so no size_t used here. - typedef std::uint32_t Offset; - typedef boost::interprocess::offset_ptr VoidPointerT; - typedef boost::interprocess::basic_managed_shared_memory< - char, - boost::interprocess::rbtree_best_fit, - boost::interprocess::iset_index> managed_shared_memory_type; + using Offset = std::uint32_t; static constexpr boost::interprocess::open_only_t open_only = boost::interprocess::open_only_t(); static constexpr boost::interprocess::create_only_t create_only = boost::interprocess::create_only_t(); @@ -83,124 +78,36 @@ class SharedMemSegment // TODO(Adolfo): Further analysis to determine the perfect value for this extra segment size static constexpr uint32_t EXTRA_SEGMENT_SIZE = 512; - SharedMemSegment( - boost::interprocess::create_only_t, - const std::string& name, - size_t size) - : name_(name) - { - segment_ = std::unique_ptr( - new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), - static_cast(size + EXTRA_SEGMENT_SIZE))); - } - - SharedMemSegment( - boost::interprocess::open_only_t, + explicit SharedSegmentBase( const std::string& name) : name_(name) { - segment_ = std::unique_ptr( - new managed_shared_memory_type(boost::interprocess::open_only, name.c_str())); - } - - SharedMemSegment( - boost::interprocess::open_or_create_t, - const std::string& name, - size_t size) - : name_(name) - { - segment_ = std::unique_ptr( - new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), static_cast(size))); - } - - ~SharedMemSegment() - { - // no need of exception handling cause never throws - segment_.reset(); - } - - void* get_address_from_offset( - SharedMemSegment::Offset offset) const - { - return segment_->get_address_from_handle(offset); - } - - SharedMemSegment::Offset get_offset_from_address( - void* address) const - { - return segment_->get_handle_from_address(address); } - managed_shared_memory_type& get() + virtual ~SharedSegmentBase() { - return *segment_; } - static void remove( - const std::string& name) - { - boost::interprocess::shared_memory_object::remove(name.c_str()); - } + virtual void remove() = 0; std::string name() { return name_; } - /** - * Estimates the extra segment space required for an allocation - */ - static uint32_t compute_per_allocation_extra_size( - size_t allocation_alignment, - const std::string& domain_name) - { - Id uuid; - - try - { - static uint32_t extra_size = 0; - - if (extra_size == 0) - { - uuid.generate(); - - auto name = domain_name + "_" + uuid.to_string(); - - SharedMemEnvironment::get().init(); - - { - boost::interprocess::managed_shared_memory - test_segment(boost::interprocess::create_only, name.c_str(), - (std::max)((size_t)1024, allocation_alignment * 4)); - - auto m1 = test_segment.get_free_memory(); - test_segment.allocate_aligned(1, allocation_alignment); - auto m2 = test_segment.get_free_memory(); - extra_size = static_cast(m1 - m2); - } - - boost::interprocess::shared_memory_object::remove(name.c_str()); - } - - return extra_size; - - } - catch (const std::exception& e) - { - logError(RTPS_TRANSPORT_SHM, "Failed to create segment " << uuid.to_string() - << ": " << e.what()); + virtual void* get_address_from_offset( + SharedSegmentBase::Offset offset) const = 0; - throw; - } - } + virtual SharedSegmentBase::Offset get_offset_from_address( + void* address) const = 0; - static std::unique_ptr open_or_create_and_lock_named_mutex( + static std::unique_ptr open_or_create_and_lock_named_mutex( const std::string& mutex_name) { - std::unique_ptr named_mutex; + std::unique_ptr named_mutex; - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); boost::posix_time::ptime wait_time = boost::posix_time::microsec_clock::universal_time() @@ -209,10 +116,10 @@ class SharedMemSegment { // Interprocess mutex timeout when locking. Possible deadlock: owner died without unlocking? // try to remove and create again - SharedMemSegment::named_mutex::remove(mutex_name.c_str()); + SharedSegmentBase::named_mutex::remove(mutex_name.c_str()); - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_or_create, mutex_name.c_str())); if (!named_mutex->try_lock()) { @@ -223,13 +130,13 @@ class SharedMemSegment return named_mutex; } - static std::unique_ptr try_open_and_lock_named_mutex( + static std::unique_ptr try_open_and_lock_named_mutex( const std::string& mutex_name) { - std::unique_ptr named_mutex; + std::unique_ptr named_mutex; - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); boost::posix_time::ptime wait_time = boost::posix_time::microsec_clock::universal_time() @@ -242,38 +149,21 @@ class SharedMemSegment return named_mutex; } - static std::unique_ptr open_named_mutex( + static std::unique_ptr open_named_mutex( const std::string& mutex_name) { - std::unique_ptr named_mutex; + std::unique_ptr named_mutex; // Todo(Adolfo) : Dataraces could occur, this algorithm has to be improved - named_mutex = std::unique_ptr( - new SharedMemSegment::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); + named_mutex = std::unique_ptr( + new SharedSegmentBase::named_mutex(boost::interprocess::open_only, mutex_name.c_str())); return named_mutex; } /** - * Check the allocator internal structures - * @return true if structures are ok, false otherwise - */ - bool check_sanity() - { - return segment_->check_sanity(); - } - - /** - * @return The segment's size in bytes, including internal structures overhead. - */ - Offset mem_size() const - { - return segment_->get_size(); - } - - /** - * Unique ID of the memory segment + * Unique ID of the segment */ class Id { @@ -350,11 +240,166 @@ class SharedMemSegment } shared_mem_environment_initializer_; - std::unique_ptr segment_; - std::string name_; }; +template +class SharedSegment : public SharedSegmentBase +{ +public: + + typedef T managed_shared_memory_type; + typedef U managed_shared_object_type; + + SharedSegment( + boost::interprocess::create_only_t, + const std::string& name, + size_t size) + : SharedSegmentBase(name) + { + segment_ = std::unique_ptr( + new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), + static_cast(size + EXTRA_SEGMENT_SIZE))); + } + + SharedSegment( + boost::interprocess::open_only_t, + const std::string& name) + : SharedSegmentBase(name) + { + segment_ = std::unique_ptr( + new managed_shared_memory_type(boost::interprocess::open_only, name.c_str())); + } + + SharedSegment( + boost::interprocess::open_or_create_t, + const std::string& name, + size_t size) + : SharedSegmentBase(name) + { + segment_ = std::unique_ptr( + new managed_shared_memory_type(boost::interprocess::create_only, name.c_str(), static_cast(size))); + } + + ~SharedSegment() + { + // no need of exception handling cause never throws + segment_.reset(); + } + + static void remove( + const std::string& name) + { + managed_shared_object_type::remove(name.c_str()); + } + + void remove() override + { + managed_shared_object_type::remove(name().c_str()); + } + + void* get_address_from_offset( + SharedSegment::Offset offset) const override + { + return segment_->get_address_from_handle(offset); + } + + SharedSegment::Offset get_offset_from_address( + void* address) const override + { + return segment_->get_handle_from_address(address); + } + + managed_shared_memory_type& get() + { + return *segment_; + } + + /** + * Estimates the extra segment space required for an allocation + */ + static uint32_t compute_per_allocation_extra_size( + size_t allocation_alignment, + const std::string& domain_name) + { + Id uuid; + + try + { + static uint32_t extra_size = 0; + + if (extra_size == 0) + { + uuid.generate(); + + auto name = domain_name + "_" + uuid.to_string(); + + SharedMemEnvironment::get().init(); + + { + managed_shared_memory_type + test_segment(boost::interprocess::create_only, name.c_str(), + (std::max)((uint32_t)1024, static_cast(allocation_alignment * 4))); + + auto m1 = test_segment.get_free_memory(); + test_segment.allocate_aligned(1, static_cast(allocation_alignment)); + auto m2 = test_segment.get_free_memory(); + extra_size = static_cast(m1 - m2); + } + + managed_shared_object_type::remove(name.c_str()); + } + + return extra_size; + + } + catch (const std::exception& e) + { + logError(RTPS_TRANSPORT_SHM, "Failed to create segment " << uuid.to_string() + << ": " << e.what()); + + throw; + } + } + + /** + * Check the allocator internal structures + * @return true if structures are ok, false otherwise + */ + bool check_sanity() + { + return segment_->check_sanity(); + } + + /** + * @return The segment's size in bytes, including internal structures overhead. + */ + Offset mem_size() const + { + return segment_->get_size(); + } + +private: + + std::unique_ptr segment_; +}; + +using SharedMemSegment = SharedSegment< + boost::interprocess::basic_managed_shared_memory< + char, + boost::interprocess::rbtree_best_fit>, + boost::interprocess::iset_index>, + boost::interprocess::shared_memory_object>; + +using SharedFileSegment = SharedSegment< + boost::interprocess::basic_managed_mapped_file< + char, + boost::interprocess::rbtree_best_fit>, + boost::interprocess::iset_index>, + boost::interprocess::file_mapping>; + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 53257f0b7d4..a0c46b2ee51 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -399,6 +399,7 @@ class PubSubReader std::cout << "Created datareader " << datareader_->guid() << " for topic " << topic_name_ << std::endl; initialized_ = true; + datareader_guid_ = datareader_->guid(); } } @@ -1328,6 +1329,14 @@ class PubSubReader return *this; } + PubSubReader& datasharing_auto( + const std::string directory, + std::vector domain_id = std::vector()) + { + datareader_qos_.data_sharing().automatic(directory, domain_id); + return *this; + } + PubSubReader& datasharing_on( const std::string directory, std::vector domain_id = std::vector()) @@ -1527,6 +1536,11 @@ class PubSubReader return datareader_->get_statuscondition(); } + const eprosima::fastrtps::rtps::GUID_t& datareader_guid() const + { + return datareader_guid_; + } + protected: const eprosima::fastrtps::rtps::GUID_t& participant_guid() const @@ -1627,6 +1641,7 @@ class PubSubReader eprosima::fastdds::dds::StatusMask status_mask_; std::string topic_name_; eprosima::fastrtps::rtps::GUID_t participant_guid_; + eprosima::fastrtps::rtps::GUID_t datareader_guid_; bool initialized_; std::list total_msgs_; std::mutex mutex_; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 0624b0e9b2f..5df2bc430bc 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1288,6 +1288,14 @@ class PubSubWriter return *this; } + PubSubWriter& datasharing_auto( + const std::string directory, + std::vector domain_id = std::vector()) + { + datawriter_qos_.data_sharing().automatic(directory, domain_id); + return *this; + } + PubSubWriter& datasharing_on( const std::string directory, std::vector domain_id = std::vector()) diff --git a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp index d283e9f5cd4..f9fe5e94bd9 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp @@ -21,11 +21,27 @@ #include #include +#include +#include #include using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; +bool check_shared_file ( + const char* shared_dir, + const eprosima::fastrtps::rtps::GUID_t& guid) +{ + bool result; + std::stringstream file_name; + std::fstream file_stream; + + file_name << shared_dir << "/fast_datasharing_" << guid.guidPrefix << "_" << guid.entityId; + file_stream.open(file_name.str(), std::ios::in); + result = file_stream.is_open(); + file_stream.close(); + return result; +} TEST(DDSDataSharing, BasicCommunication) { @@ -38,14 +54,14 @@ TEST(DDSDataSharing, BasicCommunication) reader.history_depth(100) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(reader.isInitialized()); writer.history_depth(100) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(writer.isInitialized()); @@ -55,8 +71,11 @@ TEST(DDSDataSharing, BasicCommunication) writer.wait_discovery(); reader.wait_discovery(); - auto data = default_fixed_sized_data_generator(); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + auto data = default_fixed_sized_data_generator(); reader.startReception(data); // Send data @@ -65,6 +84,14 @@ TEST(DDSDataSharing, BasicCommunication) ASSERT_TRUE(data.empty()); // Block reader until reception finished or timeout. reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } @@ -82,11 +109,14 @@ TEST(DDSDataSharing, TransientReader) writer.history_depth(writer_history_depth) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(writer.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + // Send the data to fill the history and overwrite old changes // The reader only receives the last changes std::list data = default_fixed_sized_data_generator(writer_sent_data); @@ -100,17 +130,28 @@ TEST(DDSDataSharing, TransientReader) reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS) .durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + writer.wait_discovery(); reader.wait_discovery(); reader.startReception(received_data); reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } @@ -131,7 +172,7 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) writer.history_depth(writer_history_depth) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS) .resource_limits_extra_samples(1).init(); @@ -139,11 +180,15 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) read_reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); ASSERT_TRUE(read_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(); read_reader.wait_discovery(); @@ -160,6 +205,14 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) // The reader has overridden payloads in the history. Only the valid ones are returned to the user read_reader.startReception(valid_data); read_reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + read_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, ReliableDirtyPayloads) @@ -179,7 +232,7 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) writer.history_depth(writer_history_depth) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(RELIABLE_RELIABILITY_QOS) .resource_limits_extra_samples(1).init(); @@ -187,11 +240,15 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) read_reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready") + .datasharing_on(".") .reliability(RELIABLE_RELIABILITY_QOS).init(); ASSERT_TRUE(read_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(); read_reader.wait_discovery(); @@ -209,6 +266,14 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) // but will keep them in the history. read_reader.startReception(valid_data); read_reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + read_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", read_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) @@ -229,13 +294,13 @@ TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(writer.isInitialized()); datasharing_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(datasharing_reader.isInitialized()); non_datasharing_reader.disable_builtin_transport() @@ -245,9 +310,15 @@ TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) auto_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(reader_ids).init(); + .datasharing_auto(".", reader_ids).init(); ASSERT_TRUE(auto_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(3); datasharing_reader.wait_discovery(); non_datasharing_reader.wait_discovery(); @@ -264,6 +335,18 @@ TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) ASSERT_EQ(datasharing_reader.block_for_all(std::chrono::seconds(2)), 0u); ASSERT_EQ(non_datasharing_reader.block_for_all(std::chrono::seconds(2)), 0u); ASSERT_EQ(auto_reader.block_for_all(std::chrono::seconds(2)), 0u); + + // Destroy reader and writer and see if there are dangling files + datasharing_reader.destroy(); + non_datasharing_reader.destroy(); + auto_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) @@ -286,13 +369,13 @@ TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(writer.isInitialized()); datasharing_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(datasharing_reader.isInitialized()); non_datasharing_reader.disable_builtin_transport() @@ -302,9 +385,15 @@ TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) auto_reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(reader_ids).init(); + .datasharing_auto(".", reader_ids).init(); ASSERT_TRUE(auto_reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_TRUE(check_shared_file(".", writer.datawriter_guid())); + writer.wait_discovery(3); datasharing_reader.wait_discovery(); non_datasharing_reader.wait_discovery(); @@ -321,6 +410,18 @@ TEST(DDSDataSharing, DataSharingWriter_CommonDomainReaders) ASSERT_EQ(non_datasharing_reader.block_for_all(std::chrono::seconds(2)), 0u); auto_reader.block_for_all(); auto_reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + datasharing_reader.destroy(); + non_datasharing_reader.destroy(); + auto_reader.destroy(); + writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", auto_reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) @@ -341,7 +442,7 @@ TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) datasharing_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); @@ -352,15 +453,21 @@ TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) auto_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(writer_ids) + .datasharing_auto(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", auto_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + reader.wait_discovery(std::chrono::seconds::zero(), 3); datasharing_writer.wait_discovery(); non_datasharing_writer.wait_discovery(); @@ -386,6 +493,18 @@ TEST(DDSDataSharing, DataSharingReader_DifferentDomainWriters) auto_writer.send(data); ASSERT_TRUE(data.empty()); ASSERT_EQ(reader.block_for_all(std::chrono::seconds(2)), 0u); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + datasharing_writer.destroy(); + non_datasharing_writer.destroy(); + auto_writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", auto_writer.datawriter_guid())); } TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) @@ -408,7 +527,7 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) datasharing_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", writer_ids) + .datasharing_on(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); @@ -419,15 +538,21 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) auto_writer.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_auto(writer_ids) + .datasharing_auto(".", writer_ids) .resource_limits_extra_samples(5).init(); ASSERT_TRUE(datasharing_writer.isInitialized()); reader.disable_builtin_transport() .add_user_transport_to_pparams(testTransport) - .datasharing_on("Unused. change when ready", reader_ids).init(); + .datasharing_on(".", reader_ids).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_TRUE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", auto_writer.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + reader.wait_discovery(std::chrono::seconds::zero(), 3); datasharing_writer.wait_discovery(); non_datasharing_writer.wait_discovery(); @@ -453,6 +578,18 @@ TEST(DDSDataSharing, DataSharingReader_CommonDomainWriters) auto_writer.send(data); ASSERT_TRUE(data.empty()); reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + datasharing_writer.destroy(); + non_datasharing_writer.destroy(); + auto_writer.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", non_datasharing_writer.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", auto_writer.datawriter_guid())); } @@ -465,20 +602,25 @@ TEST(DDSDataSharing, DataSharingPoolError) writer_datasharing.resource_limits_max_samples(100000) .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) - .datasharing_on("Unused. change when ready").init(); + .datasharing_on(".").init(); ASSERT_FALSE(writer_datasharing.isInitialized()); writer_auto.resource_limits_max_samples(100000) .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) - .datasharing_auto().init(); + .datasharing_auto(".").init(); ASSERT_TRUE(writer_auto.isInitialized()); - reader.datasharing_on("Unused. change when ready") + reader.datasharing_on(".") .history_depth(10) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init(); ASSERT_TRUE(reader.isInitialized()); + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", writer_datasharing.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", writer_auto.datawriter_guid())); + ASSERT_TRUE(check_shared_file(".", reader.datareader_guid())); + reader.wait_discovery(); writer_auto.wait_discovery(); @@ -488,4 +630,57 @@ TEST(DDSDataSharing, DataSharingPoolError) writer_auto.send(data); ASSERT_TRUE(data.empty()); reader.block_for_all(); + + // Destroy reader and writer and see if there are dangling files + reader.destroy(); + writer_datasharing.destroy(); + writer_auto.destroy(); + + // Check that the shared files are created on the correct directory + ASSERT_FALSE(check_shared_file(".", reader.datareader_guid())); + ASSERT_FALSE(check_shared_file(".", writer_datasharing.datawriter_guid())); + ASSERT_FALSE(check_shared_file(".", writer_auto.datawriter_guid())); } + + +TEST(DDSDataSharing, DataSharingDefaultDirectory) +{ + // Since the default directory heavily depends on the system, + // we are not checking the creation of the files in this case, + // only that it is working. + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + // Disable transports to ensure we are using datasharing + auto testTransport = std::make_shared(); + testTransport->dropDataMessagesPercentage = 100; + + reader.history_depth(100) + .add_user_transport_to_pparams(testTransport) + .datasharing_auto() + .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); + + ASSERT_TRUE(reader.isInitialized()); + + writer.history_depth(100) + .add_user_transport_to_pparams(testTransport) + .datasharing_auto() + .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_fixed_sized_data_generator(); + reader.startReception(data); + + // Send data + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_all(); +} \ No newline at end of file diff --git a/test/unittest/dds/publisher/DataWriterTests.cpp b/test/unittest/dds/publisher/DataWriterTests.cpp index 8a8a02e65e6..ed02dc8b3cd 100644 --- a/test/unittest/dds/publisher/DataWriterTests.cpp +++ b/test/unittest/dds/publisher/DataWriterTests.cpp @@ -277,7 +277,7 @@ TEST(DataWriterTests, ForcedDataSharing) // DataSharing enabled, unbounded topic data type qos = DATAWRITER_QOS_DEFAULT; qos.endpoint().history_memory_policy = fastrtps::rtps::PREALLOCATED_MEMORY_MODE; - qos.data_sharing().on("path"); + qos.data_sharing().on("."); datawriter = publisher->create_datawriter(topic, qos); ASSERT_EQ(datawriter, nullptr); @@ -288,7 +288,7 @@ TEST(DataWriterTests, ForcedDataSharing) // DataSharing enabled, bounded topic data type, Dynamic memory policy qos = DATAWRITER_QOS_DEFAULT; - qos.data_sharing().on("path"); + qos.data_sharing().on("."); qos.endpoint().history_memory_policy = fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE; datawriter = publisher->create_datawriter(bounded_topic, qos); ASSERT_EQ(datawriter, nullptr); @@ -338,7 +338,7 @@ TEST(DataWriterTests, ForcedDataSharing) ASSERT_NE(bounded_topic, nullptr); qos = DATAWRITER_QOS_DEFAULT; - qos.data_sharing().on("path"); + qos.data_sharing().on("."); qos.endpoint().history_memory_policy = fastrtps::rtps::PREALLOCATED_MEMORY_MODE; diff --git a/thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp b/thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp new file mode 100644 index 00000000000..99e0fea570a --- /dev/null +++ b/thirdparty/boost/include/boost/interprocess/detail/file_wrapper.hpp @@ -0,0 +1,213 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2006-2012. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_DETAIL_FILE_WRAPPER_HPP +#define BOOST_INTERPROCESS_DETAIL_FILE_WRAPPER_HPP + +#ifndef BOOST_CONFIG_HPP +# include +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace interprocess { +namespace ipcdetail{ + +class file_wrapper +{ + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + BOOST_MOVABLE_BUT_NOT_COPYABLE(file_wrapper) + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + public: + + //!Default constructor. + //!Represents an empty file_wrapper. + file_wrapper(); + + //!Creates a file object with name "name" and mode "mode", with the access mode "mode" + //!If the file previously exists, throws an error. + file_wrapper(create_only_t, const char *name, mode_t mode, const permissions &perm = permissions()) + { this->priv_open_or_create(ipcdetail::DoCreate, name, mode, perm); } + + //!Tries to create a file with name "name" and mode "mode", with the + //!access mode "mode". If the file previously exists, it tries to open it with mode "mode". + //!Otherwise throws an error. + file_wrapper(open_or_create_t, const char *name, mode_t mode, const permissions &perm = permissions()) + { this->priv_open_or_create(ipcdetail::DoOpenOrCreate, name, mode, perm); } + + //!Tries to open a file with name "name", with the access mode "mode". + //!If the file does not previously exist, it throws an error. + file_wrapper(open_only_t, const char *name, mode_t mode) + { this->priv_open_or_create(ipcdetail::DoOpen, name, mode, permissions()); } + + //!Moves the ownership of "moved"'s file to *this. + //!After the call, "moved" does not represent any file. + //!Does not throw + file_wrapper(BOOST_RV_REF(file_wrapper) moved) + : m_handle(file_handle_t(ipcdetail::invalid_file())) + { this->swap(moved); } + + //!Moves the ownership of "moved"'s file to *this. + //!After the call, "moved" does not represent any file. + //!Does not throw + file_wrapper &operator=(BOOST_RV_REF(file_wrapper) moved) + { + file_wrapper tmp(boost::move(moved)); + this->swap(tmp); + return *this; + } + + //!Swaps to file_wrappers. + //!Does not throw + void swap(file_wrapper &other); + + //!Erases a file from the system. + //!Returns false on error. Never throws + static bool remove(const char *name); + + //!Sets the size of the file + void truncate(offset_t length); + + //!Closes the + //!file + ~file_wrapper(); + + //!Returns the name of the file + //!used in the constructor + const char *get_name() const; + + //!Returns the name of the file + //!used in the constructor + bool get_size(offset_t &size) const; + + //!Returns access mode + //!used in the constructor + mode_t get_mode() const; + + //!Get mapping handle + //!to use with mapped_region + mapping_handle_t get_mapping_handle() const; + + private: + //!Closes a previously opened file mapping. Never throws. + void priv_close(); + //!Closes a previously opened file mapping. Never throws. + bool priv_open_or_create(ipcdetail::create_enum_t type, const char *filename, mode_t mode, const permissions &perm); + + file_handle_t m_handle; + mode_t m_mode; + std::string m_filename; +}; + +inline file_wrapper::file_wrapper() + : m_handle(file_handle_t(ipcdetail::invalid_file())) + , m_mode(read_only), m_filename() +{} + +inline file_wrapper::~file_wrapper() +{ this->priv_close(); } + +inline const char *file_wrapper::get_name() const +{ return m_filename.c_str(); } + +inline bool file_wrapper::get_size(offset_t &size) const +{ return get_file_size((file_handle_t)m_handle, size); } + +inline void file_wrapper::swap(file_wrapper &other) +{ + (simple_swap)(m_handle, other.m_handle); + (simple_swap)(m_mode, other.m_mode); + m_filename.swap(other.m_filename); +} + +inline mapping_handle_t file_wrapper::get_mapping_handle() const +{ return mapping_handle_from_file_handle(m_handle); } + +inline mode_t file_wrapper::get_mode() const +{ return m_mode; } + +inline bool file_wrapper::priv_open_or_create + (ipcdetail::create_enum_t type, + const char *filename, + mode_t mode, + const permissions &perm = permissions()) +{ + m_filename = filename; + + if(mode != read_only && mode != read_write){ + error_info err(mode_error); + throw interprocess_exception(err); + } + + //Open file existing native API to obtain the handle + switch(type){ + case ipcdetail::DoOpen: + m_handle = open_existing_file(filename, mode); + break; + case ipcdetail::DoCreate: + m_handle = create_new_file(filename, mode, perm); + break; + case ipcdetail::DoOpenOrCreate: + m_handle = create_or_open_file(filename, mode, perm); + break; + default: + { + error_info err = other_error; + throw interprocess_exception(err); + } + } + + //Check for error + if(m_handle == invalid_file()){ + error_info err = system_error_code(); + throw interprocess_exception(err); + } + + m_mode = mode; + return true; +} + +inline bool file_wrapper::remove(const char *filename) +{ return delete_file(filename); } + +inline void file_wrapper::truncate(offset_t length) +{ + if(!truncate_file(m_handle, length)){ + error_info err(system_error_code()); + throw interprocess_exception(err); + } +} + +inline void file_wrapper::priv_close() +{ + if(m_handle != invalid_file()){ + close_file(m_handle); + m_handle = invalid_file(); + } +} + +} //namespace ipcdetail{ +} //namespace interprocess { +} //namespace boost { + +#include + +#endif //BOOST_INTERPROCESS_DETAIL_FILE_WRAPPER_HPP diff --git a/thirdparty/boost/include/boost/interprocess/file_mapping.hpp b/thirdparty/boost/include/boost/interprocess/file_mapping.hpp new file mode 100644 index 00000000000..0d1cf1fe418 --- /dev/null +++ b/thirdparty/boost/include/boost/interprocess/file_mapping.hpp @@ -0,0 +1,199 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_FILE_MAPPING_HPP +#define BOOST_INTERPROCESS_FILE_MAPPING_HPP + +#ifndef BOOST_CONFIG_HPP +# include +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + +#include +#include + +#if !defined(BOOST_INTERPROCESS_MAPPED_FILES) +#error "Boost.Interprocess: This platform does not support memory mapped files!" +#endif + +#include +#include +#include +#include +#include +#include +#include +#include //std::string + +//!\file +//!Describes file_mapping and mapped region classes + +namespace boost { +namespace interprocess { + +//!A class that wraps a file-mapping that can be used to +//!create mapped regions from the mapped files +class file_mapping +{ + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + BOOST_MOVABLE_BUT_NOT_COPYABLE(file_mapping) + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + + public: + //!Constructs an empty file mapping. + //!Does not throw + file_mapping(); + + //!Opens a file mapping of file "filename", starting in offset + //!"file_offset", and the mapping's size will be "size". The mapping + //!can be opened for read-only "read_only" or read-write "read_write" + //!modes. Throws interprocess_exception on error. + file_mapping(const char *filename, mode_t mode); + + //!Moves the ownership of "moved"'s file mapping object to *this. + //!After the call, "moved" does not represent any file mapping object. + //!Does not throw + file_mapping(BOOST_RV_REF(file_mapping) moved) + : m_handle(file_handle_t(ipcdetail::invalid_file())) + , m_mode(read_only) + { this->swap(moved); } + + //!Moves the ownership of "moved"'s file mapping to *this. + //!After the call, "moved" does not represent any file mapping. + //!Does not throw + file_mapping &operator=(BOOST_RV_REF(file_mapping) moved) + { + file_mapping tmp(boost::move(moved)); + this->swap(tmp); + return *this; + } + + //!Swaps to file_mappings. + //!Does not throw. + void swap(file_mapping &other); + + //!Returns access mode + //!used in the constructor + mode_t get_mode() const; + + //!Obtains the mapping handle + //!to be used with mapped_region + mapping_handle_t get_mapping_handle() const; + + //!Destroys the file mapping. All mapped regions created from this are still + //!valid. Does not throw + ~file_mapping(); + + //!Returns the name of the file + //!used in the constructor. + const char *get_name() const; + + //!Removes the file named "filename" even if it's been memory mapped. + //!Returns true on success. + //!The function might fail in some operating systems if the file is + //!being used other processes and no deletion permission was shared. + static bool remove(const char *filename); + + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + private: + //!Closes a previously opened file mapping. Never throws. + void priv_close(); + file_handle_t m_handle; + mode_t m_mode; + std::string m_filename; + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED +}; + +inline file_mapping::file_mapping() + : m_handle(file_handle_t(ipcdetail::invalid_file())) + , m_mode(read_only) +{} + +inline file_mapping::~file_mapping() +{ this->priv_close(); } + +inline const char *file_mapping::get_name() const +{ return m_filename.c_str(); } + +inline void file_mapping::swap(file_mapping &other) +{ + (simple_swap)(m_handle, other.m_handle); + (simple_swap)(m_mode, other.m_mode); + m_filename.swap(other.m_filename); +} + +inline mapping_handle_t file_mapping::get_mapping_handle() const +{ return ipcdetail::mapping_handle_from_file_handle(m_handle); } + +inline mode_t file_mapping::get_mode() const +{ return m_mode; } + +inline file_mapping::file_mapping + (const char *filename, mode_t mode) + : m_filename(filename) +{ + //Check accesses + if (mode != read_write && mode != read_only){ + error_info err = other_error; + throw interprocess_exception(err); + } + + //Open file + m_handle = ipcdetail::open_existing_file(filename, mode); + + //Check for error + if(m_handle == ipcdetail::invalid_file()){ + error_info err = system_error_code(); + this->priv_close(); + throw interprocess_exception(err); + } + m_mode = mode; +} + +inline bool file_mapping::remove(const char *filename) +{ return ipcdetail::delete_file(filename); } + +#if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + +inline void file_mapping::priv_close() +{ + if(m_handle != ipcdetail::invalid_file()){ + ipcdetail::close_file(m_handle); + m_handle = ipcdetail::invalid_file(); + } +} + +//!A class that stores the name of a file +//!and tries to remove it in its destructor +//!Useful to remove temporary files in the presence +//!of exceptions +class remove_file_on_destroy +{ + const char * m_name; + public: + remove_file_on_destroy(const char *name) + : m_name(name) + {} + + ~remove_file_on_destroy() + { ipcdetail::delete_file(m_name); } +}; + +#endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + +} //namespace interprocess { +} //namespace boost { + +#include + +#endif //BOOST_INTERPROCESS_FILE_MAPPING_HPP diff --git a/thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp b/thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp new file mode 100644 index 00000000000..14ec160614b --- /dev/null +++ b/thirdparty/boost/include/boost/interprocess/managed_mapped_file.hpp @@ -0,0 +1,250 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_MANAGED_MAPPED_FILE_HPP +#define BOOST_INTERPROCESS_MANAGED_MAPPED_FILE_HPP + +#ifndef BOOST_CONFIG_HPP +# include +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +//These includes needed to fulfill default template parameters of +//predeclarations in interprocess_fwd.hpp +#include +#include +#include + +namespace boost { +namespace interprocess { +namespace ipcdetail { + +template +struct mfile_open_or_create +{ + typedef ipcdetail::managed_open_or_create_impl + < file_wrapper, AllocationAlgorithm::Alignment, true, false> type; +}; + +} //namespace ipcdetail { + +//!A basic mapped file named object creation class. Initializes the +//!mapped file. Inherits all basic functionality from +//!basic_managed_memory_impl +template + < + class CharType, + class AllocationAlgorithm, + template class IndexType + > +class basic_managed_mapped_file + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + : public ipcdetail::basic_managed_memory_impl + ::type::ManagedOpenOrCreateUserOffset> + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED +{ + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + public: + typedef ipcdetail::basic_managed_memory_impl + ::type::ManagedOpenOrCreateUserOffset> base_t; + typedef ipcdetail::file_wrapper device_type; + + private: + + typedef ipcdetail::create_open_func create_open_func_t; + + basic_managed_mapped_file *get_this_pointer() + { return this; } + + private: + typedef typename base_t::char_ptr_holder_t char_ptr_holder_t; + BOOST_MOVABLE_BUT_NOT_COPYABLE(basic_managed_mapped_file) + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED + + public: //functions + + //!Unsigned integral type enough to represent + //!the size of a basic_managed_mapped_file. + typedef typename BOOST_INTERPROCESS_IMPDEF(base_t::size_type) size_type; + + //!Creates mapped file and creates and places the segment manager. + //!This can throw. + basic_managed_mapped_file() + {} + + //!Creates mapped file and creates and places the segment manager. + //!This can throw. + basic_managed_mapped_file(create_only_t, const char *name, + size_type size, const void *addr = 0, const permissions &perm = permissions()) + : m_mfile(create_only, name, size, read_write, addr, + create_open_func_t(get_this_pointer(), ipcdetail::DoCreate), perm) + {} + + //!Creates mapped file and creates and places the segment manager if + //!segment was not created. If segment was created it connects to the + //!segment. + //!This can throw. + basic_managed_mapped_file (open_or_create_t, + const char *name, size_type size, + const void *addr = 0, const permissions &perm = permissions()) + : m_mfile(open_or_create, name, size, read_write, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpenOrCreate), perm) + {} + + //!Connects to a created mapped file and its segment manager. + //!This can throw. + basic_managed_mapped_file (open_only_t, const char* name, + const void *addr = 0) + : m_mfile(open_only, name, read_write, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpen)) + {} + + //!Connects to a created mapped file and its segment manager + //!in copy_on_write mode. + //!This can throw. + basic_managed_mapped_file (open_copy_on_write_t, const char* name, + const void *addr = 0) + : m_mfile(open_only, name, copy_on_write, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpen)) + {} + + //!Connects to a created mapped file and its segment manager + //!in read-only mode. + //!This can throw. + basic_managed_mapped_file (open_read_only_t, const char* name, + const void *addr = 0) + : m_mfile(open_only, name, read_only, addr, + create_open_func_t(get_this_pointer(), + ipcdetail::DoOpen)) + {} + + //!Moves the ownership of "moved"'s managed memory to *this. + //!Does not throw + basic_managed_mapped_file(BOOST_RV_REF(basic_managed_mapped_file) moved) + { + this->swap(moved); + } + + //!Moves the ownership of "moved"'s managed memory to *this. + //!Does not throw + basic_managed_mapped_file &operator=(BOOST_RV_REF(basic_managed_mapped_file) moved) + { + basic_managed_mapped_file tmp(boost::move(moved)); + this->swap(tmp); + return *this; + } + + //!Destroys *this and indicates that the calling process is finished using + //!the resource. The destructor function will deallocate + //!any system resources allocated by the system for use by this process for + //!this resource. The resource can still be opened again calling + //!the open constructor overload. To erase the resource from the system + //!use remove(). + ~basic_managed_mapped_file() + {} + + //!Swaps the ownership of the managed mapped memories managed by *this and other. + //!Never throws. + void swap(basic_managed_mapped_file &other) + { + base_t::swap(other); + m_mfile.swap(other.m_mfile); + } + + //!Flushes cached data to file. + //!Never throws + bool flush() + { return m_mfile.flush(); } + + //!Tries to resize mapped file so that we have room for + //!more objects. + //! + //!This function is not synchronized so no other thread or process should + //!be reading or writing the file + static bool grow(const char *filename, size_type extra_bytes) + { + return base_t::template grow + (filename, extra_bytes); + } + + //!Tries to resize mapped file to minimized the size of the file. + //! + //!This function is not synchronized so no other thread or process should + //!be reading or writing the file + static bool shrink_to_fit(const char *filename) + { + return base_t::template shrink_to_fit + (filename); + } + + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) + + //!Tries to find a previous named allocation address. Returns a memory + //!buffer and the object count. If not found returned pointer is 0. + //!Never throws. + template + std::pair find (char_ptr_holder_t name) + { + if(m_mfile.get_mapped_region().get_mode() == read_only){ + return base_t::template find_no_lock(name); + } + else{ + return base_t::template find(name); + } + } + + private: + typename ipcdetail::mfile_open_or_create::type m_mfile; + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED +}; + +#ifdef BOOST_INTERPROCESS_DOXYGEN_INVOKED + +//!Typedef for a default basic_managed_mapped_file +//!of narrow characters +typedef basic_managed_mapped_file + + ,iset_index> +managed_mapped_file; + +//!Typedef for a default basic_managed_mapped_file +//!of wide characters +typedef basic_managed_mapped_file + + ,iset_index> +wmanaged_mapped_file; + +#endif //#ifdef BOOST_INTERPROCESS_DOXYGEN_INVOKED + +} //namespace interprocess { +} //namespace boost { + +#include + +#endif //BOOST_INTERPROCESS_MANAGED_MAPPED_FILE_HPP