From d1d149b70e885bfffbd4d3fa152b3d59863feabc Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 30 Jan 2020 09:15:41 +0100 Subject: [PATCH 1/7] Refs #7478. Uncrustify. --- .../fastrtps/rtps/messages/MessageReceiver.h | 275 ++++---- src/cpp/rtps/messages/MessageReceiver.cpp | 654 ++++++++++-------- 2 files changed, 537 insertions(+), 392 deletions(-) diff --git a/include/fastrtps/rtps/messages/MessageReceiver.h b/include/fastrtps/rtps/messages/MessageReceiver.h index ea08a482668..cbc9148c601 100644 --- a/include/fastrtps/rtps/messages/MessageReceiver.h +++ b/include/fastrtps/rtps/messages/MessageReceiver.h @@ -16,17 +16,15 @@ * @file MessageReceiver.h */ - - #ifndef MESSAGERECEIVER_H_ #define MESSAGERECEIVER_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include #include "../common/all_common.h" #include "../../qos/ParameterList.h" #include +#include namespace eprosima { namespace fastrtps { @@ -44,126 +42,165 @@ struct SubmessageHeader_t; */ class MessageReceiver { - public: - /** - * @param participant - * @param rec_buffer_size - */ - MessageReceiver(RTPSParticipantImpl* participant, uint32_t rec_buffer_size); - - virtual ~MessageReceiver(); - //!Reset the MessageReceiver to process a new message. - void reset(); - /** Init MessageReceiver. Does what the constructor used to do. - This is now on an independent function since MessageReceiver now stands inside - a struct. - @param rec_buffer_size - **/ - void init(uint32_t rec_buffer_size); - - /** - * Process a new CDR message. - * @param[in] loc Locator indicating the sending address. - * @param[in] msg Pointer to the message - */ - void processCDRMsg(const Locator_t& loc, CDRMessage_t*msg); - - //!Pointer to the Listen Resource that contains this MessageReceiver. - - //!Received message +public: + + /** + * @param participant + * @param rec_buffer_size + */ + MessageReceiver( + RTPSParticipantImpl* participant, + uint32_t rec_buffer_size); + + virtual ~MessageReceiver(); + + //! Reset the MessageReceiver to process a new message. + void reset(); + + /** + * Init MessageReceiver. Does what the constructor used to do. + * This is now on an independent function since MessageReceiver now stands inside + * a struct. + * @param rec_buffer_size + */ + void init( + uint32_t rec_buffer_size); + + /** + * Process a new CDR message. + * @param[in] loc Locator indicating the sending address. + * @param[in] msg Pointer to the message + */ + void processCDRMsg( + const Locator_t& loc, + CDRMessage_t* msg); + + // Functions to associate/remove associatedendpoints + void associateEndpoint( + Endpoint* to_add); + void removeEndpoint( + Endpoint* to_remove); + +private: + + std::mutex mtx; + std::vector AssociatedWriters; + std::unordered_map > AssociatedReaders; + //!Protocol version of the message + ProtocolVersion_t sourceVersion; + //!VendorID that created the message + VendorId_t sourceVendorId; + //!GuidPrefix of the entity that created the message + GuidPrefix_t sourceGuidPrefix; + //!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant. + GuidPrefix_t destGuidPrefix; + //!Has the message timestamp? + bool haveTimestamp; + //!Timestamp associated with the message + Time_t timestamp; + //!Version of the protocol used by the receiving end. + ProtocolVersion_t destVersion; + + uint16_t mMaxPayload_; + #if HAVE_SECURITY - CDRMessage_t m_crypto_msg; + CDRMessage_t m_crypto_msg; #endif - // Functions to associate/remove associatedendpoints - void associateEndpoint(Endpoint *to_add); - void removeEndpoint(Endpoint *to_remove); - - private: - std::vector AssociatedWriters; - std::unordered_map> AssociatedReaders; - std::mutex mtx; - //!Protocol version of the message - ProtocolVersion_t sourceVersion; - //!VendorID that created the message - VendorId_t sourceVendorId; - //!GuidPrefix of the entity that created the message - GuidPrefix_t sourceGuidPrefix; - //!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant. - GuidPrefix_t destGuidPrefix; - //!Has the message timestamp? - bool haveTimestamp; - //!Timestamp associated with the message - Time_t timestamp; - //!Version of the protocol used by the receiving end. - ProtocolVersion_t destVersion; - - uint16_t mMaxPayload_; - - - /**@name Processing methods. - * These methods are designed to read a part of the message - * and perform the corresponding actions: - * -Modify the message receiver state if necessary. - * -Add information to the history. - * -Return an error if the message is malformed. - * @param[in] msg Pointer to the message - * @param[out] params Different parameters depending on the message - * @return True if correct, false otherwise - */ - - ///@{ - /** - * Check the RTPSHeader of a received message. - * @param msg Pointer to the message. - * @return True if correct. - */ - bool checkRTPSHeader(CDRMessage_t*msg); - /** - * Read the submessage header of a message. - * @param msg Pointer to the CDRMessage_t to read. - * @param smh Pointer to the submessageheader structure. - * @return True if correctly read. - */ - bool readSubmessageHeader(CDRMessage_t*msg, SubmessageHeader_t* smh); - - /** - * Find if there is a reader (in AssociatedReaders) that will accept a msg directed - * to the given entity ID. - */ - bool willAReaderAcceptMsgDirectedTo(const EntityId_t & readerID); - - /** - * Find all readers (in AssociatedReaders), with the given entity ID, and call the - * callback provided. - */ - template - void findAllReaders( - const EntityId_t & readerID, - const Functor & callback); - - /** - * - * @param msg - * @param smh - * @return - */ - bool proc_Submsg_Data(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_DataFrag(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_Acknack(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_Heartbeat(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_Gap(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_InfoTS(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_InfoDST(CDRMessage_t*msg,SubmessageHeader_t* smh); - bool proc_Submsg_InfoSRC(CDRMessage_t*msg,SubmessageHeader_t* smh); - bool proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_SecureMessage(CDRMessage_t*msg, SubmessageHeader_t* smh); - bool proc_Submsg_SecureSubMessage(CDRMessage_t*msg, SubmessageHeader_t* smh); - - RTPSParticipantImpl* participant_; + + /**@name Processing methods. + * These methods are designed to read a part of the message + * and perform the corresponding actions: + * -Modify the message receiver state if necessary. + * -Add information to the history. + * -Return an error if the message is malformed. + * @param[in] msg Pointer to the message + * @param[out] params Different parameters depending on the message + * @return True if correct, false otherwise + */ + + ///@{ + /** + * Check the RTPSHeader of a received message. + * @param msg Pointer to the message. + * @return True if correct. + */ + bool checkRTPSHeader( + CDRMessage_t* msg); + /** + * Read the submessage header of a message. + * @param msg Pointer to the CDRMessage_t to read. + * @param smh Pointer to the submessageheader structure. + * @return True if correctly read. + */ + bool readSubmessageHeader( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + + /** + * Find if there is a reader (in AssociatedReaders) that will accept a msg directed + * to the given entity ID. + */ + bool willAReaderAcceptMsgDirectedTo( + const EntityId_t& readerID); + + /** + * Find all readers (in AssociatedReaders), with the given entity ID, and call the + * callback provided. + */ + template + void findAllReaders( + const EntityId_t& readerID, + const Functor& callback); + + /** + * + * @param msg + * @param smh + * @return + */ + bool proc_Submsg_Data( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_DataFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_Acknack( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_Heartbeat( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_Gap( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_InfoTS( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_InfoDST( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_InfoSRC( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_NackFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_HeartbeatFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_SecureMessage( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + bool proc_Submsg_SecureSubMessage( + CDRMessage_t* msg, + SubmessageHeader_t* smh); + + RTPSParticipantImpl* participant_; }; -} + } /* namespace rtps */ +} /* namespace fastrtps */ } /* namespace eprosima */ + #endif #endif /* MESSAGERECEIVER_H_ */ diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index 60236c93592..42729688f9f 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -20,40 +20,40 @@ #include #include - #include - #include -#include "../participant/RTPSParticipantImpl.h" +#include -#include +#include "../participant/RTPSParticipantImpl.h" -#include #include +#include +#include - -#include - -#define IDSTRING "(ID:" << std::this_thread::get_id() <<") "<< +#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << using namespace eprosima::fastrtps; namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { - -MessageReceiver::MessageReceiver(RTPSParticipantImpl* participant, uint32_t rec_buffer_size) : +MessageReceiver::MessageReceiver( + RTPSParticipantImpl* participant, + uint32_t rec_buffer_size) + : sourceVendorId(c_VendorId_Unknown) + , participant_(participant) #if HAVE_SECURITY - m_crypto_msg(rec_buffer_size), + , m_crypto_msg(rec_buffer_size) #endif - sourceVendorId(c_VendorId_Unknown), participant_(participant) { init(rec_buffer_size); } -void MessageReceiver::init(uint32_t rec_buffer_size){ +void MessageReceiver::init( + uint32_t rec_buffer_size) +{ destVersion = c_ProtocolVersion; sourceVersion = c_ProtocolVersion; sourceVendorId = c_VendorId_Unknown; @@ -62,31 +62,38 @@ void MessageReceiver::init(uint32_t rec_buffer_size){ haveTimestamp = false; timestamp = c_TimeInvalid; - logInfo(RTPS_MSG_IN,"Created with CDRMessage of size: "<< rec_buffer_size); - mMaxPayload_ = ((uint32_t)std::numeric_limits::max() < rec_buffer_size) ? std::numeric_limits::max() : (uint16_t)rec_buffer_size; + logInfo(RTPS_MSG_IN, "Created with CDRMessage of size: " << rec_buffer_size); + mMaxPayload_ = + ((uint32_t)std::numeric_limits::max() < + rec_buffer_size) ? std::numeric_limits::max() : (uint16_t)rec_buffer_size; } MessageReceiver::~MessageReceiver() { - logInfo(RTPS_MSG_IN,""); + logInfo(RTPS_MSG_IN, ""); assert(AssociatedWriters.size() == 0); assert(AssociatedReaders.size() == 0); } -void MessageReceiver::associateEndpoint(Endpoint *to_add){ +void MessageReceiver::associateEndpoint( + Endpoint* to_add) +{ bool found = false; std::lock_guard guard(mtx); - if(to_add->getAttributes().endpointKind == WRITER) + if (to_add->getAttributes().endpointKind == WRITER) { - for(auto it = AssociatedWriters.begin(); it != AssociatedWriters.end(); ++it) + for (auto it = AssociatedWriters.begin(); it != AssociatedWriters.end(); ++it) { - if( (*it) == (RTPSWriter*)to_add ) + if ( (*it) == (RTPSWriter*)to_add ) { found = true; break; } } - if(!found) AssociatedWriters.push_back((RTPSWriter*)to_add); + if (!found) + { + AssociatedWriters.push_back((RTPSWriter*)to_add); + } } else { @@ -94,42 +101,63 @@ void MessageReceiver::associateEndpoint(Endpoint *to_add){ const auto entityId = reader->getGuid().entityId; // search for set of readers by entity ID const auto readers = AssociatedReaders.find(entityId); - if (readers == AssociatedReaders.end()) { + if (readers == AssociatedReaders.end()) + { auto vec = std::vector(); vec.push_back(reader); AssociatedReaders.emplace(entityId, vec); } - else { - for (const auto & it : readers->second) { - if (it == reader) { + else + { + for (const auto& it : readers->second) + { + if (it == reader) + { found = true; break; } } - if (!found) readers->second.push_back(reader); + if (!found) + { + readers->second.push_back(reader); + } } } return; } -void MessageReceiver::removeEndpoint(Endpoint *to_remove){ + +void MessageReceiver::removeEndpoint( + Endpoint* to_remove) +{ std::lock_guard guard(mtx); - if(to_remove->getAttributes().endpointKind == WRITER){ - RTPSWriter* var = (RTPSWriter *)to_remove; - for(auto it=AssociatedWriters.begin(); it !=AssociatedWriters.end(); ++it){ - if ((*it) == var){ + if (to_remove->getAttributes().endpointKind == WRITER) + { + RTPSWriter* var = (RTPSWriter*)to_remove; + for (auto it = AssociatedWriters.begin(); it != AssociatedWriters.end(); ++it) + { + if ((*it) == var) + { AssociatedWriters.erase(it); break; } } - }else{ + } + else + { auto readers = AssociatedReaders.find(to_remove->getGuid().entityId); - if (readers != AssociatedReaders.end()) { - RTPSReader *var = (RTPSReader *)to_remove; - for (auto it = readers->second.begin(); it != readers->second.end(); ++it) { - if (*it == var){ + if (readers != AssociatedReaders.end()) + { + RTPSReader* var = (RTPSReader*)to_remove; + for (auto it = readers->second.begin(); it != readers->second.end(); ++it) + { + if (*it == var) + { readers->second.erase(it); - if (readers->second.empty()) AssociatedReaders.erase(readers); + if (readers->second.empty()) + { + AssociatedReaders.erase(readers); + } break; } } @@ -138,8 +166,8 @@ void MessageReceiver::removeEndpoint(Endpoint *to_remove){ return; } - -void MessageReceiver::reset(){ +void MessageReceiver::reset() +{ destVersion = c_ProtocolVersion; sourceVersion = c_ProtocolVersion; sourceVendorId = c_VendorId_Unknown; @@ -149,13 +177,15 @@ void MessageReceiver::reset(){ timestamp = c_TimeInvalid; } -void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) +void MessageReceiver::processCDRMsg( + const Locator_t& loc, + CDRMessage_t* msg) { (void)loc; - if(msg->length < RTPSMESSAGE_HEADER_SIZE) + if (msg->length < RTPSMESSAGE_HEADER_SIZE) { - logWarning(RTPS_MSG_IN,IDSTRING"Received message too short, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING "Received message too short, ignoring"); return; } @@ -167,7 +197,7 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) msg->pos = 0; //Start reading at 0 //Once everything is set, the reading begins: - if(!checkRTPSHeader(msg)) + if (!checkRTPSHeader(msg)) { return; } @@ -178,9 +208,11 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) int decode_ret = participant_->security_manager().decode_rtps_message(*msg, *auxiliary_buffer, sourceGuidPrefix); - if(decode_ret < 0) + if (decode_ret < 0) + { return; - else if(decode_ret == 0) + } + else if (decode_ret == 0) { // Swap std::swap(msg, auxiliary_buffer); @@ -193,7 +225,7 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) SubmessageHeader_t submsgh; //Current submessage header //Pointers to different types of messages: - while(msg->pos < msg->length)// end of the message + while (msg->pos < msg->length)// end of the message { CDRMessage_t* submessage = msg; @@ -201,132 +233,134 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) CDRMessage::initCDRMsg(auxiliary_buffer); decode_ret = participant_->security_manager().decode_rtps_submessage(*msg, *auxiliary_buffer, sourceGuidPrefix); - if(decode_ret < 0) + if (decode_ret < 0) { return; } - else if(decode_ret == 0) + else if (decode_ret == 0) { submessage = auxiliary_buffer; } #endif //First 4 bytes must contain: ID | flags | octets to next header - if(!readSubmessageHeader(submessage, &submsgh)) + if (!readSubmessageHeader(submessage, &submsgh)) + { return; + } valid = true; count++; uint32_t next_msg_pos = submessage->pos; next_msg_pos += (submsgh.submessageLength + 3) & ~3; - switch(submsgh.submessageId) + switch (submsgh.submessageId) { case DATA: + { + if (this->destGuidPrefix != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"Data Submsg ignored, DST is another RTPSParticipant"); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Data Submsg received, processing."); - valid = proc_Submsg_Data(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "Data Submsg ignored, DST is another RTPSParticipant"); + } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Data Submsg received, processing."); + valid = proc_Submsg_Data(submessage, &submsgh); } + break; + } case DATA_FRAG: if (this->destGuidPrefix != participantGuidPrefix) { - logInfo(RTPS_MSG_IN, IDSTRING"DataFrag Submsg ignored, DST is another RTPSParticipant"); + logInfo(RTPS_MSG_IN, IDSTRING "DataFrag Submsg ignored, DST is another RTPSParticipant"); } else { - logInfo(RTPS_MSG_IN, IDSTRING"DataFrag Submsg received, processing."); + logInfo(RTPS_MSG_IN, IDSTRING "DataFrag Submsg received, processing."); valid = proc_Submsg_DataFrag(submessage, &submsgh); } break; case GAP: + { + if (this->destGuidPrefix != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"Gap Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Gap Submsg received, processing..."); - valid = proc_Submsg_Gap(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "Gap Submsg ignored, DST is another RTPSParticipant..."); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Gap Submsg received, processing..."); + valid = proc_Submsg_Gap(submessage, &submsgh); + } + break; + } case ACKNACK: + { + if (this->destGuidPrefix != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"Acknack Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Acknack Submsg received, processing..."); - valid = proc_Submsg_Acknack(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "Acknack Submsg ignored, DST is another RTPSParticipant..."); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Acknack Submsg received, processing..."); + valid = proc_Submsg_Acknack(submessage, &submsgh); + } + break; + } case NACK_FRAG: + { + if (this->destGuidPrefix != participantGuidPrefix) { - if (this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN, IDSTRING"NackFrag Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN, IDSTRING"NackFrag Submsg received, processing..."); - valid = proc_Submsg_NackFrag(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "NackFrag Submsg ignored, DST is another RTPSParticipant..."); + } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "NackFrag Submsg received, processing..."); + valid = proc_Submsg_NackFrag(submessage, &submsgh); } + break; + } case HEARTBEAT: + { + if (this->destGuidPrefix != participantGuidPrefix) { - if(this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN,IDSTRING"HB Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN,IDSTRING"Heartbeat Submsg received, processing..."); - valid = proc_Submsg_Heartbeat(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "HB Submsg ignored, DST is another RTPSParticipant..."); + } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "Heartbeat Submsg received, processing..."); + valid = proc_Submsg_Heartbeat(submessage, &submsgh); } + break; + } case HEARTBEAT_FRAG: + { + if (this->destGuidPrefix != participantGuidPrefix) { - if (this->destGuidPrefix != participantGuidPrefix) - { - logInfo(RTPS_MSG_IN, IDSTRING"HBFrag Submsg ignored, DST is another RTPSParticipant..."); - } - else - { - logInfo(RTPS_MSG_IN, IDSTRING"HeartbeatFrag Submsg received, processing..."); - valid = proc_Submsg_HeartbeatFrag(submessage, &submsgh); - } - break; + logInfo(RTPS_MSG_IN, IDSTRING "HBFrag Submsg ignored, DST is another RTPSParticipant..."); } + else + { + logInfo(RTPS_MSG_IN, IDSTRING "HeartbeatFrag Submsg received, processing..."); + valid = proc_Submsg_HeartbeatFrag(submessage, &submsgh); + } + break; + } case PAD: - logWarning(RTPS_MSG_IN,IDSTRING"PAD messages not yet implemented, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING "PAD messages not yet implemented, ignoring"); break; case INFO_DST: - logInfo(RTPS_MSG_IN,IDSTRING"InfoDST message received, processing..."); + logInfo(RTPS_MSG_IN, IDSTRING "InfoDST message received, processing..."); valid = proc_Submsg_InfoDST(submessage, &submsgh); break; case INFO_SRC: - logInfo(RTPS_MSG_IN,IDSTRING"InfoSRC message received, processing..."); + logInfo(RTPS_MSG_IN, IDSTRING "InfoSRC message received, processing..."); valid = proc_Submsg_InfoSRC(submessage, &submsgh); break; case INFO_TS: - { - logInfo(RTPS_MSG_IN,IDSTRING"InfoTS Submsg received, processing..."); - valid = proc_Submsg_InfoTS(submessage, &submsgh); - break; - } + { + logInfo(RTPS_MSG_IN, IDSTRING "InfoTS Submsg received, processing..."); + valid = proc_Submsg_InfoTS(submessage, &submsgh); + break; + } case INFO_REPLY: break; case INFO_REPLY_IP4: @@ -335,7 +369,7 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) break; } - if(!valid || submsgh.is_last) + if (!valid || submsgh.is_last) { break; } @@ -347,58 +381,61 @@ void MessageReceiver::processCDRMsg(const Locator_t& loc, CDRMessage_t*msg) participant_->assert_remote_participant_liveliness(sourceGuidPrefix); } -bool MessageReceiver::checkRTPSHeader(CDRMessage_t*msg) //check and proccess the RTPS Header +bool MessageReceiver::checkRTPSHeader( + CDRMessage_t* msg) //check and proccess the RTPS Header { - if(msg->buffer[0] != 'R' || msg->buffer[1] != 'T' || + if (msg->buffer[0] != 'R' || msg->buffer[1] != 'T' || msg->buffer[2] != 'P' || msg->buffer[3] != 'S') { - logInfo(RTPS_MSG_IN,IDSTRING"Msg received with no RTPS in header, ignoring..."); + logInfo(RTPS_MSG_IN, IDSTRING "Msg received with no RTPS in header, ignoring..."); return false; } - msg->pos+=4; + msg->pos += 4; //CHECK AND SET protocol version - if(msg->buffer[msg->pos] <= destVersion.m_major) + if (msg->buffer[msg->pos] <= destVersion.m_major) { - sourceVersion.m_major = msg->buffer[msg->pos];msg->pos++; - sourceVersion.m_minor = msg->buffer[msg->pos];msg->pos++; + sourceVersion.m_major = msg->buffer[msg->pos]; msg->pos++; + sourceVersion.m_minor = msg->buffer[msg->pos]; msg->pos++; } else { - logWarning(RTPS_MSG_IN,IDSTRING"Major RTPS Version not supported"); + logWarning(RTPS_MSG_IN, IDSTRING "Major RTPS Version not supported"); return false; } //Set source vendor id - sourceVendorId[0] = msg->buffer[msg->pos];msg->pos++; - sourceVendorId[1] = msg->buffer[msg->pos];msg->pos++; + sourceVendorId[0] = msg->buffer[msg->pos]; msg->pos++; + sourceVendorId[1] = msg->buffer[msg->pos]; msg->pos++; //set source guid prefix - memcpy(sourceGuidPrefix.value,&msg->buffer[msg->pos],12); - msg->pos+=12; + memcpy(sourceGuidPrefix.value, &msg->buffer[msg->pos], 12); + msg->pos += 12; haveTimestamp = false; return true; } - -bool MessageReceiver::readSubmessageHeader(CDRMessage_t* msg, SubmessageHeader_t* smh) +bool MessageReceiver::readSubmessageHeader( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { - if(msg->length - msg->pos < 4) + if (msg->length - msg->pos < 4) { - logWarning(RTPS_MSG_IN,IDSTRING"SubmessageHeader too short"); + logWarning(RTPS_MSG_IN, IDSTRING "SubmessageHeader too short"); return false; } - smh->submessageId = msg->buffer[msg->pos];msg->pos++; - smh->flags = msg->buffer[msg->pos];msg->pos++; + smh->submessageId = msg->buffer[msg->pos]; msg->pos++; + smh->flags = msg->buffer[msg->pos]; msg->pos++; //Set endianness of message msg->msg_endian = smh->flags & BIT(0) ? LITTLEEND : BIGEND; uint16_t length = 0; - CDRMessage::readUInt16(msg,&length); + CDRMessage::readUInt16(msg, &length); if (msg->pos + length > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING"SubMsg of invalid length (" << length - << ") with current msg position/length (" << msg->pos << "/" << msg->length << ")"); + logWarning(RTPS_MSG_IN, IDSTRING "SubMsg of invalid length (" << length + << ") with current msg position/length (" << msg->pos << "/" << msg->length << + ")"); return false; } @@ -420,9 +457,9 @@ bool MessageReceiver::readSubmessageHeader(CDRMessage_t* msg, SubmessageHeader_t bool MessageReceiver::willAReaderAcceptMsgDirectedTo( const EntityId_t& readerID) { - if(AssociatedReaders.empty()) + if (AssociatedReaders.empty()) { - logWarning(RTPS_MSG_IN,IDSTRING"Data received when NO readers are listening"); + logWarning(RTPS_MSG_IN, IDSTRING "Data received when NO readers are listening"); return false; } @@ -436,7 +473,7 @@ bool MessageReceiver::willAReaderAcceptMsgDirectedTo( } else { - for(const auto& readers : AssociatedReaders) + for (const auto& readers : AssociatedReaders) { if (readers.second.empty()) { @@ -453,7 +490,7 @@ bool MessageReceiver::willAReaderAcceptMsgDirectedTo( } } - logWarning(RTPS_MSG_IN, IDSTRING"No Reader accepts this message (directed to: " < guard(mtx); //READ and PROCESS - if(smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) + if (smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) { - logInfo(RTPS_MSG_IN,IDSTRING"Too short submessage received, ignoring"); + logInfo(RTPS_MSG_IN, IDSTRING "Too short submessage received, ignoring"); return false; } //Fill flags bool values @@ -503,20 +542,24 @@ bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh bool inlineQosFlag = smh->flags & BIT(1) ? true : false; bool dataFlag = smh->flags & BIT(2) ? true : false; bool keyFlag = smh->flags & BIT(3) ? true : false; - if(keyFlag && dataFlag) + if (keyFlag && dataFlag) { - logWarning(RTPS_MSG_IN,IDSTRING"Message received with Data and Key Flag set, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING "Message received with Data and Key Flag set, ignoring"); return false; } //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } //Extra flags don't matter now. Avoid those bytes - msg->pos+=2; + msg->pos += 2; bool valid = true; int16_t octetsToInlineQos; @@ -524,7 +567,7 @@ bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh //reader and writer ID EntityId_t readerID; - valid &= CDRMessage::readEntityId(msg,&readerID); + valid &= CDRMessage::readEntityId(msg, &readerID); //WE KNOW THE READER THAT THE MESSAGE IS DIRECTED TO SO WE LOOK FOR IT: if (!willAReaderAcceptMsgDirectedTo(readerID)) @@ -538,52 +581,55 @@ bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh ch.kind = ALIVE; ch.serializedPayload.max_size = mMaxPayload_; ch.writerGUID.guidPrefix = sourceGuidPrefix; - valid &= CDRMessage::readEntityId(msg,&ch.writerGUID.entityId); + valid &= CDRMessage::readEntityId(msg, &ch.writerGUID.entityId); //Get sequence number - valid &= CDRMessage::readSequenceNumber(msg,&ch.sequenceNumber); + valid &= CDRMessage::readSequenceNumber(msg, &ch.sequenceNumber); - if (!valid){ + if (!valid) + { return false; } - if(ch.sequenceNumber <= SequenceNumber_t(0, 0) || (ch.sequenceNumber.high == -1 && ch.sequenceNumber.low == 0)) //message invalid //TODO make faster + if (ch.sequenceNumber <= SequenceNumber_t(0, 0) || (ch.sequenceNumber.high == -1 && ch.sequenceNumber.low == 0)) //message invalid //TODO make faster { - logWarning(RTPS_MSG_IN,IDSTRING"Invalid message received, bad sequence Number"); + logWarning(RTPS_MSG_IN, IDSTRING "Invalid message received, bad sequence Number"); return false; } //Jump ahead if more parameters are before inlineQos (not in this version, maybe if further minor versions.) - if(octetsToInlineQos > RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG) + if (octetsToInlineQos > RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG) { msg->pos += (octetsToInlineQos - RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG); if (msg->pos > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); + logWarning(RTPS_MSG_IN, + IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); return false; } } uint32_t inlineQosSize = 0; - if(inlineQosFlag) + if (inlineQosFlag) { - if(false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) ) + if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) ) { - logInfo(RTPS_MSG_IN,IDSTRING"SubMessage Data ERROR, Inline Qos ParameterList error"); + logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; } } - if(dataFlag || keyFlag) + if (dataFlag || keyFlag) { uint32_t payload_size; - payload_size = smh->submessageLength - (RTPSMESSAGE_DATA_EXTRA_INLINEQOS_SIZE+octetsToInlineQos+inlineQosSize); + payload_size = smh->submessageLength - + (RTPSMESSAGE_DATA_EXTRA_INLINEQOS_SIZE + octetsToInlineQos + inlineQosSize); - if(dataFlag) + if (dataFlag) { - if(ch.serializedPayload.max_size >= payload_size && payload_size > 0) + if (ch.serializedPayload.max_size >= payload_size && payload_size > 0) { ch.serializedPayload.data = &msg->buffer[msg->pos]; ch.serializedPayload.length = payload_size; @@ -591,16 +637,16 @@ bool MessageReceiver::proc_Submsg_Data(CDRMessage_t* msg,SubmessageHeader_t* smh } else { - logWarning(RTPS_MSG_IN,IDSTRING"Serialized Payload value invalid or larger than maximum allowed size" - "(" <pos += payload_size; } } // Set sourcetimestamp - if(haveTimestamp) + if (haveTimestamp) { ch.sourceTimestamp = this->timestamp; } //FIXME: DO SOMETHING WITH PARAMETERLIST CREATED. - logInfo(RTPS_MSG_IN,IDSTRING"from Writer " << ch.writerGUID << "; possible RTPSReader entities: " - << AssociatedReaders.size()); + logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " + << AssociatedReaders.size()); //Look for the correct reader to add the change findAllReaders(readerID, [ - &ch - ] (RTPSReader* reader) { - reader->processDataMsg(&ch); - }); + &ch + ] (RTPSReader* reader) { + reader->processDataMsg(&ch); + }); //TODO(Ricardo) If a exception is thrown (ex, by fastcdr), this line is not executed -> segmentation fault ch.serializedPayload.data = nullptr; - logInfo(RTPS_MSG_IN,IDSTRING"Sub Message DATA processed"); + logInfo(RTPS_MSG_IN, IDSTRING "Sub Message DATA processed"); return true; } -bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_DataFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { std::lock_guard guard(mtx); //READ and PROCESS if (smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) { - logInfo(RTPS_MSG_IN, IDSTRING"Too short submessage received, ignoring"); + logInfo(RTPS_MSG_IN, IDSTRING "Too short submessage received, ignoring"); return false; } @@ -659,9 +707,13 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t //Assign message endianness if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } //Extra flags don't matter now. Avoid those bytes msg->pos += 2; @@ -675,7 +727,10 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t valid &= CDRMessage::readEntityId(msg, &readerID); //WE KNOW THE READER THAT THE MESSAGE IS DIRECTED TO SO WE LOOK FOR IT: - if (!willAReaderAcceptMsgDirectedTo(readerID)) return false; + if (!willAReaderAcceptMsgDirectedTo(readerID)) + { + return false; + } //FOUND THE READER. //We ask the reader for a cachechange to store the information. @@ -689,7 +744,7 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t if (ch.sequenceNumber <= SequenceNumber_t()) { - logWarning(RTPS_MSG_IN, IDSTRING"Invalid message received, bad sequence Number"); + logWarning(RTPS_MSG_IN, IDSTRING "Invalid message received, bad sequence Number"); return false; } @@ -709,7 +764,8 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t uint32_t sampleSize; valid &= CDRMessage::readUInt32(msg, &sampleSize); - if(!valid){ + if (!valid) + { return false; } @@ -719,7 +775,8 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t msg->pos += (octetsToInlineQos - RTPSMESSAGE_OCTETSTOINLINEQOS_DATAFRAGSUBMSG); if (msg->pos > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); + logWarning(RTPS_MSG_IN, + IDSTRING "Invalid jump through msg, msg->pos " << msg->pos << " > msg->length " << msg->length); return false; } } @@ -730,7 +787,7 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t { if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize)) { - logInfo(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, Inline Qos ParameterList error"); + logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; } } @@ -755,7 +812,7 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t } else { - logWarning(RTPS_MSG_IN, IDSTRING"Serialized Payload value invalid or larger than maximum allowed size " + logWarning(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid or larger than maximum allowed size " "(" << payload_size << "/" << ch.serializedPayload.max_size << ")"); return false; } @@ -773,97 +830,109 @@ bool MessageReceiver::proc_Submsg_DataFrag(CDRMessage_t* msg, SubmessageHeader_t logError(RTPS_MSG_IN, IDSTRING"Bad encapsulation for KeyHash and status parameter list"); return false; } - //uint32_t param_size; - if (ParameterList::readParameterListfromCDRMsg(msg, &m_ParamList, ch, false) <= 0) - { - logInfo(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, keyFlag ParameterList"); - return false; - } - msg->msg_endian = previous_endian; - */ + //uint32_t param_size; + if (ParameterList::readParameterListfromCDRMsg(msg, &m_ParamList, ch, false) <= 0) + { + logInfo(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, keyFlag ParameterList"); + return false; + } + msg->msg_endian = previous_endian; + */ } // Set sourcetimestamp if (haveTimestamp) + { ch.sourceTimestamp = this->timestamp; + } //FIXME: DO SOMETHING WITH PARAMETERLIST CREATED. - logInfo(RTPS_MSG_IN, IDSTRING"from Writer " << ch.writerGUID << "; possible RTPSReader entities: " - << AssociatedReaders.size()); + logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " + << AssociatedReaders.size()); //Look for the correct reader to add the change findAllReaders(readerID, [ - &ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage - ] (RTPSReader* reader) { - reader->processDataFragMsg(&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage); - }); + &ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage + ] (RTPSReader* reader) { + reader->processDataFragMsg(&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage); + }); ch.serializedPayload.data = nullptr; - logInfo(RTPS_MSG_IN, IDSTRING"Sub Message DATA_FRAG processed"); + logInfo(RTPS_MSG_IN, IDSTRING "Sub Message DATA_FRAG processed"); return true; } - -bool MessageReceiver::proc_Submsg_Heartbeat(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Heartbeat( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { bool endiannessFlag = smh->flags & BIT(0) ? true : false; bool finalFlag = smh->flags & BIT(1) ? true : false; bool livelinessFlag = smh->flags & BIT(2) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } GUID_t readerGUID, writerGUID; readerGUID.guidPrefix = destGuidPrefix; - CDRMessage::readEntityId(msg,&readerGUID.entityId); + CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = sourceGuidPrefix; - CDRMessage::readEntityId(msg,&writerGUID.entityId); + CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t firstSN, lastSN; - CDRMessage::readSequenceNumber(msg,&firstSN); - CDRMessage::readSequenceNumber(msg,&lastSN); - if(lastSN < firstSN && lastSN != firstSN-1) + CDRMessage::readSequenceNumber(msg, &firstSN); + CDRMessage::readSequenceNumber(msg, &lastSN); + if (lastSN < firstSN && lastSN != firstSN - 1) { - logWarning(RTPS_MSG_IN, IDSTRING"Invalid Heartbeat received (" << firstSN << ") - (" << + logWarning(RTPS_MSG_IN, IDSTRING "Invalid Heartbeat received (" << firstSN << ") - (" << lastSN << "), ignoring"); return false; } uint32_t HBCount; - CDRMessage::readUInt32(msg,&HBCount); + CDRMessage::readUInt32(msg, &HBCount); std::lock_guard guard(mtx); //Look for the correct reader and writers: findAllReaders(readerGUID.entityId, [ - &writerGUID, &HBCount, &firstSN, &lastSN, finalFlag, livelinessFlag - ] (RTPSReader* reader) { - reader->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); - }); + &writerGUID, &HBCount, &firstSN, &lastSN, finalFlag, livelinessFlag + ] (RTPSReader* reader) { + reader->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); + }); return true; } - -bool MessageReceiver::proc_Submsg_Acknack(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Acknack( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool finalFlag = smh->flags & BIT(1) ? true: false; + bool finalFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; - GUID_t readerGUID,writerGUID; + } + GUID_t readerGUID, writerGUID; readerGUID.guidPrefix = sourceGuidPrefix; - CDRMessage::readEntityId(msg,&readerGUID.entityId); + CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = destGuidPrefix; - CDRMessage::readEntityId(msg,&writerGUID.entityId); + CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumberSet_t SNSet = CDRMessage::readSequenceNumberSet(msg); uint32_t Ackcount; - CDRMessage::readUInt32(msg,&Ackcount); + CDRMessage::readUInt32(msg, &Ackcount); std::lock_guard guard(mtx); //Look for the correct writer to use the acknack @@ -875,119 +944,152 @@ bool MessageReceiver::proc_Submsg_Acknack(CDRMessage_t* msg,SubmessageHeader_t* { if (!result) { - logInfo(RTPS_MSG_IN, IDSTRING"Acknack msg to NOT stateful writer "); + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to NOT stateful writer "); } return result; } } - logInfo(RTPS_MSG_IN,IDSTRING"Acknack msg to UNKNOWN writer (I loooked through " + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to UNKNOWN writer (I loooked through " << AssociatedWriters.size() << " writers in this ListenResource)"); return false; } - - -bool MessageReceiver::proc_Submsg_Gap(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_Gap( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { bool endiannessFlag = smh->flags & BIT(0) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } - GUID_t writerGUID,readerGUID; + GUID_t writerGUID, readerGUID; readerGUID.guidPrefix = destGuidPrefix; - CDRMessage::readEntityId(msg,&readerGUID.entityId); + CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = sourceGuidPrefix; - CDRMessage::readEntityId(msg,&writerGUID.entityId); + CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t gapStart; - CDRMessage::readSequenceNumber(msg,&gapStart); + CDRMessage::readSequenceNumber(msg, &gapStart); SequenceNumberSet_t gapList = CDRMessage::readSequenceNumberSet(msg); - if(gapStart <= SequenceNumber_t(0, 0)) + if (gapStart <= SequenceNumber_t(0, 0)) + { return false; + } std::lock_guard guard(mtx); findAllReaders(readerGUID.entityId, [ - &writerGUID, &gapStart, &gapList - ] (RTPSReader* reader) { - reader->processGapMsg(writerGUID, gapStart, gapList); - }); + &writerGUID, &gapStart, &gapList + ] (RTPSReader* reader) { + reader->processGapMsg(writerGUID, gapStart, gapList); + }); return true; } -bool MessageReceiver::proc_Submsg_InfoTS(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_InfoTS( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { bool endiannessFlag = smh->flags & BIT(0) ? true : false; bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; - if(!timeFlag) + } + if (!timeFlag) { haveTimestamp = true; - CDRMessage::readTimestamp(msg,×tamp); + CDRMessage::readTimestamp(msg, ×tamp); } else + { haveTimestamp = false; + } return true; } -bool MessageReceiver::proc_Submsg_InfoDST(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_InfoDST( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { bool endiannessFlag = smh->flags & BIT(0) ? true : false; //bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } GuidPrefix_t guidP; - CDRMessage::readData(msg,guidP.value,12); - if(guidP != c_GuidPrefix_Unknown) + CDRMessage::readData(msg, guidP.value, 12); + if (guidP != c_GuidPrefix_Unknown) { this->destGuidPrefix = guidP; - logInfo(RTPS_MSG_IN,IDSTRING"DST RTPSParticipant is now: "<< this->destGuidPrefix); + logInfo(RTPS_MSG_IN, IDSTRING "DST RTPSParticipant is now: " << this->destGuidPrefix); } return true; } -bool MessageReceiver::proc_Submsg_InfoSRC(CDRMessage_t* msg,SubmessageHeader_t* smh) +bool MessageReceiver::proc_Submsg_InfoSRC( + CDRMessage_t* msg, + SubmessageHeader_t* smh) { bool endiannessFlag = smh->flags & BIT(0) ? true : false; //bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness - if(endiannessFlag) + if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; - if(smh->submessageLength == 20) + } + if (smh->submessageLength == 20) { //AVOID FIRST 4 BYTES: - msg->pos+=4; - CDRMessage::readOctet(msg,&this->sourceVersion.m_major); - CDRMessage::readOctet(msg,&this->sourceVersion.m_minor); - CDRMessage::readData(msg,&this->sourceVendorId[0],2); - CDRMessage::readData(msg,this->sourceGuidPrefix.value,12); - logInfo(RTPS_MSG_IN,IDSTRING"SRC RTPSParticipant is now: "<sourceGuidPrefix); + msg->pos += 4; + CDRMessage::readOctet(msg, &this->sourceVersion.m_major); + CDRMessage::readOctet(msg, &this->sourceVersion.m_minor); + CDRMessage::readData(msg, &this->sourceVendorId[0], 2); + CDRMessage::readData(msg, this->sourceGuidPrefix.value, 12); + logInfo(RTPS_MSG_IN, IDSTRING "SRC RTPSParticipant is now: " << this->sourceGuidPrefix); return true; } return false; } -bool MessageReceiver::proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* smh) { +bool MessageReceiver::proc_Submsg_NackFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh) +{ bool endiannessFlag = smh->flags & BIT(0) ? true : false; //Assign message endianness if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } GUID_t readerGUID, writerGUID; readerGUID.guidPrefix = sourceGuidPrefix; @@ -1014,24 +1116,31 @@ bool MessageReceiver::proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* { if (!result) { - logInfo(RTPS_MSG_IN, IDSTRING"Acknack msg to NOT stateful writer "); + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to NOT stateful writer "); } return result; } } - logInfo(RTPS_MSG_IN, IDSTRING"Acknack msg to UNKNOWN writer (I looked through " + logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to UNKNOWN writer (I looked through " << AssociatedWriters.size() << " writers in this ListenResource)"); return false; } -bool MessageReceiver::proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHeader_t* smh) { +bool MessageReceiver::proc_Submsg_HeartbeatFrag( + CDRMessage_t* msg, + SubmessageHeader_t* smh) +{ bool endiannessFlag = smh->flags & BIT(0) ? true : false; //Assign message endianness if (endiannessFlag) + { msg->msg_endian = LITTLEEND; + } else + { msg->msg_endian = BIGEND; + } GUID_t readerGUID, writerGUID; readerGUID.guidPrefix = destGuidPrefix; @@ -1052,20 +1161,19 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHead //Look for the correct reader and writers: /* XXX TODO - std::lock_guard guard(mtx); - for (std::vector::iterator it = AssociatedReaders.begin(); + std::lock_guard guard(mtx); + for (std::vector::iterator it = AssociatedReaders.begin(); it != AssociatedReaders.end(); ++it) - { + { if ((*it)->acceptMsgDirectedTo(readerGUID.entityId)) { (*it)->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); } - } - */ + } + */ return true; } - -} } /* namespace rtps */ +} /* namespace fastrtps */ } /* namespace eprosima */ From 4616a94311aefb23f18ce10a3c7b7bb1aa2d918b Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 30 Jan 2020 10:48:48 +0100 Subject: [PATCH 2/7] Refs #7478. Improvements and cleanup --- .../fastrtps/rtps/messages/MessageReceiver.h | 98 +++-- src/cpp/rtps/messages/MessageReceiver.cpp | 344 +++++++++--------- 2 files changed, 203 insertions(+), 239 deletions(-) diff --git a/include/fastrtps/rtps/messages/MessageReceiver.h b/include/fastrtps/rtps/messages/MessageReceiver.h index cbc9148c601..273de83b973 100644 --- a/include/fastrtps/rtps/messages/MessageReceiver.h +++ b/include/fastrtps/rtps/messages/MessageReceiver.h @@ -54,18 +54,6 @@ class MessageReceiver virtual ~MessageReceiver(); - //! Reset the MessageReceiver to process a new message. - void reset(); - - /** - * Init MessageReceiver. Does what the constructor used to do. - * This is now on an independent function since MessageReceiver now stands inside - * a struct. - * @param rec_buffer_size - */ - void init( - uint32_t rec_buffer_size); - /** * Process a new CDR message. * @param[in] loc Locator indicating the sending address. @@ -83,42 +71,48 @@ class MessageReceiver private: - std::mutex mtx; - std::vector AssociatedWriters; - std::unordered_map > AssociatedReaders; + std::mutex mtx_; + std::vector associated_writers_; + std::unordered_map > associated_readers_; + + RTPSParticipantImpl* participant_; + //!Protocol version of the message - ProtocolVersion_t sourceVersion; + ProtocolVersion_t source_version_; //!VendorID that created the message - VendorId_t sourceVendorId; + VendorId_t source_vendor_id_; //!GuidPrefix of the entity that created the message - GuidPrefix_t sourceGuidPrefix; + GuidPrefix_t source_guid_prefix_; //!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant. - GuidPrefix_t destGuidPrefix; + GuidPrefix_t dest_guid_prefix_; //!Has the message timestamp? - bool haveTimestamp; + bool have_timestamp_; //!Timestamp associated with the message - Time_t timestamp; - //!Version of the protocol used by the receiving end. - ProtocolVersion_t destVersion; - - uint16_t mMaxPayload_; + Time_t timestamp_; #if HAVE_SECURITY - CDRMessage_t m_crypto_msg; + CDRMessage_t crypto_msg_; #endif - /**@name Processing methods. - * These methods are designed to read a part of the message - * and perform the corresponding actions: - * -Modify the message receiver state if necessary. - * -Add information to the history. - * -Return an error if the message is malformed. - * @param[in] msg Pointer to the message - * @param[out] params Different parameters depending on the message - * @return True if correct, false otherwise + //! Reset the MessageReceiver to process a new message. + void reset(); + + /** + * Find if there is a reader (in associated_readers_) that will accept a msg directed + * to the given entity ID. */ + bool willAReaderAcceptMsgDirectedTo( + const EntityId_t& readerID); + + /** + * Find all readers (in associated_readers_), with the given entity ID, and call the + * callback provided. + */ + template + void findAllReaders( + const EntityId_t& readerID, + const Functor& callback); - ///@{ /** * Check the RTPSHeader of a received message. * @param msg Pointer to the message. @@ -137,20 +131,17 @@ class MessageReceiver SubmessageHeader_t* smh); /** - * Find if there is a reader (in AssociatedReaders) that will accept a msg directed - * to the given entity ID. - */ - bool willAReaderAcceptMsgDirectedTo( - const EntityId_t& readerID); - - /** - * Find all readers (in AssociatedReaders), with the given entity ID, and call the - * callback provided. + * @name Processing methods. + * These methods are designed to read a part of the message + * and perform the corresponding actions: + * -Modify the message receiver state if necessary. + * -Add information to the history. + * -Return an error if the message is malformed. + * @param[in,out] msg Pointer to the message + * @param[in] smh Pointer to the submessage header + * @return True if correct, false otherwise */ - template - void findAllReaders( - const EntityId_t& readerID, - const Functor& callback); + ///@{ /** * @@ -188,14 +179,7 @@ class MessageReceiver bool proc_Submsg_HeartbeatFrag( CDRMessage_t* msg, SubmessageHeader_t* smh); - bool proc_Submsg_SecureMessage( - CDRMessage_t* msg, - SubmessageHeader_t* smh); - bool proc_Submsg_SecureSubMessage( - CDRMessage_t* msg, - SubmessageHeader_t* smh); - - RTPSParticipantImpl* participant_; + ///@} }; } /* namespace rtps */ diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index 42729688f9f..c3f74a3d625 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -42,70 +42,56 @@ namespace rtps { MessageReceiver::MessageReceiver( RTPSParticipantImpl* participant, uint32_t rec_buffer_size) - : sourceVendorId(c_VendorId_Unknown) - , participant_(participant) + : participant_(participant) + , source_version_(c_ProtocolVersion) + , source_vendor_id_(c_VendorId_Unknown) + , source_guid_prefix_(c_GuidPrefix_Unknown) + , dest_guid_prefix_(c_GuidPrefix_Unknown) + , have_timestamp_(false) + , timestamp_(c_TimeInvalid) #if HAVE_SECURITY - , m_crypto_msg(rec_buffer_size) + , crypto_msg_(rec_buffer_size) #endif { - init(rec_buffer_size); -} - -void MessageReceiver::init( - uint32_t rec_buffer_size) -{ - destVersion = c_ProtocolVersion; - sourceVersion = c_ProtocolVersion; - sourceVendorId = c_VendorId_Unknown; - sourceGuidPrefix = c_GuidPrefix_Unknown; - destGuidPrefix = c_GuidPrefix_Unknown; - haveTimestamp = false; - timestamp = c_TimeInvalid; - + (void)rec_buffer_size; logInfo(RTPS_MSG_IN, "Created with CDRMessage of size: " << rec_buffer_size); - mMaxPayload_ = - ((uint32_t)std::numeric_limits::max() < - rec_buffer_size) ? std::numeric_limits::max() : (uint16_t)rec_buffer_size; } MessageReceiver::~MessageReceiver() { logInfo(RTPS_MSG_IN, ""); - assert(AssociatedWriters.size() == 0); - assert(AssociatedReaders.size() == 0); + assert(associated_writers_.size() == 0); + assert(associated_readers_.size() == 0); } void MessageReceiver::associateEndpoint( Endpoint* to_add) { - bool found = false; - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); if (to_add->getAttributes().endpointKind == WRITER) { - for (auto it = AssociatedWriters.begin(); it != AssociatedWriters.end(); ++it) + const auto writer = static_cast(to_add); + for (const auto& it : associated_writers_) { - if ( (*it) == (RTPSWriter*)to_add ) + if (it == writer) { - found = true; - break; + return; } } - if (!found) - { - AssociatedWriters.push_back((RTPSWriter*)to_add); - } + + associated_writers_.push_back(writer); } else { const auto reader = static_cast(to_add); const auto entityId = reader->getGuid().entityId; // search for set of readers by entity ID - const auto readers = AssociatedReaders.find(entityId); - if (readers == AssociatedReaders.end()) + const auto readers = associated_readers_.find(entityId); + if (readers == associated_readers_.end()) { auto vec = std::vector(); vec.push_back(reader); - AssociatedReaders.emplace(entityId, vec); + associated_readers_.emplace(entityId, vec); } else { @@ -113,42 +99,38 @@ void MessageReceiver::associateEndpoint( { if (it == reader) { - found = true; - break; + return; } } - if (!found) - { - readers->second.push_back(reader); - } + + readers->second.push_back(reader); } } - return; } void MessageReceiver::removeEndpoint( Endpoint* to_remove) { + std::lock_guard guard(mtx_); - std::lock_guard guard(mtx); if (to_remove->getAttributes().endpointKind == WRITER) { - RTPSWriter* var = (RTPSWriter*)to_remove; - for (auto it = AssociatedWriters.begin(); it != AssociatedWriters.end(); ++it) + RTPSWriter* var = static_cast(to_remove); + for (auto it = associated_writers_.begin(); it != associated_writers_.end(); ++it) { - if ((*it) == var) + if (*it == var) { - AssociatedWriters.erase(it); + associated_writers_.erase(it); break; } } } else { - auto readers = AssociatedReaders.find(to_remove->getGuid().entityId); - if (readers != AssociatedReaders.end()) + auto readers = associated_readers_.find(to_remove->getGuid().entityId); + if (readers != associated_readers_.end()) { - RTPSReader* var = (RTPSReader*)to_remove; + RTPSReader* var = static_cast(to_remove); for (auto it = readers->second.begin(); it != readers->second.end(); ++it) { if (*it == var) @@ -156,25 +138,23 @@ void MessageReceiver::removeEndpoint( readers->second.erase(it); if (readers->second.empty()) { - AssociatedReaders.erase(readers); + associated_readers_.erase(readers); } break; } } } } - return; } void MessageReceiver::reset() { - destVersion = c_ProtocolVersion; - sourceVersion = c_ProtocolVersion; - sourceVendorId = c_VendorId_Unknown; - sourceGuidPrefix = c_GuidPrefix_Unknown; - destGuidPrefix = c_GuidPrefix_Unknown; - haveTimestamp = false; - timestamp = c_TimeInvalid; + source_version_ = c_ProtocolVersion; + source_vendor_id_ = c_VendorId_Unknown; + source_guid_prefix_ = c_GuidPrefix_Unknown; + dest_guid_prefix_ = c_GuidPrefix_Unknown; + have_timestamp_ = false; + timestamp_ = c_TimeInvalid; } void MessageReceiver::processCDRMsg( @@ -189,10 +169,10 @@ void MessageReceiver::processCDRMsg( return; } - this->reset(); + reset(); GuidPrefix_t participantGuidPrefix = participant_->getGuid().guidPrefix; - destGuidPrefix = participantGuidPrefix; + dest_guid_prefix_ = participantGuidPrefix; msg->pos = 0; //Start reading at 0 @@ -203,10 +183,11 @@ void MessageReceiver::processCDRMsg( } #if HAVE_SECURITY - CDRMessage_t* auxiliary_buffer = &m_crypto_msg; + security::SecurityManager& security = participant_->security_manager(); + CDRMessage_t* auxiliary_buffer = &crypto_msg_; CDRMessage::initCDRMsg(auxiliary_buffer); - int decode_ret = participant_->security_manager().decode_rtps_message(*msg, *auxiliary_buffer, sourceGuidPrefix); + int decode_ret = security.decode_rtps_message(*msg, *auxiliary_buffer, source_guid_prefix_); if (decode_ret < 0) { @@ -223,7 +204,6 @@ void MessageReceiver::processCDRMsg( bool valid; int count = 0; SubmessageHeader_t submsgh; //Current submessage header - //Pointers to different types of messages: while (msg->pos < msg->length)// end of the message { @@ -231,7 +211,7 @@ void MessageReceiver::processCDRMsg( #if HAVE_SECURITY CDRMessage::initCDRMsg(auxiliary_buffer); - decode_ret = participant_->security_manager().decode_rtps_submessage(*msg, *auxiliary_buffer, sourceGuidPrefix); + decode_ret = security.decode_rtps_submessage(*msg, *auxiliary_buffer, source_guid_prefix_); if (decode_ret < 0) { @@ -257,7 +237,7 @@ void MessageReceiver::processCDRMsg( { case DATA: { - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "Data Submsg ignored, DST is another RTPSParticipant"); } @@ -269,7 +249,7 @@ void MessageReceiver::processCDRMsg( break; } case DATA_FRAG: - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "DataFrag Submsg ignored, DST is another RTPSParticipant"); } @@ -281,7 +261,7 @@ void MessageReceiver::processCDRMsg( break; case GAP: { - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "Gap Submsg ignored, DST is another RTPSParticipant..."); } @@ -294,7 +274,7 @@ void MessageReceiver::processCDRMsg( } case ACKNACK: { - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "Acknack Submsg ignored, DST is another RTPSParticipant..."); } @@ -307,7 +287,7 @@ void MessageReceiver::processCDRMsg( } case NACK_FRAG: { - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "NackFrag Submsg ignored, DST is another RTPSParticipant..."); } @@ -320,7 +300,7 @@ void MessageReceiver::processCDRMsg( } case HEARTBEAT: { - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "HB Submsg ignored, DST is another RTPSParticipant..."); } @@ -333,7 +313,7 @@ void MessageReceiver::processCDRMsg( } case HEARTBEAT_FRAG: { - if (this->destGuidPrefix != participantGuidPrefix) + if (dest_guid_prefix_ != participantGuidPrefix) { logInfo(RTPS_MSG_IN, IDSTRING "HBFrag Submsg ignored, DST is another RTPSParticipant..."); } @@ -377,14 +357,13 @@ void MessageReceiver::processCDRMsg( submessage->pos = next_msg_pos; } - - participant_->assert_remote_participant_liveliness(sourceGuidPrefix); + participant_->assert_remote_participant_liveliness(source_guid_prefix_); } bool MessageReceiver::checkRTPSHeader( - CDRMessage_t* msg) //check and proccess the RTPS Header + CDRMessage_t* msg) { - + //check and proccess the RTPS Header if (msg->buffer[0] != 'R' || msg->buffer[1] != 'T' || msg->buffer[2] != 'P' || msg->buffer[3] != 'S') { @@ -395,10 +374,12 @@ bool MessageReceiver::checkRTPSHeader( msg->pos += 4; //CHECK AND SET protocol version - if (msg->buffer[msg->pos] <= destVersion.m_major) + if (msg->buffer[msg->pos] <= c_ProtocolVersion.m_major) { - sourceVersion.m_major = msg->buffer[msg->pos]; msg->pos++; - sourceVersion.m_minor = msg->buffer[msg->pos]; msg->pos++; + source_version_.m_major = msg->buffer[msg->pos]; + msg->pos++; + source_version_.m_minor = msg->buffer[msg->pos]; + msg->pos++; } else { @@ -407,12 +388,14 @@ bool MessageReceiver::checkRTPSHeader( } //Set source vendor id - sourceVendorId[0] = msg->buffer[msg->pos]; msg->pos++; - sourceVendorId[1] = msg->buffer[msg->pos]; msg->pos++; + source_vendor_id_[0] = msg->buffer[msg->pos]; + msg->pos++; + source_vendor_id_[1] = msg->buffer[msg->pos]; + msg->pos++; //set source guid prefix - memcpy(sourceGuidPrefix.value, &msg->buffer[msg->pos], 12); + memcpy(source_guid_prefix_.value, &msg->buffer[msg->pos], 12); msg->pos += 12; - haveTimestamp = false; + have_timestamp_ = false; return true; } @@ -425,17 +408,20 @@ bool MessageReceiver::readSubmessageHeader( logWarning(RTPS_MSG_IN, IDSTRING "SubmessageHeader too short"); return false; } - smh->submessageId = msg->buffer[msg->pos]; msg->pos++; - smh->flags = msg->buffer[msg->pos]; msg->pos++; + + smh->submessageId = msg->buffer[msg->pos]; + msg->pos++; + smh->flags = msg->buffer[msg->pos]; + msg->pos++; + //Set endianness of message msg->msg_endian = smh->flags & BIT(0) ? LITTLEEND : BIGEND; uint16_t length = 0; CDRMessage::readUInt16(msg, &length); if (msg->pos + length > msg->length) { - logWarning(RTPS_MSG_IN, IDSTRING "SubMsg of invalid length (" << length - << ") with current msg position/length (" << msg->pos << "/" << msg->length << - ")"); + logWarning(RTPS_MSG_IN, IDSTRING "SubMsg of invalid length (" << length << + ") with current msg position/length (" << msg->pos << "/" << msg->length << ")"); return false; } @@ -457,7 +443,7 @@ bool MessageReceiver::readSubmessageHeader( bool MessageReceiver::willAReaderAcceptMsgDirectedTo( const EntityId_t& readerID) { - if (AssociatedReaders.empty()) + if (associated_readers_.empty()) { logWarning(RTPS_MSG_IN, IDSTRING "Data received when NO readers are listening"); return false; @@ -465,15 +451,15 @@ bool MessageReceiver::willAReaderAcceptMsgDirectedTo( if (readerID != c_EntityId_Unknown) { - const auto readers = AssociatedReaders.find(readerID); - if (readers != AssociatedReaders.end()) + const auto readers = associated_readers_.find(readerID); + if (readers != associated_readers_.end()) { return true; } } else { - for (const auto& readers : AssociatedReaders) + for (const auto& readers : associated_readers_) { if (readers.second.empty()) { @@ -501,8 +487,8 @@ void MessageReceiver::findAllReaders( { if (readerID != c_EntityId_Unknown) { - const auto readers = AssociatedReaders.find(readerID); - if (readers != AssociatedReaders.end()) + const auto readers = associated_readers_.find(readerID); + if (readers != associated_readers_.end()) { for (const auto& it : readers->second) { @@ -512,7 +498,7 @@ void MessageReceiver::findAllReaders( } else { - for (const auto& readers : AssociatedReaders) + for (const auto& readers : associated_readers_) { for (const auto& it : readers.second) { @@ -529,7 +515,7 @@ bool MessageReceiver::proc_Submsg_Data( CDRMessage_t* msg, SubmessageHeader_t* smh) { - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //READ and PROCESS if (smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) @@ -579,8 +565,7 @@ bool MessageReceiver::proc_Submsg_Data( //We ask the reader for a cachechange to store the information. CacheChange_t ch; ch.kind = ALIVE; - ch.serializedPayload.max_size = mMaxPayload_; - ch.writerGUID.guidPrefix = sourceGuidPrefix; + ch.writerGUID.guidPrefix = source_guid_prefix_; valid &= CDRMessage::readEntityId(msg, &ch.writerGUID.entityId); //Get sequence number @@ -591,7 +576,7 @@ bool MessageReceiver::proc_Submsg_Data( return false; } - if (ch.sequenceNumber <= SequenceNumber_t(0, 0) || (ch.sequenceNumber.high == -1 && ch.sequenceNumber.low == 0)) //message invalid //TODO make faster + if (ch.sequenceNumber <= SequenceNumber_t()) { logWarning(RTPS_MSG_IN, IDSTRING "Invalid message received, bad sequence Number"); return false; @@ -618,7 +603,6 @@ bool MessageReceiver::proc_Submsg_Data( logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; } - } if (dataFlag || keyFlag) @@ -629,16 +613,18 @@ bool MessageReceiver::proc_Submsg_Data( if (dataFlag) { - if (ch.serializedPayload.max_size >= payload_size && payload_size > 0) + uint32_t next_pos = msg->pos + payload_size; + if (msg->length >= next_pos && payload_size > 0) { ch.serializedPayload.data = &msg->buffer[msg->pos]; ch.serializedPayload.length = payload_size; - msg->pos += payload_size; + ch.serializedPayload.max_size = payload_size; + msg->pos = next_pos; } else { logWarning(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid or larger than maximum allowed size" - "(" << payload_size << "/" << ch.serializedPayload.max_size << ")"); + "(" << payload_size << "/" << (msg->length - msg->pos) << ")"); return false; } } @@ -656,29 +642,28 @@ bool MessageReceiver::proc_Submsg_Data( } else { - logWarning(RTPS_MSG_IN, IDSTRING "Ignoring Serialized Payload for too large key-only data" - "(" << payload_size << ")"); + logWarning(RTPS_MSG_IN, IDSTRING "Ignoring Serialized Payload for too large key-only data (" << + payload_size << ")"); } msg->pos += payload_size; } } // Set sourcetimestamp - if (haveTimestamp) + if (have_timestamp_) { - ch.sourceTimestamp = this->timestamp; + ch.sourceTimestamp = timestamp_; } + logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " << + associated_readers_.size()); - //FIXME: DO SOMETHING WITH PARAMETERLIST CREATED. - logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " - << AssociatedReaders.size()); //Look for the correct reader to add the change - findAllReaders(readerID, [ - &ch - ] (RTPSReader* reader) { - reader->processDataMsg(&ch); - }); + findAllReaders(readerID, + [&ch] (RTPSReader* reader) + { + reader->processDataMsg(&ch); + }); //TODO(Ricardo) If a exception is thrown (ex, by fastcdr), this line is not executed -> segmentation fault ch.serializedPayload.data = nullptr; @@ -691,7 +676,7 @@ bool MessageReceiver::proc_Submsg_DataFrag( CDRMessage_t* msg, SubmessageHeader_t* smh) { - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //READ and PROCESS if (smh->submessageLength < RTPSMESSAGE_DATA_MIN_LENGTH) @@ -735,8 +720,7 @@ bool MessageReceiver::proc_Submsg_DataFrag( //FOUND THE READER. //We ask the reader for a cachechange to store the information. CacheChange_t ch; - ch.serializedPayload.max_size = mMaxPayload_; - ch.writerGUID.guidPrefix = sourceGuidPrefix; + ch.writerGUID.guidPrefix = source_guid_prefix_; valid &= CDRMessage::readEntityId(msg, &ch.writerGUID.entityId); //Get sequence number @@ -799,21 +783,21 @@ bool MessageReceiver::proc_Submsg_DataFrag( if (!keyFlag) { - if (ch.serializedPayload.max_size >= payload_size && payload_size > 0) + uint32_t next_pos = msg->pos + payload_size; + if (msg->length >= next_pos && payload_size > 0) { - ch.serializedPayload.length = payload_size; - - ch.setFragmentSize(fragmentSize); + ch.kind = ALIVE; ch.serializedPayload.data = &msg->buffer[msg->pos]; ch.serializedPayload.length = payload_size; - msg->pos += payload_size; + ch.serializedPayload.max_size = payload_size; + ch.setFragmentSize(fragmentSize); - ch.kind = ALIVE; + msg->pos = next_pos; } else { logWarning(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid or larger than maximum allowed size " - "(" << payload_size << "/" << ch.serializedPayload.max_size << ")"); + "(" << payload_size << "/" << (msg->length - msg->pos) << ")"); return false; } } @@ -841,20 +825,21 @@ bool MessageReceiver::proc_Submsg_DataFrag( } // Set sourcetimestamp - if (haveTimestamp) + if (have_timestamp_) { - ch.sourceTimestamp = this->timestamp; + ch.sourceTimestamp = timestamp_; } //FIXME: DO SOMETHING WITH PARAMETERLIST CREATED. - logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " - << AssociatedReaders.size()); + logInfo(RTPS_MSG_IN, IDSTRING "from Writer " << ch.writerGUID << "; possible RTPSReader entities: " << + associated_readers_.size()); + //Look for the correct reader to add the change - findAllReaders(readerID, [ - &ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage - ] (RTPSReader* reader) { - reader->processDataFragMsg(&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage); - }); + findAllReaders(readerID, + [&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage] (RTPSReader* reader) + { + reader->processDataFragMsg(&ch, sampleSize, fragmentStartingNum, fragmentsInSubmessage); + }); ch.serializedPayload.data = nullptr; @@ -881,9 +866,9 @@ bool MessageReceiver::proc_Submsg_Heartbeat( } GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = destGuidPrefix; + readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = sourceGuidPrefix; + writerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t firstSN, lastSN; CDRMessage::readSequenceNumber(msg, &firstSN); @@ -897,13 +882,13 @@ bool MessageReceiver::proc_Submsg_Heartbeat( uint32_t HBCount; CDRMessage::readUInt32(msg, &HBCount); - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //Look for the correct reader and writers: - findAllReaders(readerGUID.entityId, [ - &writerGUID, &HBCount, &firstSN, &lastSN, finalFlag, livelinessFlag - ] (RTPSReader* reader) { - reader->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); - }); + findAllReaders(readerGUID.entityId, + [&writerGUID, &HBCount, &firstSN, &lastSN, finalFlag, livelinessFlag] (RTPSReader* reader) + { + reader->processHeartbeatMsg(writerGUID, HBCount, firstSN, lastSN, finalFlag, livelinessFlag); + }); return true; } @@ -924,9 +909,9 @@ bool MessageReceiver::proc_Submsg_Acknack( msg->msg_endian = BIGEND; } GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = sourceGuidPrefix; + readerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = destGuidPrefix; + writerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); @@ -934,13 +919,12 @@ bool MessageReceiver::proc_Submsg_Acknack( uint32_t Ackcount; CDRMessage::readUInt32(msg, &Ackcount); - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //Look for the correct writer to use the acknack - for (std::vector::iterator it = AssociatedWriters.begin(); - it != AssociatedWriters.end(); ++it) + for (RTPSWriter* it : associated_writers_) { bool result; - if ((*it)->process_acknack(writerGUID, readerGUID, Ackcount, SNSet, finalFlag, result)) + if (it->process_acknack(writerGUID, readerGUID, Ackcount, SNSet, finalFlag, result)) { if (!result) { @@ -950,7 +934,7 @@ bool MessageReceiver::proc_Submsg_Acknack( } } logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to UNKNOWN writer (I loooked through " - << AssociatedWriters.size() << " writers in this ListenResource)"); + << associated_writers_.size() << " writers in this ListenResource)"); return false; } @@ -970,9 +954,9 @@ bool MessageReceiver::proc_Submsg_Gap( } GUID_t writerGUID, readerGUID; - readerGUID.guidPrefix = destGuidPrefix; + readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = sourceGuidPrefix; + writerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t gapStart; CDRMessage::readSequenceNumber(msg, &gapStart); @@ -982,12 +966,12 @@ bool MessageReceiver::proc_Submsg_Gap( return false; } - std::lock_guard guard(mtx); - findAllReaders(readerGUID.entityId, [ - &writerGUID, &gapStart, &gapList - ] (RTPSReader* reader) { - reader->processGapMsg(writerGUID, gapStart, gapList); - }); + std::lock_guard guard(mtx_); + findAllReaders(readerGUID.entityId, + [&writerGUID, &gapStart, &gapList] (RTPSReader* reader) + { + reader->processGapMsg(writerGUID, gapStart, gapList); + }); return true; } @@ -1009,12 +993,12 @@ bool MessageReceiver::proc_Submsg_InfoTS( } if (!timeFlag) { - haveTimestamp = true; - CDRMessage::readTimestamp(msg, ×tamp); + have_timestamp_ = true; + CDRMessage::readTimestamp(msg, ×tamp_); } else { - haveTimestamp = false; + have_timestamp_ = false; } return true; @@ -1039,8 +1023,8 @@ bool MessageReceiver::proc_Submsg_InfoDST( CDRMessage::readData(msg, guidP.value, 12); if (guidP != c_GuidPrefix_Unknown) { - this->destGuidPrefix = guidP; - logInfo(RTPS_MSG_IN, IDSTRING "DST RTPSParticipant is now: " << this->destGuidPrefix); + dest_guid_prefix_ = guidP; + logInfo(RTPS_MSG_IN, IDSTRING "DST RTPSParticipant is now: " << dest_guid_prefix_); } return true; } @@ -1064,11 +1048,11 @@ bool MessageReceiver::proc_Submsg_InfoSRC( { //AVOID FIRST 4 BYTES: msg->pos += 4; - CDRMessage::readOctet(msg, &this->sourceVersion.m_major); - CDRMessage::readOctet(msg, &this->sourceVersion.m_minor); - CDRMessage::readData(msg, &this->sourceVendorId[0], 2); - CDRMessage::readData(msg, this->sourceGuidPrefix.value, 12); - logInfo(RTPS_MSG_IN, IDSTRING "SRC RTPSParticipant is now: " << this->sourceGuidPrefix); + CDRMessage::readOctet(msg, &source_version_.m_major); + CDRMessage::readOctet(msg, &source_version_.m_minor); + CDRMessage::readData(msg, &source_vendor_id_[0], 2); + CDRMessage::readData(msg, source_guid_prefix_.value, 12); + logInfo(RTPS_MSG_IN, IDSTRING "SRC RTPSParticipant is now: " << source_guid_prefix_); return true; } return false; @@ -1078,8 +1062,6 @@ bool MessageReceiver::proc_Submsg_NackFrag( CDRMessage_t* msg, SubmessageHeader_t* smh) { - - bool endiannessFlag = smh->flags & BIT(0) ? true : false; //Assign message endianness if (endiannessFlag) @@ -1092,9 +1074,9 @@ bool MessageReceiver::proc_Submsg_NackFrag( } GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = sourceGuidPrefix; + readerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = destGuidPrefix; + writerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t writerSN; @@ -1106,13 +1088,12 @@ bool MessageReceiver::proc_Submsg_NackFrag( uint32_t Ackcount; CDRMessage::readUInt32(msg, &Ackcount); - std::lock_guard guard(mtx); + std::lock_guard guard(mtx_); //Look for the correct writer to use the acknack - for (std::vector::iterator it = AssociatedWriters.begin(); - it != AssociatedWriters.end(); ++it) + for (RTPSWriter* it : associated_writers_) { bool result; - if ((*it)->process_nack_frag(writerGUID, readerGUID, Ackcount, writerSN, fnState, result)) + if (it->process_nack_frag(writerGUID, readerGUID, Ackcount, writerSN, fnState, result)) { if (!result) { @@ -1122,7 +1103,7 @@ bool MessageReceiver::proc_Submsg_NackFrag( } } logInfo(RTPS_MSG_IN, IDSTRING "Acknack msg to UNKNOWN writer (I looked through " - << AssociatedWriters.size() << " writers in this ListenResource)"); + << associated_writers_.size() << " writers in this ListenResource)"); return false; } @@ -1130,7 +1111,6 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; //Assign message endianness if (endiannessFlag) @@ -1143,9 +1123,9 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag( } GUID_t readerGUID, writerGUID; - readerGUID.guidPrefix = destGuidPrefix; + readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); - writerGUID.guidPrefix = sourceGuidPrefix; + writerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); SequenceNumber_t writerSN; @@ -1161,9 +1141,9 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag( //Look for the correct reader and writers: /* XXX TODO - std::lock_guard guard(mtx); - for (std::vector::iterator it = AssociatedReaders.begin(); - it != AssociatedReaders.end(); ++it) + std::lock_guard guard(mtx_); + for (std::vector::iterator it = associated_readers_.begin(); + it != associated_readers_.end(); ++it) { if ((*it)->acceptMsgDirectedTo(readerGUID.entityId)) { From 0036c54f713d60495208c8131added3fda3839f8 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 30 Jan 2020 11:52:01 +0100 Subject: [PATCH 3/7] Refs #7478. Divide SecurityManager initialization in two phases. --- .../rtps/participant/RTPSParticipantImpl.cpp | 25 ++++++-- src/cpp/rtps/security/SecurityManager.cpp | 60 ++++++++++--------- src/cpp/rtps/security/SecurityManager.h | 5 +- .../security/SecurityInitializationTests.cpp | 18 ++++-- test/unittest/rtps/security/SecurityTests.cpp | 2 + 5 files changed, 70 insertions(+), 40 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 8ec77aa4cda..a56209010fd 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -264,12 +264,6 @@ RTPSParticipantImpl::RTPSParticipantImpl( m_att.defaultMulticastLocatorList.clear(); } - createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false); - createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false); - - createReceiverResources(m_att.defaultUnicastLocatorList, true, false); - createReceiverResources(m_att.defaultMulticastLocatorList, true, false); - bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; if (num_send_buffers == 0) @@ -295,9 +289,28 @@ RTPSParticipantImpl::RTPSParticipantImpl( } #endif + createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false); + createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false); + + createReceiverResources(m_att.defaultUnicastLocatorList, true, false); + createReceiverResources(m_att.defaultMulticastLocatorList, true, false); + // Allocate all pending send buffers send_buffers_->init(this); +#if HAVE_SECURITY + if (m_is_security_active) + { + m_is_security_active = m_security_manager.create_entities(); + if (!m_is_security_active) + { + // Participant will be deleted, no need to create builtin endpoints + m_security_manager_initialized = false; + return; + } + } +#endif + mp_builtinProtocols = new BuiltinProtocols(); logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix); diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index eba7ca230d7..2e2f24a3ecd 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -277,36 +277,11 @@ bool SecurityManager::init( { // Should be activated here, to enable encription buffer on created entities security_activated = true; - - // Create RTPS entities - if (create_entities()) - { - logInfo(SECURITY, "Initialized security manager for participant " << participant_->getGuid()); - return true; - } - - // Deactivate security if there is an error while creating entities - security_activated = false; - } - - if (local_participant_crypto_handle_ != nullptr) - { - crypto_plugin_->cryptokeyfactory()->unregister_participant(local_participant_crypto_handle_, exception); - } - - if (crypto_plugin_ != nullptr) - { - delete crypto_plugin_; - crypto_plugin_ = nullptr; + return true; } - //TODO(Ricardo) Return local_permissions - - if (access_plugin_ != nullptr) - { - delete access_plugin_; - access_plugin_ = nullptr; - } + cancel_init(); + return false; } else { @@ -320,12 +295,38 @@ bool SecurityManager::init( } else { - logInfo(SECURITY, "Authentication plugin not configured. Security will be disable"); + logInfo(SECURITY, "Authentication plugin not configured. Security will be disabled"); } return true; } +void SecurityManager::cancel_init() +{ + SecurityException exception; + if (local_participant_crypto_handle_ != nullptr) + { + crypto_plugin_->cryptokeyfactory()->unregister_participant(local_participant_crypto_handle_, exception); + } + + if (crypto_plugin_ != nullptr) + { + delete crypto_plugin_; + crypto_plugin_ = nullptr; + } + + //TODO(Ricardo) Return local_permissions + + if (access_plugin_ != nullptr) + { + delete access_plugin_; + access_plugin_ = nullptr; + } + + delete authentication_plugin_; + authentication_plugin_ = nullptr; +} + void SecurityManager::destroy() { if (authentication_plugin_ != nullptr) @@ -878,6 +879,7 @@ bool SecurityManager::create_entities() delete_participant_stateless_message_entities(); } + cancel_init(); return false; } diff --git a/src/cpp/rtps/security/SecurityManager.h b/src/cpp/rtps/security/SecurityManager.h index 2340d04bfff..bfd11563aa7 100644 --- a/src/cpp/rtps/security/SecurityManager.h +++ b/src/cpp/rtps/security/SecurityManager.h @@ -70,6 +70,8 @@ class SecurityManager const PropertyPolicy& participant_properties, bool& security_activated); + bool create_entities(); + void destroy(); bool discovered_participant( @@ -416,6 +418,8 @@ class SecurityManager SecurityManager &manager_; } participant_volatile_message_secure_listener_; + void cancel_init(); + void remove_discovered_participant_info( DiscoveredParticipantInfo::AuthUniquePtr& auth_ptr); @@ -423,7 +427,6 @@ class SecurityManager const GUID_t& remote_participant_key, DiscoveredParticipantInfo::AuthUniquePtr& auth_ptr); - bool create_entities(); void delete_entities(); bool create_participant_stateless_message_entities(); void delete_participant_stateless_message_entities(); diff --git a/test/unittest/rtps/security/SecurityInitializationTests.cpp b/test/unittest/rtps/security/SecurityInitializationTests.cpp index 3c7a784124d..0de0ff890a6 100644 --- a/test/unittest/rtps/security/SecurityInitializationTests.cpp +++ b/test/unittest/rtps/security/SecurityInitializationTests.cpp @@ -25,6 +25,7 @@ TEST_F(SecurityTest, initialization_auth_nullptr) DefaultValue::Set(guid); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } TEST_F(SecurityTest, initialization_auth_failed) @@ -65,7 +66,9 @@ TEST_F(SecurityTest, initialization_fail_participant_stateless_message_writer) EXPECT_CALL(participant_, createWriter_mock(_,_,_,_,_,_)).Times(1). WillOnce(Return(false)); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_fail_participant_stateless_message_reader) @@ -86,7 +89,9 @@ TEST_F(SecurityTest, initialization_fail_participant_stateless_message_reader) EXPECT_CALL(participant_, createReader_mock(_,_,_,_,_,_,_)).Times(1). WillOnce(Return(false)); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_fail_participant_volatile_message_writer) @@ -109,7 +114,9 @@ TEST_F(SecurityTest, initialization_fail_participant_volatile_message_writer) EXPECT_CALL(participant_, createReader_mock(_,_,_,_,_,_,_)).Times(1). WillOnce(DoAll(SetArgPointee<0>(stateless_reader), Return(true))); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_fail_participant_volatile_message_reader) @@ -134,7 +141,9 @@ TEST_F(SecurityTest, initialization_fail_participant_volatile_message_reader) WillOnce(DoAll(SetArgPointee<0>(stateless_reader), Return(true))). WillOnce(Return(false)); - ASSERT_FALSE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(security_activated_); + ASSERT_FALSE(manager_.create_entities()); } TEST_F(SecurityTest, initialization_auth_retry) @@ -164,6 +173,7 @@ TEST_F(SecurityTest, initialization_auth_retry) WillOnce(Return(true)); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } diff --git a/test/unittest/rtps/security/SecurityTests.cpp b/test/unittest/rtps/security/SecurityTests.cpp index 2652ab3da79..e23879e1366 100644 --- a/test/unittest/rtps/security/SecurityTests.cpp +++ b/test/unittest/rtps/security/SecurityTests.cpp @@ -38,6 +38,7 @@ void SecurityTest::initialization_ok() WillOnce(DoAll(SetArgPointee<0>(volatile_reader_), Return(true))); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } void SecurityTest::initialization_auth_ok() @@ -58,6 +59,7 @@ void SecurityTest::initialization_auth_ok() WillOnce(DoAll(SetArgPointee<0>(stateless_reader_), Return(true))); ASSERT_TRUE(manager_.init(security_attributes_, participant_properties_, security_activated_)); + ASSERT_TRUE(!security_activated_ || manager_.create_entities()); } void SecurityTest::request_process_ok(CacheChange_t** request_message_change) From d374054dbc2059ad01fd146d88021010e98a3bbf Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 30 Jan 2020 11:53:08 +0100 Subject: [PATCH 4/7] Refs #7478. Not allocating memory for crypto message when participant is not secure --- src/cpp/rtps/messages/MessageReceiver.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index c3f74a3d625..54e9a22ce8c 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -50,7 +50,7 @@ MessageReceiver::MessageReceiver( , have_timestamp_(false) , timestamp_(c_TimeInvalid) #if HAVE_SECURITY - , crypto_msg_(rec_buffer_size) + , crypto_msg_(participant->is_secure() ? rec_buffer_size : 0) #endif { (void)rec_buffer_size; From af920082d0bd483e72e50748670c8263ae9640ba Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 30 Jan 2020 12:50:03 +0100 Subject: [PATCH 5/7] Refs #7478. clang-tidy. --- include/fastrtps/qos/ParameterTypes.h | 2 + include/fastrtps/rtps/common/Types.h | 30 +++--- src/cpp/rtps/messages/MessageReceiver.cpp | 99 ++++++++++--------- .../rtps/participant/RTPSParticipantImpl.h | 2 +- 4 files changed, 71 insertions(+), 62 deletions(-) diff --git a/include/fastrtps/qos/ParameterTypes.h b/include/fastrtps/qos/ParameterTypes.h index 2e4249675d3..49ba3ef8bfc 100644 --- a/include/fastrtps/qos/ParameterTypes.h +++ b/include/fastrtps/qos/ParameterTypes.h @@ -241,6 +241,8 @@ class ParameterKey_t : public Parameter_t }; #define PARAMETER_KEY_LENGTH 16 +#define PARAMETER_KEY_HASH_LENGTH 16 + /** * */ diff --git a/include/fastrtps/rtps/common/Types.h b/include/fastrtps/rtps/common/Types.h index 2a439d50d52..b1a4a7e847c 100644 --- a/include/fastrtps/rtps/common/Types.h +++ b/include/fastrtps/rtps/common/Types.h @@ -89,23 +89,23 @@ const Endianness_t DEFAULT_ENDIAN = BIGEND; const Endianness_t DEFAULT_ENDIAN = LITTLEEND; #endif -typedef unsigned char octet; +using octet = unsigned char; //typedef unsigned int uint; //typedef unsigned short ushort; -typedef unsigned char SubmessageFlag; -typedef uint32_t BuiltinEndpointSet_t; -typedef uint32_t Count_t; - -#define BIT0 0x1 -#define BIT1 0x2 -#define BIT2 0x4 -#define BIT3 0x8 -#define BIT4 0x10 -#define BIT5 0x20 -#define BIT6 0x40 -#define BIT7 0x80 - -#define BIT(i) ((i==0) ? BIT0 : (i==1) ? BIT1 :(i==2)?BIT2:(i==3)?BIT3:(i==4)?BIT4:(i==5)?BIT5:(i==6)?BIT6:(i==7)?BIT7:0x0) +using SubmessageFlag = unsigned char; +using BuiltinEndpointSet_t = uint32_t; +using Count_t = uint32_t; + +#define BIT0 0x1u +#define BIT1 0x2u +#define BIT2 0x4u +#define BIT3 0x8u +#define BIT4 0x10u +#define BIT5 0x20u +#define BIT6 0x40u +#define BIT7 0x80u + +#define BIT(i) (1U << static_cast(i)) //!@brief Structure ProtocolVersion_t, contains the protocol version. struct RTPS_DllAPI ProtocolVersion_t diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index 54e9a22ce8c..65bb2226490 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -31,9 +31,9 @@ #include #include -#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << +#define INFO_SRC_SUBMSG_LENGTH 20 -using namespace eprosima::fastrtps; +#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << namespace eprosima { namespace fastrtps { @@ -60,8 +60,8 @@ MessageReceiver::MessageReceiver( MessageReceiver::~MessageReceiver() { logInfo(RTPS_MSG_IN, ""); - assert(associated_writers_.size() == 0); - assert(associated_readers_.size() == 0); + assert(associated_writers_.empty()); + assert(associated_readers_.empty()); } void MessageReceiver::associateEndpoint( @@ -70,7 +70,7 @@ void MessageReceiver::associateEndpoint( std::lock_guard guard(mtx_); if (to_add->getAttributes().endpointKind == WRITER) { - const auto writer = static_cast(to_add); + const auto writer = dynamic_cast(to_add); for (const auto& it : associated_writers_) { if (it == writer) @@ -83,7 +83,7 @@ void MessageReceiver::associateEndpoint( } else { - const auto reader = static_cast(to_add); + const auto reader = dynamic_cast(to_add); const auto entityId = reader->getGuid().entityId; // search for set of readers by entity ID const auto readers = associated_readers_.find(entityId); @@ -115,7 +115,7 @@ void MessageReceiver::removeEndpoint( if (to_remove->getAttributes().endpointKind == WRITER) { - RTPSWriter* var = static_cast(to_remove); + auto* var = dynamic_cast(to_remove); for (auto it = associated_writers_.begin(); it != associated_writers_.end(); ++it) { if (*it == var) @@ -130,7 +130,7 @@ void MessageReceiver::removeEndpoint( auto readers = associated_readers_.find(to_remove->getGuid().entityId); if (readers != associated_readers_.end()) { - RTPSReader* var = static_cast(to_remove); + auto* var = dynamic_cast(to_remove); for (auto it = readers->second.begin(); it != readers->second.end(); ++it) { if (*it == var) @@ -193,7 +193,8 @@ void MessageReceiver::processCDRMsg( { return; } - else if (decode_ret == 0) + + if (decode_ret == 0) { // Swap std::swap(msg, auxiliary_buffer); @@ -217,7 +218,8 @@ void MessageReceiver::processCDRMsg( { return; } - else if (decode_ret == 0) + + if (decode_ret == 0) { submessage = auxiliary_buffer; } @@ -232,7 +234,7 @@ void MessageReceiver::processCDRMsg( valid = true; count++; uint32_t next_msg_pos = submessage->pos; - next_msg_pos += (submsgh.submessageLength + 3) & ~3; + next_msg_pos += (submsgh.submessageLength + 3u) & ~3u; switch (submsgh.submessageId) { case DATA: @@ -393,8 +395,7 @@ bool MessageReceiver::checkRTPSHeader( source_vendor_id_[1] = msg->buffer[msg->pos]; msg->pos++; //set source guid prefix - memcpy(source_guid_prefix_.value, &msg->buffer[msg->pos], 12); - msg->pos += 12; + CDRMessage::readData(msg, source_guid_prefix_.value, GuidPrefix_t::size); have_timestamp_ = false; return true; } @@ -415,7 +416,7 @@ bool MessageReceiver::readSubmessageHeader( msg->pos++; //Set endianness of message - msg->msg_endian = smh->flags & BIT(0) ? LITTLEEND : BIGEND; + msg->msg_endian = (smh->flags & BIT(0)) != 0 ? LITTLEEND : BIGEND; uint16_t length = 0; CDRMessage::readUInt16(msg, &length); if (msg->pos + length > msg->length) @@ -524,10 +525,10 @@ bool MessageReceiver::proc_Submsg_Data( return false; } //Fill flags bool values - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool inlineQosFlag = smh->flags & BIT(1) ? true : false; - bool dataFlag = smh->flags & BIT(2) ? true : false; - bool keyFlag = smh->flags & BIT(3) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool inlineQosFlag = (smh->flags & BIT(1)) != 0; + bool dataFlag = (smh->flags & BIT(2)) != 0; + bool keyFlag = (smh->flags & BIT(3)) != 0; if (keyFlag && dataFlag) { logWarning(RTPS_MSG_IN, IDSTRING "Message received with Data and Key Flag set, ignoring"); @@ -598,7 +599,7 @@ bool MessageReceiver::proc_Submsg_Data( if (inlineQosFlag) { - if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) ) + if (!ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) ) { logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; @@ -636,7 +637,7 @@ bool MessageReceiver::proc_Submsg_Data( return false; } - if (payload_size <= 16) + if (payload_size <= PARAMETER_KEY_HASH_LENGTH) { memcpy(ch.instanceHandle.value, &msg->buffer[msg->pos], payload_size); } @@ -686,9 +687,9 @@ bool MessageReceiver::proc_Submsg_DataFrag( } //Fill flags bool values - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool inlineQosFlag = smh->flags & BIT(1) ? true : false; - bool keyFlag = smh->flags & BIT(2) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool inlineQosFlag = (smh->flags & BIT(1)) != 0; + bool keyFlag = (smh->flags & BIT(2)) != 0; //Assign message endianness if (endiannessFlag) @@ -769,7 +770,7 @@ bool MessageReceiver::proc_Submsg_DataFrag( if (inlineQosFlag) { - if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize)) + if (!ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize)) { logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error"); return false; @@ -852,9 +853,9 @@ bool MessageReceiver::proc_Submsg_Heartbeat( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool finalFlag = smh->flags & BIT(1) ? true : false; - bool livelinessFlag = smh->flags & BIT(2) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool finalFlag = (smh->flags & BIT(1)) != 0; + bool livelinessFlag = (smh->flags & BIT(2)) != 0; //Assign message endianness if (endiannessFlag) { @@ -865,12 +866,14 @@ bool MessageReceiver::proc_Submsg_Heartbeat( msg->msg_endian = BIGEND; } - GUID_t readerGUID, writerGUID; + GUID_t readerGUID; + GUID_t writerGUID; readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &writerGUID.entityId); - SequenceNumber_t firstSN, lastSN; + SequenceNumber_t firstSN; + SequenceNumber_t lastSN; CDRMessage::readSequenceNumber(msg, &firstSN); CDRMessage::readSequenceNumber(msg, &lastSN); if (lastSN < firstSN && lastSN != firstSN - 1) @@ -897,8 +900,8 @@ bool MessageReceiver::proc_Submsg_Acknack( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool finalFlag = smh->flags & BIT(1) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool finalFlag = (smh->flags & BIT(1)) != 0; //Assign message endianness if (endiannessFlag) { @@ -908,7 +911,8 @@ bool MessageReceiver::proc_Submsg_Acknack( { msg->msg_endian = BIGEND; } - GUID_t readerGUID, writerGUID; + GUID_t readerGUID; + GUID_t writerGUID; readerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = dest_guid_prefix_; @@ -942,7 +946,7 @@ bool MessageReceiver::proc_Submsg_Gap( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //Assign message endianness if (endiannessFlag) { @@ -953,7 +957,8 @@ bool MessageReceiver::proc_Submsg_Gap( msg->msg_endian = BIGEND; } - GUID_t writerGUID, readerGUID; + GUID_t writerGUID; + GUID_t readerGUID; readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = source_guid_prefix_; @@ -980,8 +985,8 @@ bool MessageReceiver::proc_Submsg_InfoTS( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; - bool timeFlag = smh->flags & BIT(1) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; + bool timeFlag = (smh->flags & BIT(1)) != 0; //Assign message endianness if (endiannessFlag) { @@ -1008,7 +1013,7 @@ bool MessageReceiver::proc_Submsg_InfoDST( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0u; //bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness if (endiannessFlag) @@ -1020,7 +1025,7 @@ bool MessageReceiver::proc_Submsg_InfoDST( msg->msg_endian = BIGEND; } GuidPrefix_t guidP; - CDRMessage::readData(msg, guidP.value, 12); + CDRMessage::readData(msg, guidP.value, GuidPrefix_t::size); if (guidP != c_GuidPrefix_Unknown) { dest_guid_prefix_ = guidP; @@ -1033,7 +1038,7 @@ bool MessageReceiver::proc_Submsg_InfoSRC( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //bool timeFlag = smh->flags & BIT(1) ? true : false; //Assign message endianness if (endiannessFlag) @@ -1044,14 +1049,14 @@ bool MessageReceiver::proc_Submsg_InfoSRC( { msg->msg_endian = BIGEND; } - if (smh->submessageLength == 20) + if (smh->submessageLength == INFO_SRC_SUBMSG_LENGTH) { //AVOID FIRST 4 BYTES: msg->pos += 4; CDRMessage::readOctet(msg, &source_version_.m_major); CDRMessage::readOctet(msg, &source_version_.m_minor); CDRMessage::readData(msg, &source_vendor_id_[0], 2); - CDRMessage::readData(msg, source_guid_prefix_.value, 12); + CDRMessage::readData(msg, source_guid_prefix_.value, GuidPrefix_t::size); logInfo(RTPS_MSG_IN, IDSTRING "SRC RTPSParticipant is now: " << source_guid_prefix_); return true; } @@ -1062,7 +1067,7 @@ bool MessageReceiver::proc_Submsg_NackFrag( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //Assign message endianness if (endiannessFlag) { @@ -1073,7 +1078,8 @@ bool MessageReceiver::proc_Submsg_NackFrag( msg->msg_endian = BIGEND; } - GUID_t readerGUID, writerGUID; + GUID_t readerGUID; + GUID_t writerGUID; readerGUID.guidPrefix = source_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = dest_guid_prefix_; @@ -1111,7 +1117,7 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag( CDRMessage_t* msg, SubmessageHeader_t* smh) { - bool endiannessFlag = smh->flags & BIT(0) ? true : false; + bool endiannessFlag = (smh->flags & BIT(0)) != 0; //Assign message endianness if (endiannessFlag) { @@ -1122,7 +1128,8 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag( msg->msg_endian = BIGEND; } - GUID_t readerGUID, writerGUID; + GUID_t readerGUID; + GUID_t writerGUID; readerGUID.guidPrefix = dest_guid_prefix_; CDRMessage::readEntityId(msg, &readerGUID.entityId); writerGUID.guidPrefix = source_guid_prefix_; @@ -1132,7 +1139,7 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag( CDRMessage::readSequenceNumber(msg, &writerSN); FragmentNumber_t lastFN; - CDRMessage::readUInt32(msg, (uint32_t*)&lastFN); + CDRMessage::readUInt32(msg, static_cast(&lastFN)); uint32_t HBCount; CDRMessage::readUInt32(msg, &HBCount); diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index d36ad4a84cf..470996e2deb 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -61,11 +61,11 @@ namespace fastrtps{ class WriterQos; class ReaderQos; class TopicAttributes; -class MessageReceiver; namespace rtps { +class MessageReceiver; class RTPSParticipant; class RTPSParticipantListener; class BuiltinProtocols; From 8918375175eceeb6f124d47dd5a4fbf649ced515 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 6 Feb 2020 16:47:50 +0100 Subject: [PATCH 6/7] Refs #7478. Protecting against spurious wakeup --- src/cpp/rtps/messages/SendBuffersManager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/messages/SendBuffersManager.cpp b/src/cpp/rtps/messages/SendBuffersManager.cpp index ff4218723fd..9b9215128b1 100644 --- a/src/cpp/rtps/messages/SendBuffersManager.cpp +++ b/src/cpp/rtps/messages/SendBuffersManager.cpp @@ -80,7 +80,7 @@ std::unique_ptr SendBuffersManager::get_buffer( std::unique_ptr ret_val; - if (pool_.empty()) + while (pool_.empty()) { if (allow_growing_ || n_created_ < pool_.capacity()) { @@ -88,8 +88,8 @@ std::unique_ptr SendBuffersManager::get_buffer( } else { + logInfo(RTPS_PARTICIPANT, "Waiting for send buffer"); available_cv_.wait(lock); - assert(!pool_.empty()); } } From 0b9bac5f0b3883803b86836fa4c044c8b788ef7b Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 6 Feb 2020 16:48:38 +0100 Subject: [PATCH 7/7] Refs #7478. Creating send buffers after receiver resources. --- .../rtps/participant/RTPSParticipantImpl.cpp | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index a56209010fd..77b6d81c0cd 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -264,19 +264,6 @@ RTPSParticipantImpl::RTPSParticipantImpl( m_att.defaultMulticastLocatorList.clear(); } - bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; - size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; - if (num_send_buffers == 0) - { - // Three buffers (user, events and async writer threads) - num_send_buffers = 3; - // Add one buffer per reception thread - num_send_buffers += m_receiverResourcelist.size(); - } - - // Create buffer pool - send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers)); - #if HAVE_SECURITY // Start security // TODO(Ricardo) Get returned value in future. @@ -295,7 +282,18 @@ RTPSParticipantImpl::RTPSParticipantImpl( createReceiverResources(m_att.defaultUnicastLocatorList, true, false); createReceiverResources(m_att.defaultMulticastLocatorList, true, false); - // Allocate all pending send buffers + bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; + size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; + if (num_send_buffers == 0) + { + // Three buffers (user, events and async writer threads) + num_send_buffers = 3; + // Add one buffer per reception thread + num_send_buffers += m_receiverResourcelist.size(); + } + + // Create buffer pool + send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers)); send_buffers_->init(this); #if HAVE_SECURITY