Skip to content

Commit

Permalink
PR IntelRealSense#12983 from Eran: enable dds flow controller
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored May 30, 2024
2 parents 6f90805 + bf3b3dd commit 73c623c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
6 changes: 5 additions & 1 deletion src/dds/rs-dds-device-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ bool dds_device_proxy::check_fw_compatibility( const std::vector< uint8_t > & im
_dds_dev->device_info().topic_root()
+ realdds::topics::DFU_TOPIC_NAME );
auto writer = std::make_shared< realdds::dds_topic_writer >( topic );
writer->run( realdds::dds_topic_writer::qos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ) );
realdds::dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS );
wqos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;
wqos.publish_mode().flow_controller_name = "dfu";
writer->override_qos_from_json( wqos, _dds_dev->participant()->settings().nested( "device", "dfu" ) );
writer->run( wqos );
if( ! writer->wait_for_readers( { 3, 0 } ) )
throw std::runtime_error( "timeout waiting for DFU subscriber" );
auto blob = realdds::topics::blob_msg( std::vector< uint8_t >( image ) );
Expand Down
31 changes: 29 additions & 2 deletions src/dds/rsdds-device-factory.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.
// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved.

#include "rsdds-device-factory.h"
#include "context.h"
Expand All @@ -10,6 +10,7 @@
#include <realdds/dds-device-watcher.h>
#include <realdds/dds-participant.h>
#include <realdds/dds-device.h>
#include <realdds/dds-serialization.h>
#include <realdds/topics/device-info-msg.h>

#include <rsutils/easylogging/easyloggingpp.h>
Expand Down Expand Up @@ -96,7 +97,33 @@ rsdds_device_factory::rsdds_device_factory( std::shared_ptr< context > const & c
_participant = domain.participant.instance();
if( ! _participant->is_valid() )
{
_participant->init( domain_id, participant_name, dds_settings.default_object() );
realdds::dds_participant::qos qos( participant_name ); // default settings

// As a client, we send messages to a server; sometimes big messages. E.g., for DFU these may be up to
// 20MB... flexible messages are up to 4K. The UDP protocol is supposed to break messages (by default, up to
// 64K) into fragmented packed, but the server may not be able to handle these; certain hardware cannot
// handle more than 1500 bytes!
//
// FastDDS does provide a 'udp/max-message-size' setting (as of v2.10.4), but this applies to both send and
// receive. We want to only limit sends! So we use a new property of FastDDS:
qos.properties().properties().emplace_back( "fastdds.max_message_size", "1470" );
// (overridable with "max-out-message-bytes" in our settings)

// Create a flow-controller that will be used for DFU, because of similar possible IP stack limitations at
// the server side: if we send too many packets all at once, some servers get overloaded...
auto dfu_flow_control = std::make_shared< eprosima::fastdds::rtps::FlowControllerDescriptor >();
dfu_flow_control->name = "dfu";
// Max bytes to be sent to network per period; [1, 2147483647]; default=0 -> no limit.
// -> We allow 256 buffers, each the size of the UDP max-message-size
dfu_flow_control->max_bytes_per_period = 256 * 1470; // qos.transport().user_transports.front()->maxMessageSize;
// -> Every 100ms
dfu_flow_control->period_ms = 100; // default=100
// Further override with settings from device/dfu
realdds::override_flow_controller_from_json( *dfu_flow_control, dds_settings.nested( "device", "dfu" ) );
qos.flow_controllers().push_back( dfu_flow_control );

// qos will get further overriden with the settings we pass in
_participant->init( domain_id, qos, dds_settings.default_object() );
}
else if( participant_name_j.exists() && participant_name != _participant->name() )
{
Expand Down
16 changes: 10 additions & 6 deletions third-party/realdds/scripts/topic-send.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ def e( *a, **kw ):

dds.debug( args.debug )

max_sample_size = 1470 # assuming ~1500 max packet size at destination IP stack
flow_period_bytes = 256 * max_sample_size # 256=quarter of number of buffers available at destination
flow_period_ms = 100 # how often to send
settings = {
'flow-controllers': {
'blob': {
'max-bytes-per-period': 256 * 1470,
'period-ms': 250
'max-bytes-per-period': flow_period_bytes,
'period-ms': flow_period_ms
}
},
'max-out-message-bytes': 1470
'max-out-message-bytes': max_sample_size
}

participant = dds.participant()
Expand All @@ -68,9 +71,9 @@ def e( *a, **kw ):
sys.exit( 1 )
writer = dds.topic_writer( dds.message.blob.create_topic( participant, topic_path ))
wqos = dds.topic_writer.qos() # reliable
#writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } )
writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } )
writer.run( wqos )
if not writer.wait_for_readers( dds.time( 2. ) ):
if not writer.wait_for_readers( dds.time( 3. ) ):
e( 'Timeout waiting for readers' )
sys.exit( 1 )
with open( args.blob, mode='rb' ) as file: # b is important -> binary
Expand All @@ -80,7 +83,8 @@ def e( *a, **kw ):
blob.write_to( writer )
# We must wait for acks, since we use a flow controller and write_to() will return before we've
# actually finished the send
if not writer.wait_for_acks( dds.time( 5. ) ): # seconds
seconds_to_send = blob.size() / flow_period_bytes / (1000. / flow_period_ms)
if not writer.wait_for_acks( dds.time( 5. + seconds_to_send ) ):
e( 'Timeout waiting for ack' )
sys.exit( 1 )
i( f'Acknowledged' )
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/scripts/topic-sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def time_arg(x):
raise ValueError( f'--time should be >=0' )
return t
args.add_argument( '--wait', metavar='<seconds>', type=time_arg, default=5., help='seconds to wait for writers (default 5; 0=disable)' )
args.add_argument( '--time', metavar='<seconds>', type=time_arg, default=5., help='runtime before stopping, in seconds (default 0=forever)' )
args.add_argument( '--time', metavar='<seconds>', type=time_arg, help='runtime before stopping, in seconds (default 0=forever)' )
args.add_argument( '--not-ready', action='store_true', help='start output immediately, without waiting for all topics' )
args = args.parse_args()

Expand Down
3 changes: 2 additions & 1 deletion third-party/realdds/src/dds-serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ std::ostream & operator<<( std::ostream & os, WriterProxyData const & info )
os << /*field::separator << "durability" << field::group() <<*/ info.m_qos.m_durability;
if( ! ( info.m_qos.m_liveliness == eprosima::fastdds::dds::LivelinessQosPolicy() ) )
os << field::separator << "liveliness" << field::value << info.m_qos.m_liveliness;
if( info.m_qos.m_publishMode.flow_controller_name )
if( info.m_qos.m_publishMode.flow_controller_name
&& info.m_qos.m_publishMode.flow_controller_name != eprosima::fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT )
os << field::separator << "flow-controller" << field::value << "'"
<< info.m_qos.m_publishMode.flow_controller_name << "'";
return os;
Expand Down

0 comments on commit 73c623c

Please sign in to comment.