Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message receiver improvements <master> [7485] #987

Merged
merged 5 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/fastdds/dds/core/policy/ParameterTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ class ParameterKey_t : public Parameter_t
fastrtps::rtps::CDRMessage_t* msg) override;
};

#define PARAMETER_KEY_HASH_LENGTH 16

/**
*
*/
Expand Down
30 changes: 15 additions & 15 deletions include/fastdds/rtps/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ const Endianness_t DEFAULT_ENDIAN = BIGEND;
const Endianness_t DEFAULT_ENDIAN = LITTLEEND;
#endif

typedef unsigned char octet;
using octet = unsigned char;
//typedef unsigned int uint;
//typedef unsigned short ushort;
typedef unsigned char SubmessageFlag;
typedef uint32_t BuiltinEndpointSet_t;
typedef uint32_t Count_t;

#define BIT0 0x1
#define BIT1 0x2
#define BIT2 0x4
#define BIT3 0x8
#define BIT4 0x10
#define BIT5 0x20
#define BIT6 0x40
#define BIT7 0x80

#define BIT(i) ((i==0) ? BIT0 : (i==1) ? BIT1 :(i==2)?BIT2:(i==3)?BIT3:(i==4)?BIT4:(i==5)?BIT5:(i==6)?BIT6:(i==7)?BIT7:0x0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch!!!

using SubmessageFlag = unsigned char;
using BuiltinEndpointSet_t = uint32_t;
using Count_t = uint32_t;

#define BIT0 0x01u
#define BIT1 0x02u
#define BIT2 0x04u
#define BIT3 0x08u
#define BIT4 0x10u
#define BIT5 0x20u
#define BIT6 0x40u
#define BIT7 0x80u

#define BIT(i) (1U << static_cast<unsigned>(i))

//!@brief Structure ProtocolVersion_t, contains the protocol version.
struct RTPS_DllAPI ProtocolVersion_t
Expand Down
99 changes: 53 additions & 46 deletions src/cpp/rtps/messages/MessageReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
#include <limits>
#include <mutex>

#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " <<
#define INFO_SRC_SUBMSG_LENGTH 20

