Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19347] Fix encapsulation format in WLP #3784

Merged
merged 10 commits into from
Aug 7, 2023
48 changes: 32 additions & 16 deletions include/fastdds/rtps/builtin/liveliness/WLPListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <fastrtps/qos/QosPolicies.h>

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

class WLP;
Expand All @@ -39,14 +39,16 @@ struct CacheChange_t;
* Class WLPListener that receives the liveliness messages asserting the liveliness of remote endpoints.
* @ingroup LIVELINESS_MODULE
*/
class WLPListener: public ReaderListener {
class WLPListener : public ReaderListener
{
public:

/**
* @brief Constructor
* @param pwlp Pointer to the writer liveliness protocol
*/
WLPListener(WLP* pwlp);
WLPListener(
WLP* pwlp);

/**
* @brief Destructor
Expand All @@ -60,27 +62,41 @@ class WLPListener: public ReaderListener {
*/
void onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const change) override;
const CacheChange_t* const change) override;

private:

/**
* Separate the Key between the GuidPrefix_t and the liveliness Kind
* @param key InstanceHandle_t to separate.
* @param guidP GuidPrefix_t pointer to store the info.
* @param liveliness Liveliness Kind Pointer.
* @return True if correctly separated.
*/
* Separate the Key between the GuidPrefix_t and the liveliness Kind
* @param key InstanceHandle_t to separate.
* @param guidP GuidPrefix_t pointer to store the info.
* @param liveliness Liveliness Kind Pointer.
* @return True if correctly separated.
*/
bool separateKey(
InstanceHandle_t& key,
GuidPrefix_t* guidP,
LivelinessQosPolicyKind* liveliness);

/**
* Compute the key from a CacheChange_t
* @param change
*/
bool computeKey(CacheChange_t* change);
* Compute the key from a CacheChange_t
* @param change
*/
bool computeKey(
CacheChange_t* change);

/**
* @brief Check that the ParticipantMessageData kind is a valid one for WLP and extract the liveliness kind.
*
* @param[in] serialized_kind A pointer to the first octet of the kind array. The function assumes 4 elements
* in the array.
* @param[out] liveliness_kind A reference to the LivelinessQosPolicyKind.
*
* @return True if the kind corresponds with one for WLP, false otherwise.
*/
bool get_wlp_kind(
octet* serialized_kind,
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
LivelinessQosPolicyKind& liveliness_kind);

//! A pointer to the writer liveliness protocol
WLP* mp_WLP;
Expand All @@ -89,6 +105,6 @@ class WLPListener: public ReaderListener {

} /* namespace rtps */
} /* namespace eprosima */
}
#endif
} // namespace eprosima
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_WLPLISTENER_H_ */
2 changes: 1 addition & 1 deletion include/fastdds/rtps/common/SerializedPayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace rtps {
#define PL_CDR_LE 0x0003

#if FASTDDS_IS_BIG_ENDIAN_TARGET
#define DEFAULT_ENCAPSULATION CDR_LE
#define DEFAULT_ENCAPSULATION CDR_BE
#define PL_DEFAULT_ENCAPSULATION PL_CDR_BE
#else
#define DEFAULT_ENCAPSULATION CDR_LE
Expand Down
11 changes: 11 additions & 0 deletions include/fastdds/rtps/messages/CDRMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,17 @@ inline bool addParticipantGenericMessage(

///@}

/**
* @brief Skip bytes in serialized buffer
*
* @param msg The CDR message
* @param length The number of bytes to skip
* @return true if skipped, false otherwise
*/
inline bool skip(
CDRMessage_t* msg,
uint32_t length);

} /* namespace CDRMessage */

} /* namespace rtps */
Expand Down
14 changes: 14 additions & 0 deletions include/fastdds/rtps/messages/CDRMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,20 @@ inline bool CDRMessage::readParticipantGenericMessage(
return true;
}

