Skip to content

Commit

Permalink
Message receiver improvements (#986)
Browse files Browse the repository at this point in the history
* Refs #7478. Uncrustify.

* Refs #7478. Improvements and cleanup

* Refs #7478. Divide SecurityManager initialization in two phases.

* Refs #7478. Not allocating memory for crypto message when participant is not secure

* Refs #7478. clang-tidy.

* Refs #7478. Protecting against spurious wakeup

* Refs #7478. Creating send buffers after receiver resources.
  • Loading branch information
MiguelCompany authored Feb 18, 2020
1 parent 030384b commit 54e4a62
Show file tree
Hide file tree
Showing 11 changed files with 724 additions and 578 deletions.
2 changes: 2 additions & 0 deletions include/fastrtps/qos/ParameterTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class ParameterKey_t : public Parameter_t
};
#define PARAMETER_KEY_LENGTH 16

#define PARAMETER_KEY_HASH_LENGTH 16

/**
*
*/
Expand Down
30 changes: 15 additions & 15 deletions include/fastrtps/rtps/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,23 @@ const Endianness_t DEFAULT_ENDIAN = BIGEND;
const Endianness_t DEFAULT_ENDIAN = LITTLEEND;
#endif

typedef unsigned char octet;
using octet = unsigned char;
//typedef unsigned int uint;
//typedef unsigned short ushort;
typedef unsigned char SubmessageFlag;
typedef uint32_t BuiltinEndpointSet_t;
typedef uint32_t Count_t;

#define BIT0 0x1
#define BIT1 0x2
#define BIT2 0x4
#define BIT3 0x8
#define BIT4 0x10
#define BIT5 0x20
#define BIT6 0x40
#define BIT7 0x80

#define BIT(i) ((i==0) ? BIT0 : (i==1) ? BIT1 :(i==2)?BIT2:(i==3)?BIT3:(i==4)?BIT4:(i==5)?BIT5:(i==6)?BIT6:(i==7)?BIT7:0x0)
using SubmessageFlag = unsigned char;
using BuiltinEndpointSet_t = uint32_t;
using Count_t = uint32_t;

#define BIT0 0x1u
#define BIT1 0x2u
#define BIT2 0x4u
#define BIT3 0x8u
#define BIT4 0x10u
#define BIT5 0x20u
#define BIT6 0x40u
#define BIT7 0x80u

#define BIT(i) (1U << static_cast<unsigned>(i))

//!@brief Structure ProtocolVersion_t, contains the protocol version.
struct RTPS_DllAPI ProtocolVersion_t
Expand Down
259 changes: 140 additions & 119 deletions include/fastrtps/rtps/messages/MessageReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
* @file MessageReceiver.h
*/



#ifndef MESSAGERECEIVER_H_
#define MESSAGERECEIVER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <unordered_map>
#include "../common/all_common.h"
#include "../../qos/ParameterList.h"

#include <mutex>
#include <unordered_map>

