Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/datasharing custom path [12051] #2092

Merged
merged 11 commits into from
Sep 1, 2021
97 changes: 24 additions & 73 deletions src/cpp/rtps/DataSharing/DataSharingNotification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,101 +50,52 @@ std::shared_ptr<DataSharingNotification> DataSharingNotification::open_notificat
return notification;
}

void DataSharingNotification::destroy()
{
if (owned_)
{
// We cannot destroy the objects in the SHM, as the Writer may still be using them.
// We just remove the segment, and when the Writer closes it, it will be removed from the system.
segment_->remove(segment_name_);

owned_ = false;
}
else
{
logError(HISTORY_DATASHARING_LISTENER, "Trying to destroy non-owned notification segment " << segment_name_);
}
}

bool DataSharingNotification::create_and_init_notification(
const GUID_t& reader_guid,
const std::string& shared_dir)
{
segment_id_ = reader_guid;
segment_name_ = generate_segment_name(shared_dir, reader_guid);

uint32_t per_allocation_extra_size = Segment::compute_per_allocation_extra_size(
alignof(Notification), DataSharingNotification::domain_name());
uint32_t segment_size = static_cast<uint32_t>(sizeof(Notification)) + per_allocation_extra_size;

//Open the segment
Segment::remove(segment_name_);
try
if (shared_dir.empty())
{
segment_ = std::unique_ptr<Segment>(
new Segment(boost::interprocess::create_only,
segment_name_,
segment_size + fastdds::rtps::SharedMemSegment::EXTRA_SEGMENT_SIZE));
return create_and_init_shared_segment_notification<fastdds::rtps::SharedMemSegment>(reader_guid,
shared_dir);
}
catch (const std::exception& e)
{
logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_
<< ": " << e.what());
return false;
}

try
else
{
// Alloc and initialize the Node
notification_ = segment_->get().construct<Notification>("notification_node")();
notification_->new_data.store(false);
return create_and_init_shared_segment_notification<fastdds::rtps::SharedFileSegment>(reader_guid,
shared_dir);
}
catch (std::exception& e)
{
Segment::remove(segment_name_);

logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_
<< ": " << e.what());
return false;
}

owned_ = true;
return true;
}

bool DataSharingNotification::open_and_init_notification(
const GUID_t& reader_guid,
const std::string& shared_dir)
{
segment_id_ = reader_guid;
segment_name_ = generate_segment_name(shared_dir, reader_guid);

//Open the segment
try
if (shared_dir.empty())
{
segment_ = std::unique_ptr<Segment>(
new Segment(boost::interprocess::open_only,
segment_name_.c_str()));
return open_and_init_shared_segment_notification<fastdds::rtps::SharedMemSegment>(reader_guid,
shared_dir);
}
catch (const std::exception& e)
else
{
logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_
<< ": " << e.what());
return false;
return open_and_init_shared_segment_notification<fastdds::rtps::SharedFileSegment>(reader_guid,
shared_dir);
}
}

// Initialize values from the segment
notification_ = segment_->get().find<Notification>(
"notification_node").first;
if (!notification_)
void DataSharingNotification::destroy()
{
if (owned_)
{
segment_.reset();
// We cannot destroy the objects in the SHM, as the Writer may still be using them.
// We just remove the segment, and when the Writer closes it, it will be removed from the system.
segment_->remove();

logError(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_);
return false;
owned_ = false;
}
else
{
logError(HISTORY_DATASHARING_LISTENER, "Trying to destroy non-owned notification segment " << segment_name_);
}

return true;
}

} // namespace rtps
Expand Down
95 changes: 92 additions & 3 deletions src/cpp/rtps/DataSharing/DataSharingNotification.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DataSharingNotification

public:

typedef fastdds::rtps::SharedMemSegment Segment;
typedef fastdds::rtps::SharedSegmentBase Segment;

DataSharingNotification() = default;

Expand Down Expand Up @@ -107,10 +107,14 @@ class DataSharingNotification
#pragma warning(pop)

static std::string generate_segment_name(
const std::string& /*shared_dir*/,
const std::string& shared_dir,
const GUID_t& reader_guid)
{
std::stringstream ss;
if (!shared_dir.empty())
{
ss << shared_dir << "/";
}
ss << DataSharingNotification::domain_name() << "_" << reader_guid.guidPrefix << "_" << reader_guid.entityId;
return ss.str();
}
Expand All @@ -119,11 +123,96 @@ class DataSharingNotification
const GUID_t& reader_guid,
const std::string& shared_dir = std::string());


bool open_and_init_notification(
const GUID_t& reader_guid,
const std::string& shared_dir = std::string());

