diff --git a/src/dds/rs-dds-device-proxy.cpp b/src/dds/rs-dds-device-proxy.cpp index 945e1764a5..43ef5c2b29 100644 --- a/src/dds/rs-dds-device-proxy.cpp +++ b/src/dds/rs-dds-device-proxy.cpp @@ -615,7 +615,11 @@ bool dds_device_proxy::check_fw_compatibility( const std::vector< uint8_t > & im _dds_dev->device_info().topic_root() + realdds::topics::DFU_TOPIC_NAME ); auto writer = std::make_shared< realdds::dds_topic_writer >( topic ); - writer->run( realdds::dds_topic_writer::qos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ) ); + realdds::dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ); + wqos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE; + wqos.publish_mode().flow_controller_name = "dfu"; + writer->override_qos_from_json( wqos, _dds_dev->participant()->settings().nested( "device", "dfu" ) ); + writer->run( wqos ); if( ! writer->wait_for_readers( { 3, 0 } ) ) throw std::runtime_error( "timeout waiting for DFU subscriber" ); auto blob = realdds::topics::blob_msg( std::vector< uint8_t >( image ) ); diff --git a/src/dds/rsdds-device-factory.cpp b/src/dds/rsdds-device-factory.cpp index 6c52989ebc..6aafffba7d 100644 --- a/src/dds/rsdds-device-factory.cpp +++ b/src/dds/rsdds-device-factory.cpp @@ -1,5 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. +// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. #include "rsdds-device-factory.h" #include "context.h" @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -96,7 +97,33 @@ rsdds_device_factory::rsdds_device_factory( std::shared_ptr< context > const & c _participant = domain.participant.instance(); if( ! _participant->is_valid() ) { - _participant->init( domain_id, participant_name, dds_settings.default_object() ); + realdds::dds_participant::qos qos( participant_name ); // default settings + + // As a client, we send messages to a server; sometimes big messages. E.g., for DFU these may be up to + // 20MB... flexible messages are up to 4K. The UDP protocol is supposed to break messages (by default, up to + // 64K) into fragmented packed, but the server may not be able to handle these; certain hardware cannot + // handle more than 1500 bytes! + // + // FastDDS does provide a 'udp/max-message-size' setting (as of v2.10.4), but this applies to both send and + // receive. We want to only limit sends! So we use a new property of FastDDS: + qos.properties().properties().emplace_back( "fastdds.max_message_size", "1470" ); + // (overridable with "max-out-message-bytes" in our settings) + + // Create a flow-controller that will be used for DFU, because of similar possible IP stack limitations at + // the server side: if we send too many packets all at once, some servers get overloaded... + auto dfu_flow_control = std::make_shared< eprosima::fastdds::rtps::FlowControllerDescriptor >(); + dfu_flow_control->name = "dfu"; + // Max bytes to be sent to network per period; [1, 2147483647]; default=0 -> no limit. + // -> We allow 256 buffers, each the size of the UDP max-message-size + dfu_flow_control->max_bytes_per_period = 256 * 1470; // qos.transport().user_transports.front()->maxMessageSize; + // -> Every 100ms + dfu_flow_control->period_ms = 100; // default=100 + // Further override with settings from device/dfu + realdds::override_flow_controller_from_json( *dfu_flow_control, dds_settings.nested( "device", "dfu" ) ); + qos.flow_controllers().push_back( dfu_flow_control ); + + // qos will get further overriden with the settings we pass in + _participant->init( domain_id, qos, dds_settings.default_object() ); } else if( participant_name_j.exists() && participant_name != _participant->name() ) { diff --git a/third-party/realdds/scripts/topic-send.py b/third-party/realdds/scripts/topic-send.py index cbd988de71..035297e261 100644 --- a/third-party/realdds/scripts/topic-send.py +++ b/third-party/realdds/scripts/topic-send.py @@ -42,14 +42,17 @@ def e( *a, **kw ): dds.debug( args.debug ) +max_sample_size = 1470 # assuming ~1500 max packet size at destination IP stack +flow_period_bytes = 256 * max_sample_size # 256=quarter of number of buffers available at destination +flow_period_ms = 100 # how often to send settings = { 'flow-controllers': { 'blob': { - 'max-bytes-per-period': 256 * 1470, - 'period-ms': 250 + 'max-bytes-per-period': flow_period_bytes, + 'period-ms': flow_period_ms } }, - 'max-out-message-bytes': 1470 + 'max-out-message-bytes': max_sample_size } participant = dds.participant() @@ -68,9 +71,9 @@ def e( *a, **kw ): sys.exit( 1 ) writer = dds.topic_writer( dds.message.blob.create_topic( participant, topic_path )) wqos = dds.topic_writer.qos() # reliable - #writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } ) + writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } ) writer.run( wqos ) - if not writer.wait_for_readers( dds.time( 2. ) ): + if not writer.wait_for_readers( dds.time( 3. ) ): e( 'Timeout waiting for readers' ) sys.exit( 1 ) with open( args.blob, mode='rb' ) as file: # b is important -> binary @@ -80,7 +83,8 @@ def e( *a, **kw ): blob.write_to( writer ) # We must wait for acks, since we use a flow controller and write_to() will return before we've # actually finished the send - if not writer.wait_for_acks( dds.time( 5. ) ): # seconds + seconds_to_send = blob.size() / flow_period_bytes / (1000. / flow_period_ms) + if not writer.wait_for_acks( dds.time( 5. + seconds_to_send ) ): e( 'Timeout waiting for ack' ) sys.exit( 1 ) i( f'Acknowledged' ) diff --git a/third-party/realdds/scripts/topic-sink.py b/third-party/realdds/scripts/topic-sink.py index f66de33644..cd85f79eec 100644 --- a/third-party/realdds/scripts/topic-sink.py +++ b/third-party/realdds/scripts/topic-sink.py @@ -22,7 +22,7 @@ def time_arg(x): raise ValueError( f'--time should be >=0' ) return t args.add_argument( '--wait', metavar='', type=time_arg, default=5., help='seconds to wait for writers (default 5; 0=disable)' ) -args.add_argument( '--time', metavar='', type=time_arg, default=5., help='runtime before stopping, in seconds (default 0=forever)' ) +args.add_argument( '--time', metavar='', type=time_arg, help='runtime before stopping, in seconds (default 0=forever)' ) args.add_argument( '--not-ready', action='store_true', help='start output immediately, without waiting for all topics' ) args = args.parse_args() diff --git a/third-party/realdds/src/dds-serialization.cpp b/third-party/realdds/src/dds-serialization.cpp index e3e333d04a..c88e6f2d45 100644 --- a/third-party/realdds/src/dds-serialization.cpp +++ b/third-party/realdds/src/dds-serialization.cpp @@ -398,7 +398,8 @@ std::ostream & operator<<( std::ostream & os, WriterProxyData const & info ) os << /*field::separator << "durability" << field::group() <<*/ info.m_qos.m_durability; if( ! ( info.m_qos.m_liveliness == eprosima::fastdds::dds::LivelinessQosPolicy() ) ) os << field::separator << "liveliness" << field::value << info.m_qos.m_liveliness; - if( info.m_qos.m_publishMode.flow_controller_name ) + if( info.m_qos.m_publishMode.flow_controller_name + && info.m_qos.m_publishMode.flow_controller_name != eprosima::fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT ) os << field::separator << "flow-controller" << field::value << "'" << info.m_qos.m_publishMode.flow_controller_name << "'"; return os;