Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message receiver improvements <1.9.x> [7482] #986

Merged
merged 7 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/fastrtps/qos/ParameterTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class ParameterKey_t : public Parameter_t
};
#define PARAMETER_KEY_LENGTH 16

#define PARAMETER_KEY_HASH_LENGTH 16

/**
*
*/
Expand Down
30 changes: 15 additions & 15 deletions include/fastrtps/rtps/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,23 @@ const Endianness_t DEFAULT_ENDIAN = BIGEND;
const Endianness_t DEFAULT_ENDIAN = LITTLEEND;
#endif

typedef unsigned char octet;
using octet = unsigned char;
//typedef unsigned int uint;
//typedef unsigned short ushort;
typedef unsigned char SubmessageFlag;
typedef uint32_t BuiltinEndpointSet_t;
typedef uint32_t Count_t;

#define BIT0 0x1
#define BIT1 0x2
#define BIT2 0x4
#define BIT3 0x8
#define BIT4 0x10
#define BIT5 0x20
#define BIT6 0x40
#define BIT7 0x80

#define BIT(i) ((i==0) ? BIT0 : (i==1) ? BIT1 :(i==2)?BIT2:(i==3)?BIT3:(i==4)?BIT4:(i==5)?BIT5:(i==6)?BIT6:(i==7)?BIT7:0x0)
using SubmessageFlag = unsigned char;
using BuiltinEndpointSet_t = uint32_t;
using Count_t = uint32_t;

#define BIT0 0x1u
#define BIT1 0x2u
#define BIT2 0x4u
#define BIT3 0x8u
#define BIT4 0x10u
#define BIT5 0x20u
#define BIT6 0x40u
#define BIT7 0x80u

#define BIT(i) (1U << static_cast<unsigned>(i))

//!@brief Structure ProtocolVersion_t, contains the protocol version.
struct RTPS_DllAPI ProtocolVersion_t
Expand Down
259 changes: 140 additions & 119 deletions include/fastrtps/rtps/messages/MessageReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
* @file MessageReceiver.h
*/



#ifndef MESSAGERECEIVER_H_
#define MESSAGERECEIVER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <unordered_map>
#include "../common/all_common.h"
#include "../../qos/ParameterList.h"

#include <mutex>
#include <unordered_map>