inline bool CDRMessage::skip(
CDRMessage_t* msg,
uint32_t length)
{
// Validate input
bool ret = (msg != nullptr) && (msg->pos + length <= msg->length);
if (ret)
{
// Advance index the number of specified bytes
msg->pos += length;
}
return ret;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,9 +900,9 @@ bool WLP::send_liveliness_message(

if (change != nullptr)
{
change->serializedPayload.encapsulation = (uint16_t)PL_DEFAULT_ENCAPSULATION;
change->serializedPayload.encapsulation = (uint16_t)DEFAULT_ENCAPSULATION;
change->serializedPayload.data[0] = 0;
change->serializedPayload.data[1] = PL_DEFAULT_ENCAPSULATION;
change->serializedPayload.data[1] = DEFAULT_ENCAPSULATION;
change->serializedPayload.data[2] = 0;
change->serializedPayload.data[3] = 0;

Expand Down
88 changes: 72 additions & 16 deletions src/cpp/rtps/builtin/liveliness/WLPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void WLPListener::onNewCacheChangeAdded(
std::lock_guard<std::recursive_mutex> guard2(*mp_WLP->mp_builtinProtocols->mp_PDP->getMutex());

GuidPrefix_t guidP;
LivelinessQosPolicyKind livelinessKind;
LivelinessQosPolicyKind livelinessKind = AUTOMATIC_LIVELINESS_QOS;
CacheChange_t* change = (CacheChange_t*)changeIN;
if (!computeKey(change))
{
Expand All @@ -74,23 +74,52 @@ void WLPListener::onNewCacheChangeAdded(
break;
}
}
if (change->serializedPayload.length > 0)

// Serialized payload should have at least 4 bytes of representation header, 12 of GuidPrefix,
// 4 of kind, and 4 of length.
const uint32_t representation_header_size = 4;
const uint32_t guid_prefix_size = 12;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
const uint32_t participant_msg_data_kind_size = 4;
const uint32_t participant_msg_data_length_size = 4;
const uint32_t min_serialized_length = representation_header_size
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
+ guid_prefix_size
+ participant_msg_data_kind_size
+ participant_msg_data_length_size;

if (change->serializedPayload.length >= min_serialized_length)
{
if (PL_CDR_BE == change->serializedPayload.data[1])
const uint32_t participant_msg_data_pos = 16;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
const uint32_t encapsulation_pos = 16;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
uint32_t data_length;

// Create CDR message from buffer to deserialize contents for further validation
CDRMessage_t cdr_message(change->serializedPayload);

bool message_ok = (
// Skip representation header
CDRMessage::skip(&cdr_message, representation_header_size)
// Extract GuidPrefix
&& CDRMessage::readData(&cdr_message, guidP.value, guid_prefix_size)
// Skip kind, it will be validated later
&& CDRMessage::skip(&cdr_message, participant_msg_data_length_size)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
// Extract and validate liveliness kind
&& get_wlp_kind(&change->serializedPayload.data[participant_msg_data_pos], livelinessKind)
// Extract data length
&& CDRMessage::readUInt32(&cdr_message, &data_length)
// Check that serialized length is correctly set
&& (change->serializedPayload.length >= min_serialized_length + data_length));

if (message_ok)
{
change->serializedPayload.encapsulation = (uint16_t)PL_CDR_BE;
// Extract encapsulation from the second byte of the representation header.
change->serializedPayload.encapsulation = (uint16_t)change->serializedPayload.data[encapsulation_pos];
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
change->serializedPayload.encapsulation = (uint16_t)PL_CDR_LE;
}

for (size_t i = 0; i < 12; ++i)
{
guidP.value[i] = change->serializedPayload.data[i + 4];
logInfo(RTPS_LIVELINESS, "Ignoring incorrect WLP ParticipantDataMessage");
history->remove_change(change);
return;
}
livelinessKind = (LivelinessQosPolicyKind)(change->serializedPayload.data[19] - 0x01);

}
else
{
Expand All @@ -99,6 +128,8 @@ void WLPListener::onNewCacheChangeAdded(
&guidP,
&livelinessKind))
{
logInfo(RTPS_LIVELINESS, "Ignoring not WLP ParticipantDataMessage");
history->remove_change(change);
return;
}
}
Expand Down Expand Up @@ -130,12 +161,13 @@ bool WLPListener::separateKey(
GuidPrefix_t* guidP,
LivelinessQosPolicyKind* liveliness)
{
for (uint8_t i = 0; i < 12; ++i)
bool ret = get_wlp_kind(&key.value[12], *liveliness);
if (ret)
{
guidP->value[i] = key.value[i];
// Extract GuidPrefix
memcpy(guidP->value, key.value, 12);
}
*liveliness = (LivelinessQosPolicyKind)key.value[15];
return true;
return ret;
}

bool WLPListener::computeKey(
Expand All @@ -154,6 +186,30 @@ bool WLPListener::computeKey(
return true;
}

bool WLPListener::get_wlp_kind(
octet* serialized_kind,
LivelinessQosPolicyKind& liveliness_kind)
{
/*
* From RTPS 2.5 9.6.3.1, the ParticipantMessageData kinds for WLP are:
* - PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x01}
* - PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x02}
*/
bool is_wlp = (
serialized_kind[0] == 0
&& serialized_kind[1] == 0
&& serialized_kind[2] == 0
&& (serialized_kind[3] == 0x01 || serialized_kind[3] == 0x02));

if (is_wlp)
{
// Adjust and cast to LivelinessQosPolicyKind enum, where AUTOMATIC_LIVELINESS_QOS == 0
liveliness_kind = (LivelinessQosPolicyKind)(serialized_kind[3] - 0x01);
}

return is_wlp;
}

} /* namespace rtps */
} /* namespace eprosima */
} // namespace eprosima