Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed error sending more than 256 fragments <master> [7404] #974

Merged
merged 10 commits into from
Feb 28, 2020
86 changes: 52 additions & 34 deletions include/fastdds/rtps/common/CDRMessage_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
#include <cstdlib>
#include <cstring>

namespace eprosima{
namespace fastrtps{
namespace rtps{

namespace eprosima {
namespace fastrtps {
namespace rtps {

//!Max size of RTPS message in bytes.
#define RTPSMESSAGE_DEFAULT_SIZE 10500 //max size of rtps message in bytes
Expand All @@ -48,43 +47,44 @@ namespace rtps{
* @brief Structure CDRMessage_t, contains a serialized message.
* @ingroup COMMON_MODULE
*/
struct RTPS_DllAPI CDRMessage_t{
//! Default constructor
CDRMessage_t():wraps(false){
pos = 0;
length = 0;
buffer = (octet*) malloc(RTPSMESSAGE_DEFAULT_SIZE);
max_size = RTPSMESSAGE_DEFAULT_SIZE;

#if __BIG_ENDIAN__
msg_endian = BIGEND;
#else
msg_endian = LITTLEEND;
#endif
struct RTPS_DllAPI CDRMessage_t final
{
// TODO(Miguel C): Deprecate when not used in mocks
CDRMessage_t()
: CDRMessage_t(RTPSMESSAGE_DEFAULT_SIZE)
{
}

~CDRMessage_t()
{
if(buffer != nullptr && !wraps)
if (buffer != nullptr && !wraps)
{
free(buffer);
}
}

/**
* Constructor with maximum size
* @param size Maximum size
*/
CDRMessage_t(uint32_t size)
explicit CDRMessage_t(
uint32_t size)
{
wraps = false;
pos = 0;
length = 0;

if(size != 0)
if (size != 0)
{
buffer = (octet*)malloc(size);
}
else
{
buffer = nullptr;
}

max_size = size;
reserved_size = size;

#if __BIG_ENDIAN__
msg_endian = BIGEND;
Expand All @@ -94,36 +94,44 @@ struct RTPS_DllAPI CDRMessage_t{
}

/**
* Constructor to wrap a serialized payload
* @param payload Payload to wrap
*/
CDRMessage_t(const SerializedPayload_t& payload) : wraps(true)
* Constructor to wrap a serialized payload
* @param payload Payload to wrap
*/
explicit CDRMessage_t(
const SerializedPayload_t& payload)
: wraps(true)
{
msg_endian = payload.encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND;
pos = payload.pos;
length = payload.length;
buffer = payload.data;
max_size = payload.max_size;
reserved_size = payload.max_size;
}

CDRMessage_t(const CDRMessage_t& message)
CDRMessage_t(
const CDRMessage_t& message)
{
wraps = false;
pos = 0;
length = message.length;
max_size = message.max_size;
msg_endian = message.msg_endian;

if(max_size != 0)
reserved_size = max_size;
if (max_size != 0)
{
buffer = (octet*)malloc(max_size);
memcpy(buffer, message.buffer, length);
}
else
{
buffer = nullptr;
}
}

CDRMessage_t(CDRMessage_t&& message)
CDRMessage_t(
CDRMessage_t&& message)
{
wraps = message.wraps;
message.wraps = false;
Expand All @@ -133,6 +141,8 @@ struct RTPS_DllAPI CDRMessage_t{
message.length = 0;
max_size = message.max_size;
message.max_size = 0;
reserved_size = message.reserved_size;
message.reserved_size = 0;
msg_endian = message.msg_endian;
#if __BIG_ENDIAN__
message.msg_endian = BIGEND;
Expand All @@ -143,7 +153,8 @@ struct RTPS_DllAPI CDRMessage_t{
message.buffer = nullptr;
}

CDRMessage_t& operator=(CDRMessage_t &&message)
CDRMessage_t& operator =(
CDRMessage_t&& message)
{
wraps = message.wraps;
message.wraps = false;
Expand All @@ -153,6 +164,8 @@ struct RTPS_DllAPI CDRMessage_t{
message.length = 0;
max_size = message.max_size;
message.max_size = 0;
reserved_size = message.reserved_size;
message.reserved_size = 0;
msg_endian = message.msg_endian;
#if __BIG_ENDIAN__
message.msg_endian = BIGEND;
Expand All @@ -169,7 +182,7 @@ struct RTPS_DllAPI CDRMessage_t{
uint32_t size)
{
assert(wraps == false);
if (size > max_size)
if (size > reserved_size)
{
octet* new_buffer = (octet*) realloc(buffer, size);
if (new_buffer == nullptr)
Expand All @@ -179,9 +192,11 @@ struct RTPS_DllAPI CDRMessage_t{
else
{
buffer = new_buffer;
max_size = size;
reserved_size = size;
}
}

max_size = size;
}

//!Pointer to the buffer where the data is stored.
Expand All @@ -190,6 +205,8 @@ struct RTPS_DllAPI CDRMessage_t{
uint32_t pos;
//!Max size of the message.
uint32_t max_size;
//!Size allocated on buffer. May be higher than max_size.
uint32_t reserved_size;
IkerLuengo marked this conversation as resolved.
Show resolved Hide resolved
//!Current length of the message.
uint32_t length;
//!Endianness of the message.
Expand All @@ -198,8 +215,9 @@ struct RTPS_DllAPI CDRMessage_t{
bool wraps;
};

}
}
}
#endif
} // namespace rtps
} // namespace fastrtps
} // namespace eprosima

#endif /* DOXYGEN_SHOULD_SKIP_THIS_PUBLIC */
#endif /* _FASTDDS_RTPS_CDRMESSAGE_T_H_ */
11 changes: 2 additions & 9 deletions include/fastdds/rtps/messages/RTPSMessageCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,8 @@ class RTPSMessageCreator
{
public:

RTPSMessageCreator();
virtual ~RTPSMessageCreator();

//!
CDRMessage_t rtpsmc_submsgElem;
//!
CDRMessage_t rtpsmc_submsgHeader;


RTPSMessageCreator() = delete;
~RTPSMessageCreator() = delete;

/**
* Create a Header to the serialized message.
Expand Down
1 change: 0 additions & 1 deletion include/fastdds/rtps/network/ReceiverResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface

std::mutex mtx;
MessageReceiver* receiver;
CDRMessage_t msg;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ void PDP::assert_remote_participant_liveliness(
CDRMessage_t PDP::get_participant_proxy_data_serialized(Endianness_t endian)
{
std::lock_guard<std::recursive_mutex> guardPDP(*this->mp_mutex);
CDRMessage_t cdr_msg;
CDRMessage_t cdr_msg(RTPSMESSAGE_DEFAULT_SIZE);
cdr_msg.msg_endian = endian;

if (!getLocalParticipantProxyData()->writeToCDRMessage(&cdr_msg, false))
Expand Down
9 changes: 0 additions & 9 deletions src/cpp/rtps/messages/RTPSMessageCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

RTPSMessageCreator::RTPSMessageCreator()
{
}

RTPSMessageCreator::~RTPSMessageCreator() {
logInfo(RTPS_CDR_MSG,"RTPSMessageCreator destructor");
}


bool RTPSMessageCreator::addHeader(CDRMessage_t*msg, const GuidPrefix_t& guidPrefix,
const ProtocolVersion_t& version,const VendorId_t& vendorId)
{
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/network/ReceiverResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ ReceiverResource::ReceiverResource(TransportInterface& transport, const Locator_
, mValid(false)
, mtx()
, receiver(nullptr)
, msg(0)
{
// Internal channel is opened and assigned to this resource.
mValid = transport.OpenInputChannel(locator, this, max_size);
Expand All @@ -55,7 +54,6 @@ ReceiverResource::ReceiverResource(ReceiverResource&& rValueResource)
rValueResource.receiver = nullptr;
mValid = rValueResource.mValid;
rValueResource.mValid = false;
msg = std::move(rValueResource.msg);
}

bool ReceiverResource::SupportsLocator(const Locator_t& localLocator)
Expand Down Expand Up @@ -91,10 +89,12 @@ void ReceiverResource::OnDataReceived(const octet * data, const uint32_t size,

if (rcv != nullptr)
{
CDRMessage_t msg(0);
msg.wraps = true;
msg.buffer = const_cast<octet*>(data);
msg.length = size;
msg.max_size = size;
msg.reserved_size = size;

// TODO: Should we unlock in case UnregisterReceiver is called from callback ?
rcv->processCDRMsg(remoteLocator, &msg);
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/ChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace rtps {
using Log = fastdds::dds::Log;

ChannelResource::ChannelResource()
: message_buffer_()
: message_buffer_(RTPSMESSAGE_DEFAULT_SIZE)
, alive_(true)
{
logInfo(RTPS_MSG_IN, "Created with CDRMessage of size: " << message_buffer_.max_size);
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ bool RTCPMessageManager::sendData(

TCPHeader header;
TCPControlMsgHeader ctrlHeader;
CDRMessage_t msg;
CDRMessage_t msg(this->mTransport->get_configuration()->max_message_size());
fastrtps::rtps::CDRMessage::initCDRMsg(&msg);
const ResponseCode* code = (respCode != RETCODE_VOID) ? &respCode : nullptr;

Expand Down
Loading