namespace eprosima {
namespace fastrtps {
Expand All @@ -44,126 +42,149 @@ struct SubmessageHeader_t;
*/
class MessageReceiver
{
public:
/**
* @param participant
* @param rec_buffer_size
*/
MessageReceiver(RTPSParticipantImpl* participant, uint32_t rec_buffer_size);

virtual ~MessageReceiver();
//!Reset the MessageReceiver to process a new message.
void reset();
/** Init MessageReceiver. Does what the constructor used to do.
This is now on an independent function since MessageReceiver now stands inside
a struct.
@param rec_buffer_size
**/
void init(uint32_t rec_buffer_size);

/**
* Process a new CDR message.
* @param[in] loc Locator indicating the sending address.
* @param[in] msg Pointer to the message
*/
void processCDRMsg(const Locator_t& loc, CDRMessage_t*msg);

//!Pointer to the Listen Resource that contains this MessageReceiver.

//!Received message
public:

/**
* @param participant
* @param rec_buffer_size
*/
MessageReceiver(
RTPSParticipantImpl* participant,
uint32_t rec_buffer_size);

virtual ~MessageReceiver();

/**
* Process a new CDR message.
* @param[in] loc Locator indicating the sending address.
* @param[in] msg Pointer to the message
*/
void processCDRMsg(
const Locator_t& loc,
CDRMessage_t* msg);

// Functions to associate/remove associatedendpoints
void associateEndpoint(
Endpoint* to_add);
void removeEndpoint(
Endpoint* to_remove);

private:

std::mutex mtx_;
std::vector<RTPSWriter*> associated_writers_;
std::unordered_map<EntityId_t, std::vector<RTPSReader*> > associated_readers_;

RTPSParticipantImpl* participant_;

//!Protocol version of the message
ProtocolVersion_t source_version_;
//!VendorID that created the message
VendorId_t source_vendor_id_;
//!GuidPrefix of the entity that created the message
GuidPrefix_t source_guid_prefix_;
//!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant.
GuidPrefix_t dest_guid_prefix_;
//!Has the message timestamp?
bool have_timestamp_;
//!Timestamp associated with the message
Time_t timestamp_;

#if HAVE_SECURITY
CDRMessage_t m_crypto_msg;
CDRMessage_t crypto_msg_;
#endif
// Functions to associate/remove associatedendpoints
void associateEndpoint(Endpoint *to_add);
void removeEndpoint(Endpoint *to_remove);

private:
std::vector<RTPSWriter *> AssociatedWriters;
std::unordered_map<EntityId_t, std::vector<RTPSReader*>> AssociatedReaders;
std::mutex mtx;
//!Protocol version of the message
ProtocolVersion_t sourceVersion;
//!VendorID that created the message
VendorId_t sourceVendorId;
//!GuidPrefix of the entity that created the message
GuidPrefix_t sourceGuidPrefix;
//!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant.
GuidPrefix_t destGuidPrefix;
//!Has the message timestamp?
bool haveTimestamp;
//!Timestamp associated with the message
Time_t timestamp;
//!Version of the protocol used by the receiving end.
ProtocolVersion_t destVersion;

uint16_t mMaxPayload_;


/**@name Processing methods.
* These methods are designed to read a part of the message
* and perform the corresponding actions:
* -Modify the message receiver state if necessary.
* -Add information to the history.
* -Return an error if the message is malformed.
* @param[in] msg Pointer to the message
* @param[out] params Different parameters depending on the message
* @return True if correct, false otherwise
*/

///@{
/**
* Check the RTPSHeader of a received message.
* @param msg Pointer to the message.
* @return True if correct.
*/
bool checkRTPSHeader(CDRMessage_t*msg);
/**
* Read the submessage header of a message.
* @param msg Pointer to the CDRMessage_t to read.
* @param smh Pointer to the submessageheader structure.
* @return True if correctly read.
*/
bool readSubmessageHeader(CDRMessage_t*msg, SubmessageHeader_t* smh);

/**
* Find if there is a reader (in AssociatedReaders) that will accept a msg directed
* to the given entity ID.
*/
bool willAReaderAcceptMsgDirectedTo(const EntityId_t & readerID);

/**
* Find all readers (in AssociatedReaders), with the given entity ID, and call the
* callback provided.
*/
template<typename Functor>
void findAllReaders(
const EntityId_t & readerID,
const Functor & callback);

/**
*
* @param msg
* @param smh
* @return
*/
bool proc_Submsg_Data(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_DataFrag(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_Acknack(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_Heartbeat(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_Gap(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_InfoTS(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_InfoDST(CDRMessage_t*msg,SubmessageHeader_t* smh);
bool proc_Submsg_InfoSRC(CDRMessage_t*msg,SubmessageHeader_t* smh);
bool proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_SecureMessage(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_SecureSubMessage(CDRMessage_t*msg, SubmessageHeader_t* smh);

RTPSParticipantImpl* participant_;

//! Reset the MessageReceiver to process a new message.
void reset();

/**
* Find if there is a reader (in associated_readers_) that will accept a msg directed
* to the given entity ID.
*/
bool willAReaderAcceptMsgDirectedTo(
const EntityId_t& readerID);

/**
* Find all readers (in associated_readers_), with the given entity ID, and call the
* callback provided.
*/
template<typename Functor>
void findAllReaders(
const EntityId_t& readerID,
const Functor& callback);

/**
* Check the RTPSHeader of a received message.
* @param msg Pointer to the message.
* @return True if correct.
*/
bool checkRTPSHeader(
CDRMessage_t* msg);
/**
* Read the submessage header of a message.
* @param msg Pointer to the CDRMessage_t to read.
* @param smh Pointer to the submessageheader structure.
* @return True if correctly read.
*/
bool readSubmessageHeader(
CDRMessage_t* msg,
SubmessageHeader_t* smh);

/**
* @name Processing methods.
* These methods are designed to read a part of the message
* and perform the corresponding actions:
* -Modify the message receiver state if necessary.
* -Add information to the history.
* -Return an error if the message is malformed.
* @param[in,out] msg Pointer to the message
* @param[in] smh Pointer to the submessage header
* @return True if correct, false otherwise
*/
///@{

/**
*
* @param msg
* @param smh
* @return
*/
bool proc_Submsg_Data(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_DataFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_Acknack(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_Heartbeat(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_Gap(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_InfoTS(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_InfoDST(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_InfoSRC(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_NackFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_HeartbeatFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
///@}
};
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif
#endif /* MESSAGERECEIVER_H_ */
Loading

0 comments on commit 54e4a62

Please sign in to comment.