Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable dds flow controller #12983

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/dds/rs-dds-device-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ bool dds_device_proxy::check_fw_compatibility( const std::vector< uint8_t > & im
_dds_dev->device_info().topic_root()
+ realdds::topics::DFU_TOPIC_NAME );
auto writer = std::make_shared< realdds::dds_topic_writer >( topic );
writer->run( realdds::dds_topic_writer::qos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ) );
realdds::dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS );
wqos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;
wqos.publish_mode().flow_controller_name = "dfu";
writer->override_qos_from_json( wqos, _dds_dev->participant()->settings().nested( "device", "dfu" ) );
writer->run( wqos );
if( ! writer->wait_for_readers( { 3, 0 } ) )
throw std::runtime_error( "timeout waiting for DFU subscriber" );
auto blob = realdds::topics::blob_msg( std::vector< uint8_t >( image ) );
Expand Down
31 changes: 29 additions & 2 deletions src/dds/rsdds-device-factory.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.
// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved.

#include "rsdds-device-factory.h"
#include "context.h"
Expand All @@ -10,6 +10,7 @@
#include <realdds/dds-device-watcher.h>
#include <realdds/dds-participant.h>
#include <realdds/dds-device.h>
#include <realdds/dds-serialization.h>
#include <realdds/topics/device-info-msg.h>

#include <rsutils/easylogging/easyloggingpp.h>
Expand Down Expand Up @@ -96,7 +97,33 @@ rsdds_device_factory::rsdds_device_factory( std::shared_ptr< context > const & c
_participant = domain.participant.instance();
if( ! _participant->is_valid() )
{
_participant->init( domain_id, participant_name, dds_settings.default_object() );
realdds::dds_participant::qos qos( participant_name ); // default settings

// As a client, we send messages to a server; sometimes big messages. E.g., for DFU these may be up to
// 20MB... flexible messages are up to 4K. The UDP protocol is supposed to break messages (by default, up to
// 64K) into fragmented packed, but the server may not be able to handle these; certain hardware cannot
// handle more than 1500 bytes!
//
// FastDDS does provide a 'udp/max-message-size' setting (as of v2.10.4), but this applies to both send and
// receive. We want to only limit sends! So we use a new property of FastDDS:
qos.properties().properties().emplace_back( "fastdds.max_message_size", "1470" );
// (overridable with "max-out-message-bytes" in our settings)

// Create a flow-controller that will be used for DFU, because of similar possible IP stack limitations at
// the server side: if we send too many packets all at once, some servers get overloaded...
auto dfu_flow_control = std::make_shared< eprosima::fastdds::rtps::FlowControllerDescriptor >();
dfu_flow_control->name = "dfu";
// Max bytes to be sent to network per period; [1, 2147483647]; default=0 -> no limit.
// -> We allow 256 buffers, each the size of the UDP max-message-size
dfu_flow_control->max_bytes_per_period = 256 * 1470; // qos.transport().user_transports.front()->maxMessageSize;
// -> Every 100ms
dfu_flow_control->period_ms = 100; // default=100
// Further override with settings from device/dfu
realdds::override_flow_controller_from_json( *dfu_flow_control, dds_settings.nested( "device", "dfu" ) );
qos.flow_controllers().push_back( dfu_flow_control );

// qos will get further overriden with the settings we pass in
_participant->init( domain_id, qos, dds_settings.default_object() );
}
else if( participant_name_j.exists() && participant_name != _participant->name() )
{
Expand Down
16 changes: 10 additions & 6 deletions third-party/realdds/scripts/topic-send.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ def e( *a, **kw ):

dds.debug( args.debug )

max_sample_size = 1470 # assuming ~1500 max packet size at destination IP stack
flow_period_bytes = 256 * max_sample_size # 256=quarter of number of buffers available at destination
flow_period_ms = 100 # how often to send
settings = {
'flow-controllers': {
'blob': {
'max-bytes-per-period': 256 * 1470,
'period-ms': 250
'max-bytes-per-period': flow_period_bytes,
'period-ms': flow_period_ms
}
},
'max-out-message-bytes': 1470
'max-out-message-bytes': max_sample_size
}

participant = dds.participant()
Expand All @@ -68,9 +71,9 @@ def e( *a, **kw ):
sys.exit( 1 )
writer = dds.topic_writer( dds.message.blob.create_topic( participant, topic_path ))
wqos = dds.topic_writer.qos() # reliable
#writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } )
writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } )
writer.run( wqos )
if not writer.wait_for_readers( dds.time( 2. ) ):
if not writer.wait_for_readers( dds.time( 3. ) ):
e( 'Timeout waiting for readers' )
sys.exit( 1 )
with open( args.blob, mode='rb' ) as file: # b is important -> binary
Expand All @@ -80,7 +83,8 @@ def e( *a, **kw ):
blob.write_to( writer )
# We must wait for acks, since we use a flow controller and write_to() will return before we've
# actually finished the send
if not writer.wait_for_acks( dds.time( 5. ) ): # seconds
seconds_to_send = blob.size() / flow_period_bytes / (1000. / flow_period_ms)
if not writer.wait_for_acks( dds.time( 5. + seconds_to_send ) ):
e( 'Timeout waiting for ack' )
sys.exit( 1 )
i( f'Acknowledged' )
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/scripts/topic-sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def time_arg(x):
raise ValueError( f'--time should be >=0' )
return t
args.add_argument( '--wait', metavar='<seconds>', type=time_arg, default=5., help='seconds to wait for writers (default 5; 0=disable)' )
args.add_argument( '--time', metavar='<seconds>', type=time_arg, default=5., help='runtime before stopping, in seconds (default 0=forever)' )
args.add_argument( '--time', metavar='<seconds>', type=time_arg, help='runtime before stopping, in seconds (default 0=forever)' )
args.add_argument( '--not-ready', action='store_true', help='start output immediately, without waiting for all topics' )
args = args.parse_args()

Expand Down
3 changes: 2 additions & 1 deletion third-party/realdds/src/dds-serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ std::ostream & operator<<( std::ostream & os, WriterProxyData const & info )
os << /*field::separator << "durability" << field::group() <<*/ info.m_qos.m_durability;
if( ! ( info.m_qos.m_liveliness == eprosima::fastdds::dds::LivelinessQosPolicy() ) )
os << field::separator << "liveliness" << field::value << info.m_qos.m_liveliness;
if( info.m_qos.m_publishMode.flow_controller_name )
if( info.m_qos.m_publishMode.flow_controller_name
&& info.m_qos.m_publishMode.flow_controller_name != eprosima::fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT )
os << field::separator << "flow-controller" << field::value << "'"
<< info.m_qos.m_publishMode.flow_controller_name << "'";
return os;
Expand Down
Loading