Skip to content

Commit

Permalink
Flow Controllers (#1996)
Browse files Browse the repository at this point in the history
* Refs #11504. Added FlowController interface

Refs #11502. Add FlowController factory

Refs #11502. Add FlowControllerFactory

Refs #11503. Initialization of factory

Refs #11503. Initial integration of the factory

Refs #11505. Initial integration with StatelessWriter

Refs #11505. More integration with StatelessWriter

Refs #11509. Update CacheChange_t

Refs #11505. First implementation async thread.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Refs #11505. First integration with StatefulWriter

Refs #11505. Remove old async thread from StatelessWriter

Refs #11528. Integration with StatefulWriter

Refs #11611. Fix compilation error with security

Refs #11527. Start removing be+trasient

Refs #11655. Priorize new ones

Refs #11529. Global RTPSMessageGroup

Refs 11529. Fix deadlock.

Refs #11527. Remove besteffort + transient_local

Remove code not used

Refs #11676. Fix bug with intraprocess delivery

Refs #11676. Prepare for more schedulers

Refs #11676. Refactor calculation of fragments

Refs #11676. Fix error

Refs #11676. Add limitation of sent bytes in RTPSMessageGroup

Refs #11676. Remove TODO

Refs #11676. Remove TODOs

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix compilation errors

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix compilation errors

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fixes for discovery server.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Little change
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix error not sending heartbeat when matching a reader

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix error on Windows

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Send initial heartbeat also when history empty

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11792. Forget uncomment after testing

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix unprotected access

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Maintain old code
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix error in intraprocess

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix error in writer destruction
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix error with intraprocess.
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11676. Fix bug removing change from FlowControllerImpl

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Fix bug using datasharing and intraprocess.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11679. Implemented Round Robin
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11680. Add HIGH_PRIORITY scheduler

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11681. Implemented Priority with reservation scheduler.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Apply comment suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Added more blackbox tests

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Changes in location of code

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Move code movements

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Fix error in test after adding assertion

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Add skeleton for unit tests

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Add test for FlowControllerFactory

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Fix detected old error in test

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Add unit tests for publish modes

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Change TEST to TYPED_TEST

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Added schedulers unit tests

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11903. Fix compilation error.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11961. Fix rebase

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Apply suggestions from code review

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Move header to private place

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Apply suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Apply suggestions from code review

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Apply suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Apply suggestions.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Removed sending a gap with individually heartbeat.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Fix compilation error on Windows

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Remove unnecessary condition in cmake

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Fix uncrustify

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Apply suggestions from code review

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Fix warning in gtest

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Apply suggestion

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Apply suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Fix error found by reviewer.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Apply suggestions from code review

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Fix compilation error after apply suggestion

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Apply suggestions on StatelessWriter.

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Improve unit tests to avoid segfault

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Improve FlowControllerRoundRobinSchedule

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Avoid not discovery when testing intraprocess performance

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #11831. Apply suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Fix build with statistics enabled.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Factorize common code on FlowQueue.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Uncrustify

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Decrease too much large waitings

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Fix exception on mac setting a socket option

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Fix inheritance on FlowControllerSyncPublishMode.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Improvements on FlowControllerRoundRobinSchedule.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Fallback setting of priority.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Improvements on FlowControllerHighPrioritySchedule.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Improvements on FlowControllerPriorityWithReservationSchedule.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #11831. Apply suggestion

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Fix random fail of a test

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Apply suggestions from code review

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Apply suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Update src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #11831. Remove assert

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
richiware and MiguelCompany authored Aug 5, 2021
1 parent c89604e commit 6b55651
Show file tree
Hide file tree
Showing 155 changed files with 11,260 additions and 6,900 deletions.
7 changes: 5 additions & 2 deletions cmake/common/gtest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ macro(add_gtest)
endif()

foreach(GTEST_SOURCE_FILE ${GTEST_SOURCES})
file(STRINGS ${GTEST_SOURCE_FILE} GTEST_TEST_NAMES REGEX ^TEST)
# Normal tests
file(STRINGS ${GTEST_SOURCE_FILE} GTEST_TEST_NAMES REGEX "^([T][Y][P][E][D][_])?TEST")
foreach(GTEST_TEST_NAME ${GTEST_TEST_NAMES})
string(REGEX REPLACE ["\) \(,"] ";" GTEST_TEST_NAME ${GTEST_TEST_NAME})
list(GET GTEST_TEST_NAME 1 GTEST_GROUP_NAME)
list(GET GTEST_TEST_NAME 3 GTEST_TEST_NAME)
add_test(NAME ${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}
COMMAND ${command} --gtest_filter=${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}:*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/*)
COMMAND ${command}
--gtest_filter=${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}:*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/*:${GTEST_GROUP_NAME}/*.${GTEST_TEST_NAME})

# Add environment
set(GTEST_ENVIRONMENT "")
Expand All @@ -84,6 +86,7 @@ macro(add_gtest)
set_property(TEST ${GTEST_GROUP_NAME}.${GTEST_TEST_NAME} PROPERTY LABELS "${GTEST_LABELS}")

endforeach()

endforeach()
else()
add_test(NAME ${test} COMMAND ${command})
Expand Down
9 changes: 5 additions & 4 deletions examples/C++/RTPSTest_as_socket/TestWriterSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,17 @@ void TestWriterSocket::run(
{
for (int i = 0; i < nmsgs; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t {
return 255;
}, ALIVE);
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
#if defined(_WIN32)
ch->serializedPayload.length =
sprintf_s((char*)ch->serializedPayload.data, 255, "My example string %d", i) + 1;
#else
ch->serializedPayload.length =
sprintf((char*)ch->serializedPayload.data, "My example string %d", i) + 1;
#endif
#endif // if defined(_WIN32)
printf("Sending: %s\n", (char*)ch->serializedPayload.data);
mp_history->add_change(ch);
}
Expand Down
11 changes: 9 additions & 2 deletions include/fastdds/rtps/attributes/RTPSParticipantAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fastrtps/utils/fixed_size_string.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
#include <fastdds/rtps/attributes/ServerAttributes.h>
#include <fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp>

#include <memory>
#include <sstream>
Expand Down Expand Up @@ -438,6 +439,8 @@ class BuiltinAttributes
*/
class RTPSParticipantAttributes
{
using FlowControllerDescriptorList = std::vector<std::shared_ptr<fastdds::rtps::FlowControllerDescriptor>>;

public:

RTPSParticipantAttributes()
Expand Down Expand Up @@ -467,8 +470,9 @@ class RTPSParticipantAttributes
(this->participantID == b.participantID) &&
(this->throughputController == b.throughputController) &&
(this->useBuiltinTransports == b.useBuiltinTransports) &&
(this->properties == b.properties &&
(this->prefix == b.prefix));
(this->properties == b.properties) &&
(this->prefix == b.prefix) &&
(this->flow_controllers == b.flow_controllers);
}

/**
Expand Down Expand Up @@ -542,6 +546,9 @@ class RTPSParticipantAttributes
return name.c_str();
}

//! Flow controllers.
FlowControllerDescriptorList flow_controllers;

private:

//!Name of the participant.
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/attributes/WriterAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/flowcontrol/ThroughputControllerDescriptor.h>
#include <fastdds/rtps/attributes/EndpointAttributes.h>
#include <fastdds/rtps/flowcontrol/FlowControllerConsts.hpp>
#include <fastrtps/utils/collections/ResourceLimitedContainerConfig.hpp>
#include <fastrtps/qos/QosPolicies.h>

Expand Down Expand Up @@ -138,6 +139,9 @@ class WriterAttributes

//! Keep duration to keep a sample before considering it has been acked
Duration_t keep_duration;

//! Flow controller name. Default: fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT.
const char* flow_controller_name = fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT;
};

} /* namespace rtps */
Expand Down
34 changes: 28 additions & 6 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

/*!
* Specific information for a writer.
*/
struct CacheChangeWriterInfo_t
{
//!Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
size_t num_sent_submessages = 0;
//! Used to link with previous node in a list. Used by FlowControllerImpl.
//! Cannot be cached because there are several comparisons without locking.
CacheChange_t* volatile previous = nullptr;
//! Used to link with next node in a list. Used by FlowControllerImpl.
//! Cannot be cached because there are several comparisons without locking.
CacheChange_t* volatile next = nullptr;
};

/*!
* Specific information for a reader.
*/
struct CacheChangeReaderInfo_t
{
//!Reception TimeStamp (only used in Readers)
Time_t receptionTimestamp;
};

/**
* Structure CacheChange_t, contains information on a specific CacheChange.
* @ingroup COMMON_MODULE
Expand All @@ -57,10 +81,8 @@ struct RTPS_DllAPI CacheChange_t
Time_t sourceTimestamp{};
union
{
//!Reception TimeStamp (only used in Readers)
Time_t receptionTimestamp;
//!Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
size_t num_sent_submessages = 0;
CacheChangeReaderInfo_t reader_info;
CacheChangeWriterInfo_t writer_info;
};

WriteParams write_params{};
Expand Down Expand Up @@ -105,7 +127,7 @@ struct RTPS_DllAPI CacheChange_t
instanceHandle = ch_ptr->instanceHandle;
sequenceNumber = ch_ptr->sequenceNumber;
sourceTimestamp = ch_ptr->sourceTimestamp;
receptionTimestamp = ch_ptr->receptionTimestamp;
reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
write_params = ch_ptr->write_params;
isRead = ch_ptr->isRead;
fragment_size_ = ch_ptr->fragment_size_;
Expand All @@ -128,7 +150,7 @@ struct RTPS_DllAPI CacheChange_t
instanceHandle = ch_ptr->instanceHandle;
sequenceNumber = ch_ptr->sequenceNumber;
sourceTimestamp = ch_ptr->sourceTimestamp;
receptionTimestamp = ch_ptr->receptionTimestamp;
reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
write_params = ch_ptr->write_params;
isRead = ch_ptr->isRead;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ enum class FlowControllerSchedulerPolicy : int32_t
{
//! FIFO scheduler policy: first written sample by user, first sample scheduled to be sent to network.
FIFO,
//! Round Robind scheduler policy: schedules one sample of each DataWriter in circular order.
ROUND_ROBLIN,
//! Round Robin scheduler policy: schedules one sample of each DataWriter in circular order.
ROUND_ROBIN,
//! High priority scheduler policy: samples with highest priority are scheduled first to be sent to network.
HIGH_PRIORITY,
//! Priority with reservation scheduler policy: guarantee each DataWriter's minimum reservation of throughput.
Expand Down
8 changes: 8 additions & 0 deletions include/fastdds/rtps/history/WriterHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class WriterHistory : public rtps::History
{
friend class RTPSWriter;
friend class PersistentWriter;
friend class IPersistenceService;

WriterHistory(
WriterHistory&&) = delete;
Expand Down Expand Up @@ -133,6 +134,13 @@ class WriterHistory : public rtps::History
SequenceNumber_t m_lastCacheChangeSeqNum;
//!Pointer to the associated RTPSWriter;
RTPSWriter* mp_writer;

uint32_t high_mark_for_frag_ = 0;

private:

void set_fragments(
CacheChange_t* change);
};

} // namespace rtps
Expand Down
Loading

0 comments on commit 6b55651

Please sign in to comment.