using namespace eprosima::fastrtps;
#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " <<

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -60,8 +60,8 @@ MessageReceiver::MessageReceiver(
MessageReceiver::~MessageReceiver()
{
logInfo(RTPS_MSG_IN, "");
assert(associated_writers_.size() == 0);
assert(associated_readers_.size() == 0);
assert(associated_writers_.empty());
assert(associated_readers_.empty());
}

void MessageReceiver::associateEndpoint(
Expand All @@ -70,7 +70,7 @@ void MessageReceiver::associateEndpoint(
std::lock_guard<std::mutex> guard(mtx_);
if (to_add->getAttributes().endpointKind == WRITER)
{
const auto writer = static_cast<RTPSWriter*>(to_add);
const auto writer = dynamic_cast<RTPSWriter*>(to_add);
for (const auto& it : associated_writers_)
{
if (it == writer)
Expand All @@ -83,7 +83,7 @@ void MessageReceiver::associateEndpoint(
}
else
{
const auto reader = static_cast<RTPSReader*>(to_add);
const auto reader = dynamic_cast<RTPSReader*>(to_add);
const auto entityId = reader->getGuid().entityId;
// search for set of readers by entity ID
const auto readers = associated_readers_.find(entityId);
Expand Down Expand Up @@ -115,7 +115,7 @@ void MessageReceiver::removeEndpoint(

if (to_remove->getAttributes().endpointKind == WRITER)
{
RTPSWriter* var = static_cast<RTPSWriter*>(to_remove);
auto* var = dynamic_cast<RTPSWriter*>(to_remove);
for (auto it = associated_writers_.begin(); it != associated_writers_.end(); ++it)
{
if (*it == var)
Expand All @@ -130,7 +130,7 @@ void MessageReceiver::removeEndpoint(
auto readers = associated_readers_.find(to_remove->getGuid().entityId);
if (readers != associated_readers_.end())
{
RTPSReader* var = static_cast<RTPSReader*>(to_remove);
auto* var = dynamic_cast<RTPSReader*>(to_remove);
for (auto it = readers->second.begin(); it != readers->second.end(); ++it)
{
if (*it == var)
Expand Down Expand Up @@ -193,7 +193,8 @@ void MessageReceiver::processCDRMsg(
{
return;
}
else if (decode_ret == 0)

if (decode_ret == 0)
{
// Swap
std::swap(msg, auxiliary_buffer);
Expand All @@ -217,7 +218,8 @@ void MessageReceiver::processCDRMsg(
{
return;
}
else if (decode_ret == 0)

if (decode_ret == 0)
{
submessage = auxiliary_buffer;
}
Expand All @@ -232,7 +234,7 @@ void MessageReceiver::processCDRMsg(
valid = true;
count++;
uint32_t next_msg_pos = submessage->pos;
next_msg_pos += (submsgh.submessageLength + 3) & ~3;
next_msg_pos += (submsgh.submessageLength + 3u) & ~3u;
switch (submsgh.submessageId)
{
case DATA:
Expand Down Expand Up @@ -393,8 +395,7 @@ bool MessageReceiver::checkRTPSHeader(
source_vendor_id_[1] = msg->buffer[msg->pos];
msg->pos++;
//set source guid prefix
memcpy(source_guid_prefix_.value, &msg->buffer[msg->pos], 12);
msg->pos += 12;
CDRMessage::readData(msg, source_guid_prefix_.value, GuidPrefix_t::size);
have_timestamp_ = false;
return true;
}
Expand All @@ -415,7 +416,7 @@ bool MessageReceiver::readSubmessageHeader(
msg->pos++;

//Set endianness of message
msg->msg_endian = smh->flags & BIT(0) ? LITTLEEND : BIGEND;
msg->msg_endian = (smh->flags & BIT(0)) != 0 ? LITTLEEND : BIGEND;
uint16_t length = 0;
CDRMessage::readUInt16(msg, &length);
if (msg->pos + length > msg->length)
Expand Down Expand Up @@ -524,10 +525,10 @@ bool MessageReceiver::proc_Submsg_Data(
return false;
}
//Fill flags bool values
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool inlineQosFlag = smh->flags & BIT(1) ? true : false;
bool dataFlag = smh->flags & BIT(2) ? true : false;
bool keyFlag = smh->flags & BIT(3) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
bool inlineQosFlag = (smh->flags & BIT(1)) != 0;
bool dataFlag = (smh->flags & BIT(2)) != 0;
bool keyFlag = (smh->flags & BIT(3)) != 0;
if (keyFlag && dataFlag)
{
logWarning(RTPS_MSG_IN, IDSTRING "Message received with Data and Key Flag set, ignoring");
Expand Down Expand Up @@ -598,7 +599,7 @@ bool MessageReceiver::proc_Submsg_Data(

if (inlineQosFlag)
{
if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) )
if (!ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize) )
{
logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error");
return false;
Expand Down Expand Up @@ -636,7 +637,7 @@ bool MessageReceiver::proc_Submsg_Data(
return false;
}

if (payload_size <= 16)
if (payload_size <= PARAMETER_KEY_HASH_LENGTH)
{
memcpy(ch.instanceHandle.value, &msg->buffer[msg->pos], payload_size);
}
Expand Down Expand Up @@ -686,9 +687,9 @@ bool MessageReceiver::proc_Submsg_DataFrag(
}

//Fill flags bool values
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool inlineQosFlag = smh->flags & BIT(1) ? true : false;
bool keyFlag = smh->flags & BIT(2) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
bool inlineQosFlag = (smh->flags & BIT(1)) != 0;
bool keyFlag = (smh->flags & BIT(2)) != 0;

//Assign message endianness
if (endiannessFlag)
Expand Down Expand Up @@ -769,7 +770,7 @@ bool MessageReceiver::proc_Submsg_DataFrag(

if (inlineQosFlag)
{
if (false == ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize))
if (!ParameterList::updateCacheChangeFromInlineQos(ch, msg, inlineQosSize))
{
logInfo(RTPS_MSG_IN, IDSTRING "SubMessage Data ERROR, Inline Qos ParameterList error");
return false;
Expand Down Expand Up @@ -852,9 +853,9 @@ bool MessageReceiver::proc_Submsg_Heartbeat(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool finalFlag = smh->flags & BIT(1) ? true : false;
bool livelinessFlag = smh->flags & BIT(2) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
bool finalFlag = (smh->flags & BIT(1)) != 0;
bool livelinessFlag = (smh->flags & BIT(2)) != 0;
//Assign message endianness
if (endiannessFlag)
{
Expand All @@ -865,12 +866,14 @@ bool MessageReceiver::proc_Submsg_Heartbeat(
msg->msg_endian = BIGEND;
}

GUID_t readerGUID, writerGUID;
GUID_t readerGUID;
GUID_t writerGUID;
readerGUID.guidPrefix = dest_guid_prefix_;
CDRMessage::readEntityId(msg, &readerGUID.entityId);
writerGUID.guidPrefix = source_guid_prefix_;
CDRMessage::readEntityId(msg, &writerGUID.entityId);
SequenceNumber_t firstSN, lastSN;
SequenceNumber_t firstSN;
SequenceNumber_t lastSN;
CDRMessage::readSequenceNumber(msg, &firstSN);
CDRMessage::readSequenceNumber(msg, &lastSN);
if (lastSN < firstSN && lastSN != firstSN - 1)
Expand All @@ -897,8 +900,8 @@ bool MessageReceiver::proc_Submsg_Acknack(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool finalFlag = smh->flags & BIT(1) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
bool finalFlag = (smh->flags & BIT(1)) != 0;
//Assign message endianness
if (endiannessFlag)
{
Expand All @@ -908,7 +911,8 @@ bool MessageReceiver::proc_Submsg_Acknack(
{
msg->msg_endian = BIGEND;
}
GUID_t readerGUID, writerGUID;
GUID_t readerGUID;
GUID_t writerGUID;
readerGUID.guidPrefix = source_guid_prefix_;
CDRMessage::readEntityId(msg, &readerGUID.entityId);
writerGUID.guidPrefix = dest_guid_prefix_;
Expand Down Expand Up @@ -942,7 +946,7 @@ bool MessageReceiver::proc_Submsg_Gap(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
//Assign message endianness
if (endiannessFlag)
{
Expand All @@ -953,7 +957,8 @@ bool MessageReceiver::proc_Submsg_Gap(
msg->msg_endian = BIGEND;
}

GUID_t writerGUID, readerGUID;
GUID_t writerGUID;
GUID_t readerGUID;
readerGUID.guidPrefix = dest_guid_prefix_;
CDRMessage::readEntityId(msg, &readerGUID.entityId);
writerGUID.guidPrefix = source_guid_prefix_;
Expand All @@ -980,8 +985,8 @@ bool MessageReceiver::proc_Submsg_InfoTS(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool timeFlag = smh->flags & BIT(1) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
bool timeFlag = (smh->flags & BIT(1)) != 0;
//Assign message endianness
if (endiannessFlag)
{
Expand All @@ -1008,7 +1013,7 @@ bool MessageReceiver::proc_Submsg_InfoDST(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0u;
//bool timeFlag = smh->flags & BIT(1) ? true : false;
//Assign message endianness
if (endiannessFlag)
Expand All @@ -1020,7 +1025,7 @@ bool MessageReceiver::proc_Submsg_InfoDST(
msg->msg_endian = BIGEND;
}
GuidPrefix_t guidP;
CDRMessage::readData(msg, guidP.value, 12);
CDRMessage::readData(msg, guidP.value, GuidPrefix_t::size);
if (guidP != c_GuidPrefix_Unknown)
{
dest_guid_prefix_ = guidP;
Expand All @@ -1033,7 +1038,7 @@ bool MessageReceiver::proc_Submsg_InfoSRC(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
//bool timeFlag = smh->flags & BIT(1) ? true : false;
//Assign message endianness
if (endiannessFlag)
Expand All @@ -1044,14 +1049,14 @@ bool MessageReceiver::proc_Submsg_InfoSRC(
{
msg->msg_endian = BIGEND;
}
if (smh->submessageLength == 20)
if (smh->submessageLength == INFO_SRC_SUBMSG_LENGTH)
{
//AVOID FIRST 4 BYTES:
msg->pos += 4;
CDRMessage::readOctet(msg, &source_version_.m_major);
CDRMessage::readOctet(msg, &source_version_.m_minor);
CDRMessage::readData(msg, &source_vendor_id_[0], 2);
CDRMessage::readData(msg, source_guid_prefix_.value, 12);
CDRMessage::readData(msg, source_guid_prefix_.value, GuidPrefix_t::size);
logInfo(RTPS_MSG_IN, IDSTRING "SRC RTPSParticipant is now: " << source_guid_prefix_);
return true;
}
Expand All @@ -1062,7 +1067,7 @@ bool MessageReceiver::proc_Submsg_NackFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
//Assign message endianness
if (endiannessFlag)
{
Expand All @@ -1073,7 +1078,8 @@ bool MessageReceiver::proc_Submsg_NackFrag(
msg->msg_endian = BIGEND;
}

GUID_t readerGUID, writerGUID;
GUID_t readerGUID;
GUID_t writerGUID;
readerGUID.guidPrefix = source_guid_prefix_;
CDRMessage::readEntityId(msg, &readerGUID.entityId);
writerGUID.guidPrefix = dest_guid_prefix_;
Expand Down Expand Up @@ -1111,7 +1117,7 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh)
{
bool endiannessFlag = smh->flags & BIT(0) ? true : false;
bool endiannessFlag = (smh->flags & BIT(0)) != 0;
//Assign message endianness
if (endiannessFlag)
{
Expand All @@ -1122,7 +1128,8 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag(
msg->msg_endian = BIGEND;
}

GUID_t readerGUID, writerGUID;
GUID_t readerGUID;
GUID_t writerGUID;
readerGUID.guidPrefix = dest_guid_prefix_;
CDRMessage::readEntityId(msg, &readerGUID.entityId);
writerGUID.guidPrefix = source_guid_prefix_;
Expand All @@ -1132,7 +1139,7 @@ bool MessageReceiver::proc_Submsg_HeartbeatFrag(
CDRMessage::readSequenceNumber(msg, &writerSN);

FragmentNumber_t lastFN;
CDRMessage::readUInt32(msg, (uint32_t*)&lastFN);
CDRMessage::readUInt32(msg, static_cast<uint32_t*>(&lastFN));

uint32_t HBCount;
CDRMessage::readUInt32(msg, &HBCount);
Expand Down