namespace eprosima {
namespace fastrtps {
Expand All @@ -44,126 +42,149 @@ struct SubmessageHeader_t;
*/
class MessageReceiver
{
public:
/**
* @param participant
* @param rec_buffer_size
*/
MessageReceiver(RTPSParticipantImpl* participant, uint32_t rec_buffer_size);

virtual ~MessageReceiver();
//!Reset the MessageReceiver to process a new message.
void reset();
/** Init MessageReceiver. Does what the constructor used to do.
This is now on an independent function since MessageReceiver now stands inside
a struct.
@param rec_buffer_size
**/
void init(uint32_t rec_buffer_size);

/**
* Process a new CDR message.
* @param[in] loc Locator indicating the sending address.
* @param[in] msg Pointer to the message
*/
void processCDRMsg(const Locator_t& loc, CDRMessage_t*msg);

//!Pointer to the Listen Resource that contains this MessageReceiver.

//!Received message
public:

/**
* @param participant
* @param rec_buffer_size
*/
MessageReceiver(
RTPSParticipantImpl* participant,
uint32_t rec_buffer_size);

virtual ~MessageReceiver();

/**
* Process a new CDR message.
* @param[in] loc Locator indicating the sending address.
* @param[in] msg Pointer to the message
*/
void processCDRMsg(
const Locator_t& loc,
CDRMessage_t* msg);

// Functions to associate/remove associatedendpoints
void associateEndpoint(
Endpoint* to_add);
void removeEndpoint(
Endpoint* to_remove);

private:

std::mutex mtx_;
std::vector<RTPSWriter*> associated_writers_;
std::unordered_map<EntityId_t, std::vector<RTPSReader*> > associated_readers_;

RTPSParticipantImpl* participant_;

//!Protocol version of the message
ProtocolVersion_t source_version_;
//!VendorID that created the message
VendorId_t source_vendor_id_;
//!GuidPrefix of the entity that created the message
GuidPrefix_t source_guid_prefix_;
//!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant.
GuidPrefix_t dest_guid_prefix_;
//!Has the message timestamp?
bool have_timestamp_;
//!Timestamp associated with the message
Time_t timestamp_;

#if HAVE_SECURITY
CDRMessage_t m_crypto_msg;
CDRMessage_t crypto_msg_;
#endif
// Functions to associate/remove associatedendpoints
void associateEndpoint(Endpoint *to_add);
void removeEndpoint(Endpoint *to_remove);

private:
std::vector<RTPSWriter *> AssociatedWriters;
std::unordered_map<EntityId_t, std::vector<RTPSReader*>> AssociatedReaders;
std::mutex mtx;
//!Protocol version of the message
ProtocolVersion_t sourceVersion;
//!VendorID that created the message
VendorId_t sourceVendorId;
//!GuidPrefix of the entity that created the message
GuidPrefix_t sourceGuidPrefix;
//!GuidPrefix of the entity that receives the message. GuidPrefix of the RTPSParticipant.
GuidPrefix_t destGuidPrefix;
//!Has the message timestamp?
bool haveTimestamp;
//!Timestamp associated with the message
Time_t timestamp;
//!Version of the protocol used by the receiving end.
ProtocolVersion_t destVersion;

uint16_t mMaxPayload_;


/**@name Processing methods.
* These methods are designed to read a part of the message
* and perform the corresponding actions:
* -Modify the message receiver state if necessary.
* -Add information to the history.
* -Return an error if the message is malformed.
* @param[in] msg Pointer to the message
* @param[out] params Different parameters depending on the message
* @return True if correct, false otherwise
*/

///@{
/**
* Check the RTPSHeader of a received message.
* @param msg Pointer to the message.
* @return True if correct.
*/
bool checkRTPSHeader(CDRMessage_t*msg);
/**
* Read the submessage header of a message.
* @param msg Pointer to the CDRMessage_t to read.
* @param smh Pointer to the submessageheader structure.
* @return True if correctly read.
*/
bool readSubmessageHeader(CDRMessage_t*msg, SubmessageHeader_t* smh);

/**
* Find if there is a reader (in AssociatedReaders) that will accept a msg directed
* to the given entity ID.
*/
bool willAReaderAcceptMsgDirectedTo(const EntityId_t & readerID);

/**
* Find all readers (in AssociatedReaders), with the given entity ID, and call the
* callback provided.
*/
template<typename Functor>
void findAllReaders(
const EntityId_t & readerID,
const Functor & callback);

/**
*
* @param msg
* @param smh
* @return
*/
bool proc_Submsg_Data(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_DataFrag(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_Acknack(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_Heartbeat(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_Gap(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_InfoTS(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_InfoDST(CDRMessage_t*msg,SubmessageHeader_t* smh);
bool proc_Submsg_InfoSRC(CDRMessage_t*msg,SubmessageHeader_t* smh);
bool proc_Submsg_NackFrag(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_HeartbeatFrag(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_SecureMessage(CDRMessage_t*msg, SubmessageHeader_t* smh);
bool proc_Submsg_SecureSubMessage(CDRMessage_t*msg, SubmessageHeader_t* smh);

RTPSParticipantImpl* participant_;

//! Reset the MessageReceiver to process a new message.
void reset();

/**
* Find if there is a reader (in associated_readers_) that will accept a msg directed
* to the given entity ID.
*/
bool willAReaderAcceptMsgDirectedTo(
const EntityId_t& readerID);

/**
* Find all readers (in associated_readers_), with the given entity ID, and call the
* callback provided.
*/
template<typename Functor>
void findAllReaders(
const EntityId_t& readerID,
const Functor& callback);

/**
* Check the RTPSHeader of a received message.
* @param msg Pointer to the message.
* @return True if correct.
*/
bool checkRTPSHeader(
CDRMessage_t* msg);
/**
* Read the submessage header of a message.
* @param msg Pointer to the CDRMessage_t to read.
* @param smh Pointer to the submessageheader structure.
* @return True if correctly read.
*/
bool readSubmessageHeader(
CDRMessage_t* msg,
SubmessageHeader_t* smh);

/**
* @name Processing methods.
* These methods are designed to read a part of the message
* and perform the corresponding actions:
* -Modify the message receiver state if necessary.
* -Add information to the history.
* -Return an error if the message is malformed.
* @param[in,out] msg Pointer to the message
* @param[in] smh Pointer to the submessage header
* @return True if correct, false otherwise
*/
///@{

/**
*
* @param msg
* @param smh
* @return
*/
bool proc_Submsg_Data(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_DataFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_Acknack(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_Heartbeat(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_Gap(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_InfoTS(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_InfoDST(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_InfoSRC(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_NackFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
bool proc_Submsg_HeartbeatFrag(
CDRMessage_t* msg,
SubmessageHeader_t* smh);
///@}
};
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif
#endif /* MESSAGERECEIVER_H_ */
Loading