diff --git a/include/fastrtps/qos/ParameterTypes.h b/include/fastrtps/qos/ParameterTypes.h index 2e4249675d3..49ba3ef8bfc 100644 --- a/include/fastrtps/qos/ParameterTypes.h +++ b/include/fastrtps/qos/ParameterTypes.h @@ -241,6 +241,8 @@ class ParameterKey_t : public Parameter_t }; #define PARAMETER_KEY_LENGTH 16 +#define PARAMETER_KEY_HASH_LENGTH 16 + /** * */ diff --git a/include/fastrtps/rtps/common/Types.h b/include/fastrtps/rtps/common/Types.h index 2a439d50d52..b1a4a7e847c 100644 --- a/include/fastrtps/rtps/common/Types.h +++ b/include/fastrtps/rtps/common/Types.h @@ -89,23 +89,23 @@ const Endianness_t DEFAULT_ENDIAN = BIGEND; const Endianness_t DEFAULT_ENDIAN = LITTLEEND; #endif -typedef unsigned char octet; +using octet = unsigned char; //typedef unsigned int uint; //typedef unsigned short ushort; -typedef unsigned char SubmessageFlag; -typedef uint32_t BuiltinEndpointSet_t; -typedef uint32_t Count_t; - -#define BIT0 0x1 -#define BIT1 0x2 -#define BIT2 0x4 -#define BIT3 0x8 -#define BIT4 0x10 -#define BIT5 0x20 -#define BIT6 0x40 -#define BIT7 0x80 - -#define BIT(i) ((i==0) ? BIT0 : (i==1) ? BIT1 :(i==2)?BIT2:(i==3)?BIT3:(i==4)?BIT4:(i==5)?BIT5:(i==6)?BIT6:(i==7)?BIT7:0x0) +using SubmessageFlag = unsigned char; +using BuiltinEndpointSet_t = uint32_t; +using Count_t = uint32_t; + +#define BIT0 0x1u +#define BIT1 0x2u +#define BIT2 0x4u +#define BIT3 0x8u +#define BIT4 0x10u +#define BIT5 0x20u +#define BIT6 0x40u +#define BIT7 0x80u + +#define BIT(i) (1U << static_cast(i)) //!@brief Structure ProtocolVersion_t, contains the protocol version. struct RTPS_DllAPI ProtocolVersion_t diff --git a/include/fastrtps/rtps/messages/MessageReceiver.h b/include/fastrtps/rtps/messages/MessageReceiver.h index ea08a482668..273de83b973 100644 --- a/include/fastrtps/rtps/messages/MessageReceiver.h +++ b/include/fastrtps/rtps/messages/MessageReceiver.h @@ -16,17 +16,15 @@ * @file MessageReceiver.h */ - - #ifndef MESSAGERECEIVER_H_ #define MESSAGERECEIVER_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include #include "../common/all_common.h" #include "../../qos/ParameterList.h" #include +#include namespace eprosima { namespace fastrtps { @@ -44,126 +42,149 @@ struct SubmessageHeader_t; */ class MessageReceiver { - public: - /** - * @param participant - * @param rec_buffer_size - */ - MessageReceiver(RTPSParticipantImpl* participant, uint32_t rec_buffer_size); - - virtual ~MessageReceiver(); - //!Reset the MessageReceiver to process a new message. - void reset(); - /** Init MessageReceiver. Does what the constructor used to do. - This is now on an independent function since MessageReceiver now stands inside - a struct. - @param rec_buffer_size - **/ - void init(uint32_t rec_buffer_size); - - /** - * Process a new CDR message. - * @param[in] loc Locator indicating the sending address. - * @param[in] msg Pointer to the message - */ - void processCDRMsg(const Locator_t& loc, CDRMessage_t*msg); - - //!Pointer to the Listen Resource that contains this MessageReceiver. - - //!Received message +public: + + /** + * @param participant + * @param rec_buffer_size + */ + MessageReceiver( + RTPSParticipantImpl* participant, + uint32_t rec_buffer_size); + + virtual ~MessageReceiver(); + + /** + * Process a new CDR message. + * @param[in] loc Locator indicating the sending address. + * @param[in] msg Pointer to the message + */ + void processCDRMsg( + const Locator_t& loc, + CDRMessage_t* msg); + + // Functions to associate/remove associatedendpoints + void associateEndpoint( + Endpoint* to_add); + void removeEndpoint( + Endpoint* to_remove); + +private: + + std::mutex mtx_; + std::vector associated_writers_; + std::unordered_map > associated_readers_; + + RTPSParticipantImpl* participant_; + + //!Protocol version of the message + ProtocolVersion_t source_version_; + //!VendorID that created the message + VendorId_t source_vendor_id_; + //!GuidPrefix of the entity that created the message + GuidPrefix_t source_guid_prefix_; + //!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant. + GuidPrefix_t dest_guid_prefix_; + //!Has the message timestamp? + bool have_timestamp_; + //!Timestamp associated with the message + Time_t timestamp_; + #if HAVE_SECURITY - CDRMessage_t m_crypto_msg; + CDRMessage_t crypto_msg_; #endif - // Functions to associate/remove associatedendpoints - void associateEndpoint(Endpoint *to_add); - void removeEndpoint(Endpoint *to_remove); - - private: - std::vector AssociatedWriters; - std::unordered_map> AssociatedReaders; - std::mutex mtx; - //!Protocol version of the message - ProtocolVersion_t sourceVersion; - //!VendorID that created the message - VendorId_t sourceVendorId; - //!GuidPrefix of the entity that created the message - GuidPrefix_t sourceGuidPrefix; - //!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant. - GuidPrefix_t destGuidPrefix; - //!Has the message timestamp? - bool haveTimestamp; - //!Timestamp associated with the message - Time_t timestamp; - //!Version of the protocol used by the receiving end. - ProtocolVersion_t destVersion; - - uint16_t mMaxPayload_; - - - /**@name Processing methods. - * These methods are designed to read a part of the message - * and perform the corresponding actions: - * -Modify the message receiver state if necessary. - * -Add information to the history. - * -Return an error if the message is malformed. - * @param[in] msg Pointer to the message - * @param[out] params Different parameters depending on the message - * @return True if correct, false otherwise - */ - - ///@{ - /** - * Check the RTPSHeader of a received message. - * @param msg Pointer to the message. - * @return True if correct. - */ - bool checkRTPSHeader(CDRMessage_t*msg); - /** - * Read the submessage header of a message. - * @param msg Pointer to the CDRMessage_t to read. - * @param smh Pointer to the submessageheader structure. - * @return True if correctly read. - */ - bool readSubmessageHeader(CDRMessage_t*msg, SubmessageHeader_t* smh); - - /** - * Find if there is a reader (in AssociatedReaders) that will accept a msg directed - * to the given entity ID. - */ - bool willAReaderAcceptMsgDirectedTo(const EntityId_t & readerID); - - /** - * Find all readers (in AssociatedReaders), with the given entity ID, and call the - * callback provided. - */ - template - void findAllReaders( - const EntityId_t & readerID, - const Functor & callback); - - /** - * - * @param msg - * @param smh - * @return - */ - bool proc_Submsg_Data(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_DataFrag(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_Acknack(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_Heartbeat(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_Gap(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_InfoTS(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_InfoDST(CDRMessage_t*msg,SubmessageHeader_t* smh); - bool proc_Submsg_InfoSRC(CDRMessage_t*msg,SubmessageHeader_t* smh); - bool proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_SecureMessage(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_SecureSubMessage(CDRMessage_t*msg, SubmessageHeader_t* smh); - - RTPSParticipantImpl* participant_; + + //! Reset the MessageReceiver to process a new message. + void reset(); + + /** + * Find if there is a reader (in associated_readers_) that will accept a msg directed + * to the given entity ID. + */ + bool willAReaderAcceptMsgDirectedTo( + const EntityId_t& readerID); + + /** + * Find all readers (in associated_readers_), with the given entity ID, and call the + * callback provided. + */ + template + void findAllReaders( + const EntityId_t& readerID, + const Functor& callback); + + /** + * Check the RTPSHeader of a received message. + * @param msg Pointer to the message. + * @return True if correct. + */ + bool checkRTPSHeader( + CDRMessage_t* msg); + /** + * Read the submessage header of a message. + * @param msg Pointer to the CDRMessage_t to read. + * @param smh Pointer to the submessageheader structure. + * @return True if correctly read. + */ + bool readSubmessageHeader( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + + /** + * @name Processing methods. + * These methods are designed to read a part of the message + * and perform the corresponding actions: + * -Modify the message receiver state if necessary. + * -Add information to the history. + * -Return an error if the message is malformed. + * @param[in,out] msg Pointer to the message + * @param[in] smh Pointer to the submessage header + * @return True if correct, false otherwise + */ + ///@{ + + /** + * + * @param msg + * @param smh + * @return + */ + bool proc_Submsg_Data( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_DataFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_Acknack( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_Heartbeat( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_Gap( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_InfoTS( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_InfoDST( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_InfoSRC( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_NackFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_HeartbeatFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + ///@} }; -} + } /* namespace rtps */ +} /* namespace fastrtps */ } /* namespace eprosima */ + #endif #endif /* MESSAGERECEIVER_H_ */ diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index 60236c93592..65bb2226490 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -20,167 +20,181 @@ #include #include - #include - #include -#include "../participant/RTPSParticipantImpl.h" +#include -#include +#include "../participant/RTPSParticipantImpl.h" -#include #include +#include +#include +#define INFO_SRC_SUBMSG_LENGTH 20 -#include - -#define IDSTRING "(ID:" << std::this_thread::get_id() <<") "<< - -using namespace eprosima::fastrtps; +#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { - -MessageReceiver::MessageReceiver(RTPSParticipantImpl* participant, uint32_t rec_buffer_size) : +MessageReceiver::MessageReceiver( + RTPSParticipantImpl* participant, + uint32_t rec_buffer_size) + : participant_(participant) + , source_version_(c_ProtocolVersion) + , source_vendor_id_(c_VendorId_Unknown) + , source_guid_prefix_(c_GuidPrefix_Unknown) + , dest_guid_prefix_(c_GuidPrefix_Unknown) + , have_timestamp_(false) + , timestamp_(c_TimeInvalid) #if HAVE_SECURITY - m_crypto_msg(rec_buffer_size), + , crypto_msg_(participant->is_secure() ? rec_buffer_size : 0) #endif - sourceVendorId(c_VendorId_Unknown), participant_(participant) { - init(rec_buffer_size); -} - -void MessageReceiver::init(uint32_t rec_buffer_size){ - destVersion = c_ProtocolVersion; - sourceVersion = c_ProtocolVersion; - sourceVendorId = c_VendorId_Unknown; - sourceGuidPrefix = c_GuidPrefix_Unknown; - destGuidPrefix = c_GuidPrefix_Unknown; - haveTimestamp = false; - timestamp = c_TimeInvalid; - - logInfo(RTPS_MSG_IN,"Created with CDRMessage of size: "<< rec_buffer_size); - mMaxPayload_ = ((uint32_t)std::numeric_limits::max() < rec_buffer_size) ? std::numeric_limits::max() : (uint16_t)rec_buffer_size; + (void)rec_buffer_size; + logInfo(RTPS_MSG_IN, "Created with CDRMessage of size: " << rec_buffer_size); } MessageReceiver::~MessageReceiver() { - logInfo(RTPS_MSG_IN,""); - assert(AssociatedWriters.size() == 0); - assert(AssociatedReaders.size() == 0); + logInfo(RTPS_MSG_IN, ""); + assert(associated_writers_.empty()); + assert(associated_readers_.empty()); } -void MessageReceiver::associateEndpoint(Endpoint *to_add){ - bool found = false; - std::lock_guard guard(mtx); - if(to_add->getAttributes().endpointKind == WRITER) +void MessageReceiver::associateEndpoint( + Endpoint* to_add) +{ + std::lock_guard guard(mtx_); + if (to_add->getAttributes().endpointKind == WRITER) { - for(auto it = AssociatedWriters.begin(); it != AssociatedWriters.end(); ++it) + const auto writer = dynamic_cast(to_add); + for (const auto& it : associated_writers_) { - if( (*it) == (RTPSWriter*)to_add ) + if (it == writer) { - found = true; - break; + return; } } - if(!found) AssociatedWriters.push_back((RTPSWriter*)to_add); + + associated_writers_.push_back(writer); } else { - const auto reader = static_cast(to_add); + const auto reader = dynamic_cast(to_add); const auto entityId = reader->getGuid().entityId; // search for set of readers by entity ID - const auto readers = AssociatedReaders.find(entityId); - if (readers == AssociatedReaders.end()) { + const auto readers = associated_readers_.find(entityId); + if (readers == associated_readers_.end()) + { auto vec = std::vector(); vec.push_back(reader); - AssociatedReaders.emplace(entityId, vec); + associated_readers_.emplace(entityId, vec); } - else { - for (const auto & it : readers->second) { - if (it == reader) { - found = true; - break; + else + { + for (const auto& it : readers->second) + { + if (it == reader) + { + return; } } - if (!found) readers->second.push_back(reader); + + readers->second.push_back(reader); } } - return; } -void MessageReceiver::removeEndpoint(Endpoint *to_remove){ - - std::lock_guard guard(mtx); - if(to_remove->getAttributes().endpointKind == WRITER){ - RTPSWriter* var = (RTPSWriter *)to_remove; - for(auto it=AssociatedWriters.begin(); it !=AssociatedWriters.end(); ++it){ - if ((*it) == var){ - AssociatedWriters.erase(it); + +void MessageReceiver::removeEndpoint( + Endpoint* to_remove) +{ + std::lock_guard guard(mtx_); + + if (to_remove->getAttributes().endpointKind == WRITER) + { + auto* var = dynamic_cast(to_remove); + for (auto it = associated_writers_.begin(); it != associated_writers_.end(); ++it) + { + if (*it == var) + { + associated_writers_.erase(it); break; } } - }else{ - auto readers = AssociatedReaders.find(to_remove->getGuid().entityId); - if (readers != AssociatedReaders.end()) { - RTPSReader *var = (RTPSReader *)to_remove; - for (auto it = readers->second.begin(); it != readers->second.end(); ++it) { - if (*it == var){ + } + else + { + auto readers = associated_readers_.find(to_remove->getGuid().entityId); + if (readers != associated_readers_.end()) + { + auto* var = dynamic_cast(to_remove); + for (auto it = readers->second.begin(); it != readers->second.end(); ++it) + { + if (*it == var) + { readers->second.erase(it); - if (readers->second.empty()) AssociatedReaders.erase(readers); + if (readers->second.empty()) + { + associated_readers_.erase(readers); + } break; } } } } - return; } - -void MessageReceiver::reset(){ - destVersion = c_ProtocolVersion; - sourceVersion = c_ProtocolVersion; - sourceVendorId = c_VendorId_Unknown; - sourceGuidPrefix = c_GuidPrefix_Unknown; - destGuidPrefix = c_GuidPrefix_Unknown; - haveTimestamp = false; - timestamp = c_TimeInvalid; +void MessageReceiver::reset() +{ + source_version_ = c_ProtocolVersion; + source_vendor_id_ = c_VendorId_Unknown; + source_guid_prefix_ = c_GuidPrefix_Unknown; + dest_guid_prefix_ = c_GuidPrefix_Unknown; + have_timestamp_ = false; + timestamp_ = c_TimeInvalid; } -void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) +void MessageReceiver::processCDRMsg( + const Locator_t& loc, + CDRMessage_t* msg) { (void)loc; - if(msg->length < RTPSMESSAGE_HEADER_SIZE) + if (msg->length < RTPSMESSAGE_HEADER_SIZE) { - logWarning(RTPS_MSG_IN,IDSTRING"Received message too short, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING "Received message too short, ignoring"); return; } - this->reset(); + reset(); GuidPrefix_t participantGuidPrefix = participant_->getGuid().guidPrefix; - destGuidPrefix = participantGuidPrefix; + dest_guid_prefix_ = participantGuidPrefix; msg->pos = 0; //Start reading at 0 //Once everything is set, the reading begins: - if(!checkRTPSHeader(msg)) + if (!checkRTPSHeader(msg)) { return; } #if HAVE_SECURITY - CDRMessage_t* auxiliary_buffer = &m_crypto_msg; + security::SecurityManager& security = participant_->security_manager(); + CDRMessage_t* auxiliary_buffer = &crypto_msg_; CDRMessage::initCDRMsg(auxiliary_buffer); - int decode_ret = participant_->security_manager().decode_rtps_message(*msg, *auxiliary_buffer, sourceGuidPrefix); + int decode_ret = security.decode_rtps_message(*msg, *auxiliary_buffer, source_guid_prefix_); - if(decode_ret < 0) + if (decode_ret < 0) + { return; - else if(decode_ret == 0) + } + + if (decode_ret == 0) { // Swap std::swap(msg, auxiliary_buffer); @@ -191,142 +205,144 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) bool valid; int count = 0; SubmessageHeader_t submsgh; //Current submessage header - //Pointers to different types of messages: - while(msg->pos < msg->length)// end of the message + while (msg->pos < msg->length)// end of the message { CDRMessage_t* submessage = msg; #if HAVE_SECURITY CDRMessage::initCDRMsg(auxiliary_buffer); - decode_ret = participant_->security_manager().decode_rtps_submessage(*msg, *auxiliary_buffer, sourceGuidPrefix); + decode_ret = security.decode_rtps_submessage(*msg, *auxiliary_buffer, source_guid_prefix_); - if(decode_ret < 0) + if (decode_ret < 0) { return; } - else if(decode_ret == 0) + + if (decode_ret == 0) { submessage = auxiliary_buffer; } #endif //First 4 bytes must contain: ID | flags | octets to next header - if(!readSubmessageHeader(submessage, &submsgh)) + if (!readSubmessageHeader(submessage, &submsgh)) + { return; + } valid = true; count++; uint32_t next_msg_pos = submessage->pos; - next_msg_pos += (submsgh.submessageLength + 3) & ~3; - switch(submsgh.submessageId) + next_msg_pos += (submsgh.submessageLength + 3u) & ~3u; + switch (submsgh.submessageId) { case DATA: + { + if (dest_guid_prefix_ != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"Data Submsg ignored, DST is another RTPSParticipant"); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Data Submsg received, processing."); - valid = proc_Submsg_Data(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "Data Submsg ignored, DST is another RTPSParticipant"); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Data Submsg received, processing."); + valid = proc_Submsg_Data(submessage, &submsgh); + } + break; + } case DATA_FRAG: - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { - logInfo(RTPS_MSG_IN, IDSTRING"DataFrag Submsg ignored, DST is another RTPSParticipant"); + logInfo(RTPS_MSG_IN, IDSTRING "DataFrag Submsg ignored, DST is another RTPSParticipant"); } else { - logInfo(RTPS_MSG_IN, IDSTRING"DataFrag Submsg received, processing."); + logInfo(RTPS_MSG_IN, IDSTRING "DataFrag Submsg received, processing."); valid = proc_Submsg_DataFrag(submessage, &submsgh); } break; case GAP: + { + if (dest_guid_prefix_ != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"Gap Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Gap Submsg received, processing..."); - valid = proc_Submsg_Gap(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "Gap Submsg ignored, DST is another RTPSParticipant..."); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Gap Submsg received, processing..."); + valid = proc_Submsg_Gap(submessage, &submsgh); + } + break; + } case ACKNACK: + { + if (dest_guid_prefix_ != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"Acknack Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Acknack Submsg received, processing..."); - valid = proc_Submsg_Acknack(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "Acknack Submsg ignored, DST is another RTPSParticipant..."); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Acknack Submsg received, processing..."); + valid = proc_Submsg_Acknack(submessage, &submsgh); + } + break; + } case NACK_FRAG: + { + if (dest_guid_prefix_ != participantGuidPrefix) { - if (this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN, IDSTRING"NackFrag Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN, IDSTRING"NackFrag Submsg received, processing..."); - valid = proc_Submsg_NackFrag(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "NackFrag Submsg ignored, DST is another RTPSParticipant..."); + } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "NackFrag Submsg received, processing..."); + valid = proc_Submsg_NackFrag(submessage, &submsgh); } + break; + } case HEARTBEAT: + { + if (dest_guid_prefix_ != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"HB Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Heartbeat Submsg received, processing..."); - valid = proc_Submsg_Heartbeat(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "HB Submsg ignored, DST is another RTPSParticipant..."); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Heartbeat Submsg received, processing..."); + valid = proc_Submsg_Heartbeat(submessage, &submsgh); + } + break; + } case HEARTBEAT_FRAG: + { + if (dest_guid_prefix_ != participantGuidPrefix) { - if (this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN, IDSTRING"HBFrag Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN, IDSTRING"HeartbeatFrag Submsg received, processing..."); - valid = proc_Submsg_HeartbeatFrag(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "HBFrag Submsg ignored, DST is another RTPSParticipant..."); + } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "HeartbeatFrag Submsg received, processing..."); + valid = proc_Submsg_HeartbeatFrag(submessage, &submsgh); } + break; + } case PAD: - logWarning(RTPS_MSG_IN,IDSTRING"PAD messages not yet implemented, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING "PAD messages not yet implemented, ignoring"); break; case INFO_DST: - logInfo(RTPS_MSG_IN,IDSTRING"InfoDST message received, processing..."); + logInfo(RTPS_MSG_IN, IDSTRING "InfoDST message received, processing..."); valid = proc_Submsg_InfoDST(submessage, &submsgh); break; case INFO_SRC: - logInfo(RTPS_MSG_IN,IDSTRING"InfoSRC message received, processing..."); + logInfo(RTPS_MSG_IN, IDSTRING "InfoSRC message received, processing..."); valid = proc_Submsg_InfoSRC(submessage, &submsgh); break; case INFO_TS: - { - logInfo(RTPS_MSG_IN,IDSTRING"InfoTS Submsg received, processing..."); - valid = proc_Submsg_InfoTS(submessage, &submsgh); - break; - } + { + logInfo(RTPS_MSG_IN, IDSTRING "InfoTS Submsg received, processing..."); + valid = proc_Submsg_InfoTS(submessage, &submsgh); + break; + } case INFO_REPLY: break; case INFO_REPLY_IP4: @@ -335,7 +351,7 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) break; } - if(!valid || submsgh.is_last) + if (!valid || submsgh.is_last) { break; } @@ -343,62 +359,70 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) submessage->pos = next_msg_pos; } - - participant_->assert_remote_participant_liveliness(sourceGuidPrefix); + participant_->assert_remote_participant_liveliness(source_guid_prefix_); } -bool MessageReceiver::checkRTPSHeader(CDRMessage_t*msg) //check and proccess the RTPS Header +bool MessageReceiver::checkRTPSHeader( + CDRMessage_t* msg) { - - if(msg->buffer[0] != 'R' || msg->buffer[1] != 'T' || + //check and proccess the RTPS Header + if (msg->buffer[0] != 'R' || msg->buffer[1] != 'T' || msg->buffer[2] != 'P' || msg->buffer[3] != 'S') { - logInfo(RTPS_MSG_IN,IDSTRING"Msg received with no RTPS in header, ignoring..."); + logInfo(RTPS_MSG_IN, IDSTRING "Msg received with no RTPS in header, ignoring..."); return false; } - msg->pos+=4; + msg->pos += 4; //CHECK AND SET protocol version - if(msg->buffer[msg->pos] <= destVersion.m_major) + if (msg->buffer[msg->pos] <= c_ProtocolVersion.m_major) { - sourceVersion.m_major = msg->buffer[msg->pos];msg->pos++; - sourceVersion.m_minor = msg->buffer[msg->pos];msg->pos++; + source_version_.m_major = msg->buffer[msg->pos]; + msg->pos++; + source_version_.m_minor = msg->buffer[msg->pos]; + msg->pos++; } else { - logWarning(RTPS_MSG_IN,IDSTRING"Major RTPS Version not supported"); + logWarning(RTPS_MSG_IN, IDSTRING "Major RTPS Version not supported"); return false; } //Set source vendor id - sourceVendorId[0] = msg->buffer[msg->pos];msg->pos++; - sourceVendorId[1] = msg->buffer[msg->pos];msg->pos++; + source_vendor_id_[0] = msg->buffer[msg->pos]; + msg->pos++; + source_vendor_id_[1] = msg->buffer[msg->pos]; + msg->pos++; //set source guid prefix - memcpy(sourceGuidPrefix.value,&msg->buffer[msg->pos],12); - msg->pos+=12; - haveTimestamp = false; + CDRMessage::readData(msg, source_guid_prefix_.value, GuidPrefix_t::size); + have_timestamp_ = false; return true; } - -bool MessageReceiver::readSubmessageHeader(CDRMessage_t* msg, SubmessageHeader_t* smh) +bool MessageReceiver::readSubmessageHeader( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - if(msg->length - msg->pos < 4) + if (msg->length - msg->pos < 4) { - logWarning(RTPS_MSG_IN,IDSTRING"SubmessageHeader too short"); + logWarning(RTPS_MSG_IN, IDSTRING "SubmessageHeader too short"); return false; } - smh->submessageId = msg->buffer[msg->pos];msg->pos++; - smh->flags = msg->buffer[msg->pos];msg->pos++; + + smh->submessageId = msg->buffer[msg->pos]; + msg->pos++; + smh->flags = msg->buffer[msg->pos]; + msg->pos++; + //Set endianness of message - msg->msg_endian = smh->flags & BIT(0) ? LITTLEEND : BIGEND; + msg->msg_endian = (smh->flags & BIT(0)) != 0 ? LITTLEEND : BIGEND; uint16_t length = 0; - CDRMessage::readUInt16(msg,&length); + CDRMessage::readUInt16(msg, &length); if (msg->pos + length > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING"SubMsg of invalid length (" << length - << ") with current msg position/length (" << msg->pos << "/" << msg->length << ")"); + logWarning(RTPS_MSG_IN, IDSTRING "SubMsg of invalid length (" << length << + ") with current msg position/length (" << msg->pos << "/" << msg->length << ")"); return false; } @@ -420,23 +444,23 @@ bool MessageReceiver::readSubmessageHeader(CDRMessage_t* msg, SubmessageHeader_t bool MessageReceiver::willAReaderAcceptMsgDirectedTo( const EntityId_t& readerID) { - if(AssociatedReaders.empty()) + if (associated_readers_.empty()) { - logWarning(RTPS_MSG_IN,IDSTRING"Data received when NO readers are listening"); + logWarning(RTPS_MSG_IN, IDSTRING "Data received when NO readers are listening"); return false; } if (readerID != c_EntityId_Unknown) { - const auto readers = AssociatedReaders.find(readerID); - if (readers != AssociatedReaders.end()) + const auto readers = associated_readers_.find(readerID); + if (readers != associated_readers_.end()) { return true; } } else { - for(const auto& readers : AssociatedReaders) + for (const auto& readers : associated_readers_) { if (readers.second.empty()) { @@ -453,7 +477,7 @@ bool MessageReceiver::willAReaderAcceptMsgDirectedTo( } } - logWarning(RTPS_MSG_IN, IDSTRING"No Reader accepts this message (directed to: " <second) { @@ -475,7 +499,7 @@ void MessageReceiver::findAllReaders( } else { - for (const auto& readers : AssociatedReaders) + for (const auto& readers : associated_readers_) { for (const auto& it : readers.second) { @@ -488,35 +512,41 @@ void MessageReceiver::findAllReaders( } } -bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Data( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //READ and PROCESS - if(smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) + if (smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) { - logInfo(RTPS_MSG_IN,IDSTRING"Too short submessage received, ignoring"); + logInfo(RTPS_MSG_IN, IDSTRING "Too short submessage received, ignoring"); return false; } //Fill flags bool values - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool inlineQosFlag = smh->flags & BIT(1) ? true : false; - bool dataFlag = smh->flags & BIT(2) ? true : false; - bool keyFlag = smh->flags & BIT(3) ? true : false; - if(keyFlag && dataFlag) + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool inlineQosFlag = (smh->flags & BIT(1)) != 0; + bool dataFlag = (smh->flags & BIT(2)) != 0; + bool keyFlag = (smh->flags & BIT(3)) != 0; + if (keyFlag && dataFlag) { - logWarning(RTPS_MSG_IN,IDSTRING"Message received with Data and Key Flag set, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING "Message received with Data and Key Flag set, ignoring"); return false; } //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } //Extra flags don't matter now. Avoid those bytes - msg->pos+=2; + msg->pos += 2; bool valid = true; int16_t octetsToInlineQos; @@ -524,7 +554,7 @@ bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh //reader and writer ID EntityId_t readerID; - valid &= CDRMessage::readEntityId(msg,&readerID); + valid &= CDRMessage::readEntityId(msg, &readerID); //WE KNOW THE READER THAT THE MESSAGE IS DIRECTED TO SO WE LOOK FOR IT: if (!willAReaderAcceptMsgDirectedTo(readerID)) @@ -536,132 +566,140 @@ bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh //We ask the reader for a cachechange to store the information. CacheChange_t ch; ch.kind = ALIVE; - ch.serializedPayload.max_size = mMaxPayload_; - ch.writerGUID.guidPrefix = sourceGuidPrefix; - valid &= CDRMessage::readEntityId(msg,&ch.writerGUID.entityId); + ch.writerGUID.guidPrefix = source_guid_prefix_; + valid &= CDRMessage::readEntityId(msg, &ch.writerGUID.entityId); //Get sequence number - valid &= CDRMessage::readSequenceNumber(msg,&ch.sequenceNumber); + valid &= CDRMessage::readSequenceNumber(msg, &ch.sequenceNumber); - if (!valid){ + if (!valid) + { return false; } - if(ch.sequenceNumber <= SequenceNumber_t(0, 0) || (ch.sequenceNumber.high == -1 && ch.sequenceNumber.low == 0)) //message invalid //TODO make faster + if (ch.sequenceNumber <= SequenceNumber_t()) { - logWarning(RTPS_MSG_IN,IDSTRING"Invalid message received, bad sequence Number"); + logWarning(RTPS_MSG_IN, IDSTRING "Invalid message received, bad sequence Number"); return false; } //Jump ahead if more parameters are before inlineQos (not in this version, maybe if further minor versions.) - if(octetsToInlineQos > RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG) + if (octetsToInlineQos > RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG) { msg->pos += (octetsToInlineQos - RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG); if (msg->pos > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); + logWarning(RTPS_MSG_IN, + IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); return false; } } uint32_t inlineQosSize = 0; - if(inlineQosFlag) + if (inlineQosFlag) { - if(false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) ) + if (!ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) ) { - logInfo(RTPS_MSG_IN,IDSTRING"SubMessage Data ERROR, Inline Qos ParameterList error"); + logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; } - } - if(dataFlag || keyFlag) + if (dataFlag || keyFlag) { uint32_t payload_size; - payload_size = smh->submessageLength - (RTPSMESSAGE_DATA_EXTRA_INLINEQOS_SIZE+octetsToInlineQos+inlineQosSize); + payload_size = smh->submessageLength - + (RTPSMESSAGE_DATA_EXTRA_INLINEQOS_SIZE + octetsToInlineQos + inlineQosSize); - if(dataFlag) + if (dataFlag) { - if(ch.serializedPayload.max_size >= payload_size && payload_size > 0) + uint32_t next_pos = msg->pos + payload_size; + if (msg->length >= next_pos && payload_size > 0) { ch.serializedPayload.data = &msg->buffer[msg->pos]; ch.serializedPayload.length = payload_size; - msg->pos += payload_size; + ch.serializedPayload.max_size = payload_size; + msg->pos = next_pos; } else { - logWarning(RTPS_MSG_IN,IDSTRING"Serialized Payload value invalid or larger than maximum allowed size" - "(" <length - msg->pos) << ")"); return false; } } - else if(keyFlag) + else if (keyFlag) { if (payload_size <= 0) { - logWarning(RTPS_MSG_IN, IDSTRING"Serialized Payload value invalid (" << payload_size << ")"); + logWarning(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid (" << payload_size << ")"); return false; } - if (payload_size <= 16) + if (payload_size <= PARAMETER_KEY_HASH_LENGTH) { memcpy(ch.instanceHandle.value, &msg->buffer[msg->pos], payload_size); } else { - logWarning(RTPS_MSG_IN, IDSTRING"Ignoring Serialized Payload for too large key-only data" - "(" << payload_size << ")"); + logWarning(RTPS_MSG_IN, IDSTRING "Ignoring Serialized Payload for too large key-only data (" << + payload_size << ")"); } msg->pos += payload_size; } } // Set sourcetimestamp - if(haveTimestamp) + if (have_timestamp_) { - ch.sourceTimestamp = this->timestamp; + ch.sourceTimestamp = timestamp_; } + logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " << + associated_readers_.size()); - //FIXME: DO SOMETHING WITH PARAMETERLIST CREATED. - logInfo(RTPS_MSG_IN,IDSTRING"from Writer " << ch.writerGUID << "; possible RTPSReader entities: " - << AssociatedReaders.size()); //Look for the correct reader to add the change - findAllReaders(readerID, [ - &ch - ] (RTPSReader* reader) { + findAllReaders(readerID, + [&ch] (RTPSReader* reader) + { reader->processDataMsg(&ch); }); //TODO(Ricardo) If a exception is thrown (ex, by fastcdr), this line is not executed -> segmentation fault ch.serializedPayload.data = nullptr; - logInfo(RTPS_MSG_IN,IDSTRING"Sub Message DATA processed"); + logInfo(RTPS_MSG_IN, IDSTRING "Sub Message DATA processed"); return true; } -bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_DataFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //READ and PROCESS if (smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) { - logInfo(RTPS_MSG_IN, IDSTRING"Too short submessage received, ignoring"); + logInfo(RTPS_MSG_IN, IDSTRING "Too short submessage received, ignoring"); return false; } //Fill flags bool values - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool inlineQosFlag = smh->flags & BIT(1) ? true : false; - bool keyFlag = smh->flags & BIT(2) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool inlineQosFlag = (smh->flags & BIT(1)) != 0; + bool keyFlag = (smh->flags & BIT(2)) != 0; //Assign message endianness if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } //Extra flags don't matter now. Avoid those bytes msg->pos += 2; @@ -675,13 +713,15 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t valid &= CDRMessage::readEntityId(msg, &readerID); //WE KNOW THE READER THAT THE MESSAGE IS DIRECTED TO SO WE LOOK FOR IT: - if (!willAReaderAcceptMsgDirectedTo(readerID)) return false; + if (!willAReaderAcceptMsgDirectedTo(readerID)) + { + return false; + } //FOUND THE READER. //We ask the reader for a cachechange to store the information. CacheChange_t ch; - ch.serializedPayload.max_size = mMaxPayload_; - ch.writerGUID.guidPrefix = sourceGuidPrefix; + ch.writerGUID.guidPrefix = source_guid_prefix_; valid &= CDRMessage::readEntityId(msg, &ch.writerGUID.entityId); //Get sequence number @@ -689,7 +729,7 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t if (ch.sequenceNumber <= SequenceNumber_t()) { - logWarning(RTPS_MSG_IN, IDSTRING"Invalid message received, bad sequence Number"); + logWarning(RTPS_MSG_IN, IDSTRING "Invalid message received, bad sequence Number"); return false; } @@ -709,7 +749,8 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t uint32_t sampleSize; valid &= CDRMessage::readUInt32(msg, &sampleSize); - if(!valid){ + if (!valid) + { return false; } @@ -719,7 +760,8 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t msg->pos += (octetsToInlineQos - RTPSMESSAGE_OCTETSTOINLINEQOS_DATAFRAGSUBMSG); if (msg->pos > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); + logWarning(RTPS_MSG_IN, + IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); return false; } } @@ -728,9 +770,9 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t if (inlineQosFlag) { - if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize)) + if (!ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize)) { - logInfo(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, Inline Qos ParameterList error"); + logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; } } @@ -742,21 +784,21 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t if (!keyFlag) { - if (ch.serializedPayload.max_size >= payload_size && payload_size > 0) + uint32_t next_pos = msg->pos + payload_size; + if (msg->length >= next_pos && payload_size > 0) { - ch.serializedPayload.length = payload_size; - - ch.setFragmentSize(fragmentSize); + ch.kind = ALIVE; ch.serializedPayload.data = &msg->buffer[msg->pos]; ch.serializedPayload.length = payload_size; - msg->pos += payload_size; + ch.serializedPayload.max_size = payload_size; + ch.setFragmentSize(fragmentSize); - ch.kind = ALIVE; + msg->pos = next_pos; } else { - logWarning(RTPS_MSG_IN, IDSTRING"Serialized Payload value invalid or larger than maximum allowed size " - "(" << payload_size << "/" << ch.serializedPayload.max_size << ")"); + logWarning(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid or larger than maximum allowed size " + "(" << payload_size << "/" << (msg->length - msg->pos) << ")"); return false; } } @@ -773,226 +815,274 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t logError(RTPS_MSG_IN, IDSTRING"Bad encapsulation for KeyHash and status parameter list"); return false; } - //uint32_t param_size; - if (ParameterList::readParameterListfromCDRMsg(msg, &m_ParamList, ch, false) <= 0) - { - logInfo(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, keyFlag ParameterList"); - return false; - } - msg->msg_endian = previous_endian; - */ + //uint32_t param_size; + if (ParameterList::readParameterListfromCDRMsg(msg, &m_ParamList, ch, false) <= 0) + { + logInfo(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, keyFlag ParameterList"); + return false; + } + msg->msg_endian = previous_endian; + */ } // Set sourcetimestamp - if (haveTimestamp) - ch.sourceTimestamp = this->timestamp; + if (have_timestamp_) + { + ch.sourceTimestamp = timestamp_; + } //FIXME: DO SOMETHING WITH PARAMETERLIST CREATED. - logInfo(RTPS_MSG_IN, IDSTRING"from Writer " << ch.writerGUID << "; possible RTPSReader entities: " - << AssociatedReaders.size()); + logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " << + associated_readers_.size()); + //Look for the correct reader to add the change - findAllReaders(readerID, [ - &ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage - ] (RTPSReader* reader) { + findAllReaders(readerID, + [&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage] (RTPSReader* reader) + { reader->processDataFragMsg(&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage); }); ch.serializedPayload.data = nullptr; - logInfo(RTPS_MSG_IN, IDSTRING"Sub Message DATA_FRAG processed"); + logInfo(RTPS_MSG_IN, IDSTRING "Sub Message DATA_FRAG processed"); return true; } - -bool MessageReceiver::proc_Submsg_Heartbeat(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Heartbeat( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool finalFlag = smh->flags & BIT(1) ? true : false; - bool livelinessFlag = smh->flags & BIT(2) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool finalFlag = (smh->flags & BIT(1)) != 0; + bool livelinessFlag = (smh->flags & BIT(2)) != 0; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } - GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = destGuidPrefix; - CDRMessage::readEntityId(msg,&readerGUID.entityId); - writerGUID.guidPrefix = sourceGuidPrefix; - CDRMessage::readEntityId(msg,&writerGUID.entityId); - SequenceNumber_t firstSN, lastSN; - CDRMessage::readSequenceNumber(msg,&firstSN); - CDRMessage::readSequenceNumber(msg,&lastSN); - if(lastSN < firstSN && lastSN != firstSN-1) - { - logWarning(RTPS_MSG_IN, IDSTRING"Invalid Heartbeat received (" << firstSN << ") - (" << + GUID_t readerGUID; + GUID_t writerGUID; + readerGUID.guidPrefix = dest_guid_prefix_; + CDRMessage::readEntityId(msg, &readerGUID.entityId); + writerGUID.guidPrefix = source_guid_prefix_; + CDRMessage::readEntityId(msg, &writerGUID.entityId); + SequenceNumber_t firstSN; + SequenceNumber_t lastSN; + CDRMessage::readSequenceNumber(msg, &firstSN); + CDRMessage::readSequenceNumber(msg, &lastSN); + if (lastSN < firstSN && lastSN != firstSN - 1) + { + logWarning(RTPS_MSG_IN, IDSTRING "Invalid Heartbeat received (" << firstSN << ") - (" << lastSN << "), ignoring"); return false; } uint32_t HBCount; - CDRMessage::readUInt32(msg,&HBCount); + CDRMessage::readUInt32(msg, &HBCount); - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //Look for the correct reader and writers: - findAllReaders(readerGUID.entityId, [ - &writerGUID, &HBCount, &firstSN, &lastSN, finalFlag, livelinessFlag - ] (RTPSReader* reader) { + findAllReaders(readerGUID.entityId, + [&writerGUID, &HBCount, &firstSN, &lastSN, finalFlag, livelinessFlag] (RTPSReader* reader) + { reader->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); }); return true; } - -bool MessageReceiver::proc_Submsg_Acknack(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Acknack( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool finalFlag = smh->flags & BIT(1) ? true: false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool finalFlag = (smh->flags & BIT(1)) != 0; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; - GUID_t readerGUID,writerGUID; - readerGUID.guidPrefix = sourceGuidPrefix; - CDRMessage::readEntityId(msg,&readerGUID.entityId); - writerGUID.guidPrefix = destGuidPrefix; - CDRMessage::readEntityId(msg,&writerGUID.entityId); + } + GUID_t readerGUID; + GUID_t writerGUID; + readerGUID.guidPrefix = source_guid_prefix_; + CDRMessage::readEntityId(msg, &readerGUID.entityId); + writerGUID.guidPrefix = dest_guid_prefix_; + CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumberSet_t SNSet = CDRMessage::readSequenceNumberSet(msg); uint32_t Ackcount; - CDRMessage::readUInt32(msg,&Ackcount); + CDRMessage::readUInt32(msg, &Ackcount); - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //Look for the correct writer to use the acknack - for (std::vector::iterator it = AssociatedWriters.begin(); - it != AssociatedWriters.end(); ++it) + for (RTPSWriter* it : associated_writers_) { bool result; - if ((*it)->process_acknack(writerGUID, readerGUID, Ackcount, SNSet, finalFlag, result)) + if (it->process_acknack(writerGUID, readerGUID, Ackcount, SNSet, finalFlag, result)) { if (!result) { - logInfo(RTPS_MSG_IN, IDSTRING"Acknack msg to NOT stateful writer "); + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to NOT stateful writer "); } return result; } } - logInfo(RTPS_MSG_IN,IDSTRING"Acknack msg to UNKNOWN writer (I loooked through " - << AssociatedWriters.size() << " writers in this ListenResource)"); + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to UNKNOWN writer (I loooked through " + << associated_writers_.size() << " writers in this ListenResource)"); return false; } - - -bool MessageReceiver::proc_Submsg_Gap(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Gap( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } - GUID_t writerGUID,readerGUID; - readerGUID.guidPrefix = destGuidPrefix; - CDRMessage::readEntityId(msg,&readerGUID.entityId); - writerGUID.guidPrefix = sourceGuidPrefix; - CDRMessage::readEntityId(msg,&writerGUID.entityId); + GUID_t writerGUID; + GUID_t readerGUID; + readerGUID.guidPrefix = dest_guid_prefix_; + CDRMessage::readEntityId(msg, &readerGUID.entityId); + writerGUID.guidPrefix = source_guid_prefix_; + CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t gapStart; - CDRMessage::readSequenceNumber(msg,&gapStart); + CDRMessage::readSequenceNumber(msg, &gapStart); SequenceNumberSet_t gapList = CDRMessage::readSequenceNumberSet(msg); - if(gapStart <= SequenceNumber_t(0, 0)) + if (gapStart <= SequenceNumber_t(0, 0)) + { return false; + } - std::lock_guard guard(mtx); - findAllReaders(readerGUID.entityId, [ - &writerGUID, &gapStart, &gapList - ] (RTPSReader* reader) { + std::lock_guard guard(mtx_); + findAllReaders(readerGUID.entityId, + [&writerGUID, &gapStart, &gapList] (RTPSReader* reader) + { reader->processGapMsg(writerGUID, gapStart, gapList); }); return true; } -bool MessageReceiver::proc_Submsg_InfoTS(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_InfoTS( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool timeFlag = smh->flags & BIT(1) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool timeFlag = (smh->flags & BIT(1)) != 0; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; - if(!timeFlag) + } + if (!timeFlag) { - haveTimestamp = true; - CDRMessage::readTimestamp(msg,×tamp); + have_timestamp_ = true; + CDRMessage::readTimestamp(msg, ×tamp_); } else - haveTimestamp = false; + { + have_timestamp_ = false; + } return true; } -bool MessageReceiver::proc_Submsg_InfoDST(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_InfoDST( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0u; //bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } GuidPrefix_t guidP; - CDRMessage::readData(msg,guidP.value,12); - if(guidP != c_GuidPrefix_Unknown) + CDRMessage::readData(msg, guidP.value, GuidPrefix_t::size); + if (guidP != c_GuidPrefix_Unknown) { - this->destGuidPrefix = guidP; - logInfo(RTPS_MSG_IN,IDSTRING"DST RTPSParticipant is now: "<< this->destGuidPrefix); + dest_guid_prefix_ = guidP; + logInfo(RTPS_MSG_IN, IDSTRING "DST RTPSParticipant is now: " << dest_guid_prefix_); } return true; } -bool MessageReceiver::proc_Submsg_InfoSRC(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_InfoSRC( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; - if(smh->submessageLength == 20) + } + if (smh->submessageLength == INFO_SRC_SUBMSG_LENGTH) { //AVOID FIRST 4 BYTES: - msg->pos+=4; - CDRMessage::readOctet(msg,&this->sourceVersion.m_major); - CDRMessage::readOctet(msg,&this->sourceVersion.m_minor); - CDRMessage::readData(msg,&this->sourceVendorId[0],2); - CDRMessage::readData(msg,this->sourceGuidPrefix.value,12); - logInfo(RTPS_MSG_IN,IDSTRING"SRC RTPSParticipant is now: "<sourceGuidPrefix); + msg->pos += 4; + CDRMessage::readOctet(msg, &source_version_.m_major); + CDRMessage::readOctet(msg, &source_version_.m_minor); + CDRMessage::readData(msg, &source_vendor_id_[0], 2); + CDRMessage::readData(msg, source_guid_prefix_.value, GuidPrefix_t::size); + logInfo(RTPS_MSG_IN, IDSTRING "SRC RTPSParticipant is now: " << source_guid_prefix_); return true; } return false; } -bool MessageReceiver::proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* smh) { - - - bool endiannessFlag = smh->flags & BIT(0) ? true : false; +bool MessageReceiver::proc_Submsg_NackFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh) +{ + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //Assign message endianness if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } - GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = sourceGuidPrefix; + GUID_t readerGUID; + GUID_t writerGUID; + readerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = destGuidPrefix; + writerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t writerSN; @@ -1004,46 +1094,52 @@ bool MessageReceiver::proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* uint32_t Ackcount; CDRMessage::readUInt32(msg, &Ackcount); - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //Look for the correct writer to use the acknack - for (std::vector::iterator it = AssociatedWriters.begin(); - it != AssociatedWriters.end(); ++it) + for (RTPSWriter* it : associated_writers_) { bool result; - if ((*it)->process_nack_frag(writerGUID, readerGUID, Ackcount, writerSN, fnState, result)) + if (it->process_nack_frag(writerGUID, readerGUID, Ackcount, writerSN, fnState, result)) { if (!result) { - logInfo(RTPS_MSG_IN, IDSTRING"Acknack msg to NOT stateful writer "); + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to NOT stateful writer "); } return result; } } - logInfo(RTPS_MSG_IN, IDSTRING"Acknack msg to UNKNOWN writer (I looked through " - << AssociatedWriters.size() << " writers in this ListenResource)"); + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to UNKNOWN writer (I looked through " + << associated_writers_.size() << " writers in this ListenResource)"); return false; } -bool MessageReceiver::proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHeader_t* smh) { - - bool endiannessFlag = smh->flags & BIT(0) ? true : false; +bool MessageReceiver::proc_Submsg_HeartbeatFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh) +{ + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //Assign message endianness if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } - GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = destGuidPrefix; + GUID_t readerGUID; + GUID_t writerGUID; + readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = sourceGuidPrefix; + writerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t writerSN; CDRMessage::readSequenceNumber(msg, &writerSN); FragmentNumber_t lastFN; - CDRMessage::readUInt32(msg, (uint32_t*)&lastFN); + CDRMessage::readUInt32(msg, static_cast(&lastFN)); uint32_t HBCount; CDRMessage::readUInt32(msg, &HBCount); @@ -1052,20 +1148,19 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHead //Look for the correct reader and writers: /* XXX TODO - std::lock_guard guard(mtx); - for (std::vector::iterator it = AssociatedReaders.begin(); - it != AssociatedReaders.end(); ++it) - { + std::lock_guard guard(mtx_); + for (std::vector::iterator it = associated_readers_.begin(); + it != associated_readers_.end(); ++it) + { if ((*it)->acceptMsgDirectedTo(readerGUID.entityId)) { (*it)->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); } - } - */ + } + */ return true; } - -} } /* namespace rtps */ +} /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/messages/SendBuffersManager.cpp b/src/cpp/rtps/messages/SendBuffersManager.cpp index ff4218723fd..9b9215128b1 100644 --- a/src/cpp/rtps/messages/SendBuffersManager.cpp +++ b/src/cpp/rtps/messages/SendBuffersManager.cpp @@ -80,7 +80,7 @@ std::unique_ptr SendBuffersManager::get_buffer( std::unique_ptr ret_val; - if (pool_.empty()) + while (pool_.empty()) { if (allow_growing_ || n_created_ < pool_.capacity()) { @@ -88,8 +88,8 @@ std::unique_ptr SendBuffersManager::get_buffer( } else { + logInfo(RTPS_PARTICIPANT, "Waiting for send buffer"); available_cv_.wait(lock); - assert(!pool_.empty()); } } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 8ec77aa4cda..77b6d81c0cd 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -264,6 +264,18 @@ RTPSParticipantImpl::RTPSParticipantImpl( m_att.defaultMulticastLocatorList.clear(); } +#if HAVE_SECURITY + // Start security + // TODO(Ricardo) Get returned value in future. + m_security_manager_initialized = + m_security_manager.init(security_attributes_, PParam.properties, m_is_security_active); + if (!m_security_manager_initialized) + { + // Participant will be deleted, no need to allocate buffers or create builtin endpoints + return; + } +#endif + createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false); createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false); @@ -282,22 +294,21 @@ RTPSParticipantImpl::RTPSParticipantImpl( // Create buffer pool send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers)); + send_buffers_->init(this); #if HAVE_SECURITY - // Start security - // TODO(Ricardo) Get returned value in future. - m_security_manager_initialized = - m_security_manager.init(security_attributes_, PParam.properties, m_is_security_active); - if (!m_security_manager_initialized) + if (m_is_security_active) { - // Participant will be deleted, no need to allocate buffers or create builtin endpoints - return; + m_is_security_active = m_security_manager.create_entities(); + if (!m_is_security_active) + { + // Participant will be deleted, no need to create builtin endpoints + m_security_manager_initialized = false; + return; + } } #endif - // Allocate all pending send buffers - send_buffers_->init(this); - mp_builtinProtocols = new BuiltinProtocols(); logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix); diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index d36ad4a84cf..470996e2deb 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -61,11 +61,11 @@ namespace fastrtps{ class WriterQos; class ReaderQos; class TopicAttributes; -class MessageReceiver; namespace rtps { +class MessageReceiver; class RTPSParticipant; class RTPSParticipantListener; class BuiltinProtocols; diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index eba7ca230d7..2e2f24a3ecd 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -277,36 +277,11 @@ bool SecurityManager::init( { // Should be activated here, to enable encription buffer on created entities security_activated = true; - - // Create RTPS entities - if (create_entities()) - { - logInfo(SECURITY, "Initialized security manager for participant " << participant_->getGuid()); - return true; - } - - // Deactivate security if there is an error while creating entities - security_activated = false; - } - - if (local_participant_crypto_handle_ != nullptr) - { - crypto_plugin_->cryptokeyfactory()->unregister_participant(local_participant_crypto_handle_, exception); - } - - if (crypto_plugin_ != nullptr) - { - delete crypto_plugin_; - crypto_plugin_ = nullptr; + return true; } - //TODO(Ricardo) Return local_permissions - - if (access_plugin_ != nullptr) - { - delete access_plugin_; - access_plugin_ = nullptr; - } + cancel_init(); + return false; } else { @@ -320,12 +295,38 @@ bool SecurityManager::init( } else { - logInfo(SECURITY, "Authentication plugin not configured. Security will be disable"); + logInfo(SECURITY, "Authentication plugin not configured. Security will be disabled"); } return true; } +void SecurityManager::cancel_init() +{ + SecurityException exception; + if (local_participant_crypto_handle_ != nullptr) + { + crypto_plugin_->cryptokeyfactory()->unregister_participant(local_participant_crypto_handle_, exception); + } + + if (crypto_plugin_ != nullptr) + { + delete crypto_plugin_; + crypto_plugin_ = nullptr; + } + + //TODO(Ricardo) Return local_permissions + + if (access_plugin_ != nullptr) + { + delete access_plugin_; + access_plugin_ = nullptr; + } + + delete authentication_plugin_; + authentication_plugin_ = nullptr; +} + void SecurityManager::destroy() { if (authentication_plugin_ != nullptr) @@ -878,6 +879,7 @@ bool SecurityManager::create_entities() delete_participant_stateless_message_entities(); } + cancel_init(); return false; } diff --git a/src/cpp/rtps/security/SecurityManager.h b/src/cpp/rtps/security/SecurityManager.h index 2340d04bfff..bfd11563aa7 100644 --- a/src/cpp/rtps/security/SecurityManager.h +++ b/src/cpp/rtps/security/SecurityManager.h @@ -70,6 +70,8 @@ class SecurityManager const PropertyPolicy& participant_properties, bool& security_activated); + bool create_entities(); + void destroy(); bool discovered_participant( @@ -416,6 +418,8 @@ class SecurityManager SecurityManager &manager_; } participant_volatile_message_secure_listener_; + void cancel_init(); + void remove_discovered_participant_info( DiscoveredParticipantInfo::AuthUniquePtr& auth_ptr); @@ -423,7 +427,6 @@ class SecurityManager const GUID_t& remote_participant_key, DiscoveredParticipantInfo::AuthUniquePtr& auth_ptr); - bool create_entities(); void delete_entities(); bool create_participant_stateless_message_entities(); void delete_participant_stateless_message_entities(); diff --git a/test/unittest/rtps/security/SecurityInitializationTests.cpp b/test/unittest/rtps/security/SecurityInitializationTests.cpp index 3c7a784124d..0de0ff890a6 100644 --- a/test/unittest/rtps/security/SecurityInitializationTests.cpp +++ b/test/unittest/rtps/security/SecurityInitializationTests.cpp @@ -25,6 +25,7 @@ TEST_F(SecurityTest, initialization_auth_nullptr) DefaultValue::Set(guid); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } TEST_F(SecurityTest, initialization_auth_failed) @@ -65,7 +66,9 @@ TEST_F(SecurityTest, initialization_fail_participant_stateless_message_writer) EXPECT_CALL(participant_, createWriter_mock(_,_,_,_,_,_)).Times(1). WillOnce(Return(false)); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_fail_participant_stateless_message_reader) @@ -86,7 +89,9 @@ TEST_F(SecurityTest, initialization_fail_participant_stateless_message_reader) EXPECT_CALL(participant_, createReader_mock(_,_,_,_,_,_,_)).Times(1). WillOnce(Return(false)); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_fail_participant_volatile_message_writer) @@ -109,7 +114,9 @@ TEST_F(SecurityTest, initialization_fail_participant_volatile_message_writer) EXPECT_CALL(participant_, createReader_mock(_,_,_,_,_,_,_)).Times(1). WillOnce(DoAll(SetArgPointee<0>(stateless_reader), Return(true))); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_fail_participant_volatile_message_reader) @@ -134,7 +141,9 @@ TEST_F(SecurityTest, initialization_fail_participant_volatile_message_reader) WillOnce(DoAll(SetArgPointee<0>(stateless_reader), Return(true))). WillOnce(Return(false)); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_auth_retry) @@ -164,6 +173,7 @@ TEST_F(SecurityTest, initialization_auth_retry) WillOnce(Return(true)); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } diff --git a/test/unittest/rtps/security/SecurityTests.cpp b/test/unittest/rtps/security/SecurityTests.cpp index 2652ab3da79..e23879e1366 100644 --- a/test/unittest/rtps/security/SecurityTests.cpp +++ b/test/unittest/rtps/security/SecurityTests.cpp @@ -38,6 +38,7 @@ void SecurityTest::initialization_ok() WillOnce(DoAll(SetArgPointee<0>(volatile_reader_), Return(true))); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } void SecurityTest::initialization_auth_ok() @@ -58,6 +59,7 @@ void SecurityTest::initialization_auth_ok() WillOnce(DoAll(SetArgPointee<0>(stateless_reader_), Return(true))); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } void SecurityTest::request_process_ok(CacheChange_t** request_message_change)