template <typename T>
bool create_and_init_shared_segment_notification(
const GUID_t& reader_guid,
const std::string& shared_dir)
{
segment_id_ = reader_guid;
segment_name_ = generate_segment_name(shared_dir, reader_guid);

uint32_t per_allocation_extra_size = T::compute_per_allocation_extra_size(
alignof(Notification), DataSharingNotification::domain_name());
uint32_t segment_size = static_cast<uint32_t>(sizeof(Notification)) + per_allocation_extra_size;

//Open the segment
T::remove(segment_name_);
std::unique_ptr<T> local_segment;
try
{
local_segment = std::unique_ptr<T>(
new T(boost::interprocess::create_only,
segment_name_,
segment_size + T::EXTRA_SEGMENT_SIZE));
}
catch (const std::exception& e)
{
logError(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_
<< ": " << e.what());
return false;
}

try
{
// Alloc and initialize the Node
notification_ = local_segment->get().template construct<Notification>("notification_node")();
notification_->new_data.store(false);
}
catch (std::exception& e)
{
T::remove(segment_name_);

logError(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_
<< ": " << e.what());
return false;
}

segment_ = std::move(local_segment);
owned_ = true;
return true;
}

template <typename T>
bool open_and_init_shared_segment_notification(
const GUID_t& reader_guid,
const std::string& shared_dir)
{
segment_id_ = reader_guid;
segment_name_ = generate_segment_name(shared_dir, reader_guid);

//Open the segment
std::unique_ptr<T> local_segment;
try
{
local_segment = std::unique_ptr<T>(
new T(boost::interprocess::open_only,
segment_name_.c_str()));
}
catch (const std::exception& e)
{
logError(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_
<< ": " << e.what());
return false;
}

// Initialize values from the segment
notification_ = (local_segment->get().template find<Notification>(
"notification_node")).first;
if (!notification_)
{
local_segment.reset();

logError(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_);
return false;
}

segment_ = std::move(local_segment);
return true;
}

GUID_t segment_id_; //< The ID of the segment is the GUID of the reader
std::string segment_name_; //< Segment name
Expand Down
9 changes: 7 additions & 2 deletions src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DataSharingPayloadPool : public IPayloadPool

public:

using Segment = fastdds::rtps::SharedMemSegment;
using Segment = fastdds::rtps::SharedSegmentBase;
using sharable_mutex = Segment::sharable_mutex;
template <class M>
using sharable_lock = Segment::sharable_lock<M>;
Expand Down Expand Up @@ -348,10 +348,15 @@ class DataSharingPayloadPool : public IPayloadPool
#pragma warning(pop)

static std::string generate_segment_name(
const std::string& /*shared_dir*/,
const std::string& shared_dir,
const GUID_t& writer_guid)
{
std::stringstream ss;
if (!shared_dir.empty())
{
ss << shared_dir << "/";
}

ss << DataSharingPayloadPool::domain_name() << "_" << writer_guid.guidPrefix << "_" << writer_guid.entityId;
return ss.str();
}
Expand Down
33 changes: 25 additions & 8 deletions src/cpp/rtps/DataSharing/ReaderPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,20 @@ class ReaderPool : public DataSharingPayloadPool
return DataSharingPayloadPool::release_payload(cache_change);
}

bool init_shared_memory(
template <typename T>
bool init_shared_segment(
const GUID_t& writer_guid,
const std::string& shared_dir) override
const std::string& shared_dir)
{
segment_id_ = writer_guid;
segment_name_ = generate_segment_name(shared_dir, writer_guid);

std::unique_ptr<T> local_segment;
// Open the segment
try
{
segment_ = std::unique_ptr<fastdds::rtps::SharedMemSegment>(
new fastdds::rtps::SharedMemSegment(boost::interprocess::open_only,
local_segment = std::unique_ptr<T>(
new T(boost::interprocess::open_only,
segment_name_.c_str()));
}
catch (const std::exception& e)
Expand All @@ -103,20 +105,20 @@ class ReaderPool : public DataSharingPayloadPool
}

// Get the pool description
descriptor_ = segment_->get().find<PoolDescriptor>(descriptor_chunk_name()).first;
descriptor_ = local_segment->get().template find<PoolDescriptor>(descriptor_chunk_name()).first;
if (!descriptor_)
{
segment_.reset();
local_segment.reset();

logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload pool descriptor " << segment_name_);
return false;
}

// Get the history
history_ = segment_->get().find<Segment::Offset>(history_chunk_name()).first;
history_ = local_segment->get().template find<Segment::Offset>(history_chunk_name()).first;
if (!history_)
{
segment_.reset();
local_segment.reset();

logError(HISTORY_DATASHARING_PAYLOADPOOL, "Failed to open payload history " << segment_name_);
return false;
Expand All @@ -132,9 +134,24 @@ class ReaderPool : public DataSharingPayloadPool
next_payload_ = begin();
}

segment_ = std::move(local_segment);
return true;
}

bool init_shared_memory(
const GUID_t& writer_guid,
const std::string& shared_dir) override
{
if (shared_dir.empty())
{
return init_shared_segment<fastdds::rtps::SharedMemSegment>(writer_guid, shared_dir);
}
else
{
return init_shared_segment<fastdds::rtps::SharedFileSegment>(writer_guid, shared_dir);
}
}

void get_next_unread_payload(
CacheChange_t& cache_change,
SequenceNumber_t& last_sequence_number)
Expand Down
Loading