Skip to content

Commit

Permalink
Fixed error sending more than 256 fragments (#974)
Browse files Browse the repository at this point in the history
* Refs #7404. Added blackbox tests.

* Refs #7404. Style on CDRMessage_t.h

* Refs #7404. Deprecated CDRMessage_t default constructor.

* Refs #7404. Distinguish reserved_size and max_size on CDRMessage_t.

* Refs #7404. Force high_mark_for_frag_ to be multiple of 4.

* Refs #7404. Fix for synchronous StatelessWriter.

* Refs #7404. Fix for synchronous StatefulWriter.

* Refs #7404. Fix for asynchronous StatelessWriter.

* Refs #7404. Style on PublisherImpl.cpp

* Refs #7404. Style on DataWriterImpl.cpp

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
richiware and MiguelCompany authored Feb 28, 2020
1 parent df97c21 commit b800b44
Show file tree
Hide file tree
Showing 16 changed files with 387 additions and 388 deletions.
86 changes: 52 additions & 34 deletions include/fastdds/rtps/common/CDRMessage_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
#include <cstdlib>
#include <cstring>

namespace eprosima{
namespace fastrtps{
namespace rtps{

namespace eprosima {
namespace fastrtps {
namespace rtps {

//!Max size of RTPS message in bytes.
#define RTPSMESSAGE_DEFAULT_SIZE 10500 //max size of rtps message in bytes
Expand All @@ -48,43 +47,44 @@ namespace rtps{
* @brief Structure CDRMessage_t, contains a serialized message.
* @ingroup COMMON_MODULE
*/
struct RTPS_DllAPI CDRMessage_t{
//! Default constructor
CDRMessage_t():wraps(false){
pos = 0;
length = 0;
buffer = (octet*) malloc(RTPSMESSAGE_DEFAULT_SIZE);
max_size = RTPSMESSAGE_DEFAULT_SIZE;

#if __BIG_ENDIAN__
msg_endian = BIGEND;
#else
msg_endian = LITTLEEND;
#endif
struct RTPS_DllAPI CDRMessage_t final
{
// TODO(Miguel C): Deprecate when not used in mocks
CDRMessage_t()
: CDRMessage_t(RTPSMESSAGE_DEFAULT_SIZE)
{
}

~CDRMessage_t()
{
if(buffer != nullptr && !wraps)
if (buffer != nullptr && !wraps)
{
free(buffer);
}
}

/**
* Constructor with maximum size
* @param size Maximum size
*/
CDRMessage_t(uint32_t size)
explicit CDRMessage_t(
uint32_t size)
{
wraps = false;
pos = 0;
length = 0;

if(size != 0)
if (size != 0)
{
buffer = (octet*)malloc(size);
}
else
{
buffer = nullptr;
}

max_size = size;
reserved_size = size;

#if __BIG_ENDIAN__
msg_endian = BIGEND;
Expand All @@ -94,36 +94,44 @@ struct RTPS_DllAPI CDRMessage_t{
}

/**
* Constructor to wrap a serialized payload
* @param payload Payload to wrap
*/
CDRMessage_t(const SerializedPayload_t& payload) : wraps(true)
* Constructor to wrap a serialized payload
* @param payload Payload to wrap
*/
explicit CDRMessage_t(
const SerializedPayload_t& payload)
: wraps(true)
{
msg_endian = payload.encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND;
pos = payload.pos;
length = payload.length;
buffer = payload.data;
max_size = payload.max_size;
reserved_size = payload.max_size;
}

CDRMessage_t(const CDRMessage_t& message)
CDRMessage_t(
const CDRMessage_t& message)
{
wraps = false;
pos = 0;
length = message.length;
max_size = message.max_size;
msg_endian = message.msg_endian;

if(max_size != 0)
reserved_size = max_size;
if (max_size != 0)
{
buffer = (octet*)malloc(max_size);
memcpy(buffer, message.buffer, length);
}
else
{
buffer = nullptr;
}
}

CDRMessage_t(CDRMessage_t&& message)
CDRMessage_t(
CDRMessage_t&& message)
{
wraps = message.wraps;
message.wraps = false;
Expand All @@ -133,6 +141,8 @@ struct RTPS_DllAPI CDRMessage_t{
message.length = 0;
max_size = message.max_size;
message.max_size = 0;
reserved_size = message.reserved_size;
message.reserved_size = 0;
msg_endian = message.msg_endian;
#if __BIG_ENDIAN__
message.msg_endian = BIGEND;
Expand All @@ -143,7 +153,8 @@ struct RTPS_DllAPI CDRMessage_t{
message.buffer = nullptr;
}

CDRMessage_t& operator=(CDRMessage_t &&message)
CDRMessage_t& operator =(
CDRMessage_t&& message)
{
wraps = message.wraps;
message.wraps = false;
Expand All @@ -153,6 +164,8 @@ struct RTPS_DllAPI CDRMessage_t{
message.length = 0;
max_size = message.max_size;
message.max_size = 0;
reserved_size = message.reserved_size;
message.reserved_size = 0;
msg_endian = message.msg_endian;
#if __BIG_ENDIAN__
message.msg_endian = BIGEND;
Expand All @@ -169,7 +182,7 @@ struct RTPS_DllAPI CDRMessage_t{
uint32_t size)
{
assert(wraps == false);
if (size > max_size)
if (size > reserved_size)
{
octet* new_buffer = (octet*) realloc(buffer, size);
if (new_buffer == nullptr)
Expand All @@ -179,9 +192,11 @@ struct RTPS_DllAPI CDRMessage_t{
else
{
buffer = new_buffer;
max_size = size;
reserved_size = size;
}
}

max_size = size;
}

//!Pointer to the buffer where the data is stored.
Expand All @@ -190,6 +205,8 @@ struct RTPS_DllAPI CDRMessage_t{
uint32_t pos;
//!Max size of the message.
uint32_t max_size;
//!Size allocated on buffer. May be higher than max_size.
uint32_t reserved_size;
//!Current length of the message.
uint32_t length;
//!Endianness of the message.
Expand All @@ -198,8 +215,9 @@ struct RTPS_DllAPI CDRMessage_t{
bool wraps;
};

}
}
}
#endif
} // namespace rtps
} // namespace fastrtps
} // namespace eprosima

#endif /* DOXYGEN_SHOULD_SKIP_THIS_PUBLIC */
#endif /* _FASTDDS_RTPS_CDRMESSAGE_T_H_ */
11 changes: 2 additions & 9 deletions include/fastdds/rtps/messages/RTPSMessageCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,8 @@ class RTPSMessageCreator
{
public:

RTPSMessageCreator();
virtual ~RTPSMessageCreator();

//!
CDRMessage_t rtpsmc_submsgElem;
//!
CDRMessage_t rtpsmc_submsgHeader;


RTPSMessageCreator() = delete;
~RTPSMessageCreator() = delete;

/**
* Create a Header to the serialized message.
Expand Down
1 change: 0 additions & 1 deletion include/fastdds/rtps/network/ReceiverResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface

std::mutex mtx;
MessageReceiver* receiver;
CDRMessage_t msg;
};

} // namespace rtps
Expand Down
Loading

0 comments on commit b800b44

Please sign in to comment.