Skip to content

Commit

Permalink
PR IntelRealSense#12342 from Eran: syncer debug & AH unit-test
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Nov 1, 2023
2 parents 39f97f5 + bf388a0 commit 8f97c09
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 60 deletions.
1 change: 1 addition & 0 deletions src/core/enum-helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ std::ostream & operator<<( std::ostream & out, rs2_option option );
bool try_parse( const std::string & option_name, rs2_option & result );

RS2_ENUM_HELPERS( rs2_stream, STREAM )
LRS_EXTENSION_API char const * get_abbr_string( rs2_stream );
RS2_ENUM_HELPERS( rs2_format, FORMAT )
RS2_ENUM_HELPERS( rs2_distortion, DISTORTION )
RS2_ENUM_HELPERS( rs2_camera_info, CAMERA_INFO )
Expand Down
6 changes: 3 additions & 3 deletions src/frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ std::ostream & operator<<( std::ostream & s, const frame_interface & f )
}
else
{
s << "[" << f.get_stream()->get_stream_type();
s << "/" << f.get_stream()->get_unique_id();
s << "[" << get_abbr_string( f.get_stream()->get_stream_type() );
s << f.get_stream()->get_unique_id();
s << " " << f.get_header();
s << "]";
}
Expand All @@ -45,7 +45,7 @@ std::ostream & operator<<( std::ostream & s, const frame_interface & f )
std::ostream & operator<<( std::ostream & os, frame_header const & header )
{
os << "#" << header.frame_number;
os << " @" << ( rsutils::string::from() << std::fixed << std::setprecision( 2 ) << (double)header.timestamp ).str();
os << " @" << rsutils::string::from( header.timestamp );
if( header.timestamp_domain != RS2_TIMESTAMP_DOMAIN_HARDWARE_CLOCK )
os << "/" << rs2_timestamp_domain_to_string( header.timestamp_domain );
return os;
Expand Down
105 changes: 62 additions & 43 deletions src/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ namespace librealsense
_streams_type = {stream_type};

std::ostringstream ss;
ss << rs2_stream_to_string( stream_type );
ss << '/';
ss << get_abbr_string( stream_type );
ss << stream;
_name = ss.str();
}
Expand Down Expand Up @@ -137,6 +136,17 @@ namespace librealsense
_name = create_composite_name(matchers, name);
}

composite_matcher::matcher_queue::matcher_queue()
: q( QUEUE_MAX_SIZE,
[]( frame_holder const & fh )
{
// If queues are overrun, we'll get here
LOG_DEBUG( "DROPPED frame " << fh );
} )
{
}


void composite_matcher::dispatch(frame_holder f, const syncronization_environment& env)
{
clean_inactive_streams(f);
Expand Down Expand Up @@ -172,13 +182,13 @@ namespace librealsense
if( ! matcher->get_active() )
{
matcher->set_active( true );
_frames_queue[matcher.get()].start();
_frames_queue[matcher.get()].q.start();
}
return matcher;
}
}
LOG_DEBUG( "no matcher found for " << rs2_stream_to_string( stream_type ) << '/'
<< stream_id << "; creating matcher from device..." );
LOG_DEBUG( "no matcher found for " << get_abbr_string( stream_type ) << '.' << stream_id
<< "; creating matcher from device..." );

auto sensor = frame.frame->get_sensor().get(); //TODO: Potential deadlock if get_sensor() gets a hold of the last reference of that sensor
auto dev_exist = false;
Expand Down Expand Up @@ -266,7 +276,7 @@ namespace librealsense

// Stop all our queues to wake up anyone waiting on them
for( auto & fq : _frames_queue )
fq.second.stop();
fq.second.q.stop();

// Trickle the stop down to any children
for( auto m : _matchers )
Expand All @@ -289,7 +299,7 @@ namespace librealsense
os << '[';
for( auto m : matchers )
{
auto const & q = _frames_queue[m];
auto const & q = _frames_queue[m].q;
q.peek( [&os]( frame_holder const & fh ) {
os << fh;
} );
Expand All @@ -313,7 +323,7 @@ namespace librealsense
// latest timestamp/frame-number/etc. that we can compare to.
auto const last_arrived = f->get_header();

if( ! _frames_queue[matcher.get()].enqueue( std::move( f ) ) )
if( ! _frames_queue[matcher.get()].q.enqueue( std::move( f ) ) )
// If we get stopped, nothing to do!
return;

Expand Down Expand Up @@ -344,7 +354,7 @@ namespace librealsense
for( auto s = _frames_queue.begin(); s != _frames_queue.end(); s++ )
{
librealsense::matcher * const m = s->first;
if( ! s->second.peek( [&]( frame_holder & fh ) {
if( ! s->second.q.peek( [&]( frame_holder & fh ) {
LOG_IF_ENABLE( "... have " << *fh.frame, env );
frames_arrived.push_back( &fh );
frames_arrived_matchers.push_back( m );
Expand Down Expand Up @@ -395,12 +405,13 @@ namespace librealsense
// something missing, we can't release anything yet...
for( auto i : missing_streams )
{
LOG_IF_ENABLE( "... missing " << i->get_name() << ", next expected "
<< _next_expected[i],
LOG_IF_ENABLE( "... missing " << i->get_name() << ", next expected @"
<< rsutils::string::from( _next_expected[i].value ) << " (from "
<< rsutils::string::from( _next_expected[i].fps ) << " fps)",
env );
if( skip_missing_stream( *curr_sync, i, last_arrived, env ) )
{
LOG_IF_ENABLE( "... ignoring it", env );
LOG_IF_ENABLE( "... cannot be synced; not waiting for it", env );
continue;
}

Expand All @@ -425,7 +436,7 @@ namespace librealsense
frame_holder frame;
int const timeout_ms = 5000;
librealsense::matcher * m = frames_arrived_matchers[index];
_frames_queue[m].dequeue( &frame, timeout_ms );
_frames_queue[m].q.dequeue( &frame, timeout_ms );
match.push_back( std::move( frame ) );
}
}
Expand Down Expand Up @@ -493,7 +504,7 @@ namespace librealsense

for(auto id: inactive_matchers)
{
_frames_queue[_matchers[id].get()].clear();
_frames_queue[_matchers[id].get()].q.clear();
}
}

Expand All @@ -506,9 +517,10 @@ namespace librealsense
if(!missing->get_active())
return true;

auto next_expected = _next_expected[missing];
auto const & next_expected = _next_expected[missing];

if(synced_frame->get_frame_number() - next_expected > 4 || synced_frame->get_frame_number() < next_expected)
if( synced_frame->get_frame_number() - next_expected.value > 4
|| synced_frame->get_frame_number() < next_expected.value )
{
return true;
}
Expand All @@ -518,7 +530,7 @@ namespace librealsense
void frame_number_composite_matcher::update_next_expected(
std::shared_ptr< matcher > const & matcher, const frame_holder & f )
{
_next_expected[matcher.get()] = f.frame->get_frame_number()+1.;
_next_expected[matcher.get()].value = f.frame->get_frame_number()+1.;
}

std::pair<double, double> extract_timestamps(frame_holder & a, frame_holder & b)
Expand Down Expand Up @@ -573,12 +585,12 @@ namespace librealsense
fps = fps_md / 1000.;
if( fps )
{
//LOG_DEBUG( "fps " << fps << " from metadata" );
//LOG_DEBUG( "fps " << rsutils::string::from( fps ) << " from metadata " << *f );
}
else
{
fps = f->get_stream()->get_framerate();
//LOG_DEBUG( "fps " << fps << " from stream framerate" );
//LOG_DEBUG( "fps " << rsutils::string::from( fps ) << " from stream framerate " << *f );
}
return fps;
}
Expand All @@ -592,9 +604,13 @@ namespace librealsense

auto ts = f.frame->get_frame_timestamp();
auto ne = ts + gap;
//LOG_DEBUG( "... next_expected = {timestamp}" << ts << " + {gap}(1000/{fps}" << fps << ") = " << ne );
_next_expected[matcher.get()] = ne;
_next_expected_domain[matcher.get()] = f.frame->get_frame_timestamp_domain();
//LOG_DEBUG( "... next_expected = {timestamp}" << rsutils::string::from( ts ) << " + {gap}(1000/{fps}"
// << rsutils::string::from( fps )
// << ") = " << rsutils::string::from( ne ) );
auto & next_expected = _next_expected[matcher.get()];
next_expected.value = ne;
next_expected.fps = fps;
next_expected.domain = f.frame->get_frame_timestamp_domain();
}

void timestamp_composite_matcher::clean_inactive_streams(frame_holder& f)
Expand All @@ -615,20 +631,16 @@ namespace librealsense

//LOG_IF_ENABLE( "... matcher " << synced[0]->get_name(), env );

auto next_expected = _next_expected[missing];
auto const & next_expected = _next_expected[missing];
// LOG_IF_ENABLE( "... next " << std::fixed << next_expected, env );

auto it = _next_expected_domain.find( missing );
if( it != _next_expected_domain.end() )
if( next_expected.domain != last_arrived.timestamp_domain )
{
if( it->second != last_arrived.timestamp_domain )
{
// LOG_IF_ENABLE( "... not the same domain: frameset not ready!", env );
// D457 dev - return false removed
// because IR has no md, so its ts domain is "system time"
// other streams have md, and their ts domain is "hardware clock"
//return false;
}
// LOG_IF_ENABLE( "... not the same domain: frameset not ready!", env );
// D457 dev - return false removed
// because IR has no md, so its ts domain is "system time"
// other streams have md, and their ts domain is "hardware clock"
//return false;
}

// We want to calculate a cutout for inactive stream detection: if we wait too long past
Expand Down Expand Up @@ -674,7 +686,7 @@ namespace librealsense
auto const fps = get_fps( waiting_to_be_released );

rs2_time_t now = last_arrived.timestamp;
if( now > next_expected )
if( now > next_expected.value )
{
// Wait for the missing stream frame to arrive -- up to a cutout: anything more and we
// let the frameset be ready without it...
Expand All @@ -683,38 +695,45 @@ namespace librealsense
// between the streams we're willing to live with. Each gap is a frame so we are limited
// by the number of frames we're willing to keep (which is our queue limit)
auto threshold = 7 * gap; // really 7+1 because NE is already 1 away
if( now - next_expected < threshold )
if( now - next_expected.value < threshold )
{
//LOG_IF_ENABLE( "... still below cutout of {NE+7*gap}" << ( next_expected + threshold ), env );
//LOG_IF_ENABLE( "... still below cutout of {NE+7*gap}"
// << rsutils::string::from( next_expected + threshold ),
// env );
return false;
}
LOG_IF_ENABLE( "... exceeded cutout of {NE+7*gap}" << ( next_expected + threshold ) << "; deactivating matcher!", env );
LOG_IF_ENABLE( "... exceeded cutout of {NE+7*gap}"
<< rsutils::string::from( next_expected.value + threshold ) << "; deactivating matcher!",
env );

auto const q_it = _frames_queue.find( missing );
if( q_it != _frames_queue.end() )
{
if( q_it->second.empty() )
if( q_it->second.q.empty() )
_frames_queue.erase( q_it );
}
missing->set_active( false );
return true;
}

return ! are_equivalent( waiting_to_be_released->get_frame_timestamp(),
next_expected,
fps ); // should be min fps to match behavior elsewhere?
next_expected.value,
fps );
}

bool timestamp_composite_matcher::are_equivalent( double a, double b, double fps )
{
auto gap = 1000. / fps;
if( abs( a - b ) < (gap / 2) )
{
//LOG_DEBUG( "... " << a << " == " << b << " {diff}" << abs( a - b ) << " < " << (gap / 2) << "{gap/2}" );
//LOG_DEBUG( "... " << rsutils::string::from( a ) << " == " << rsutils::string::from( b ) << " {diff}"
// << abs( a - b ) << " < " << rsutils::string::from( gap / 2 ) << "{gap/2}" );
return true;
}

//LOG_DEBUG( "... " << a << " != " << b << " {diff}" << abs( a - b ) << " >= " << ( gap / 2 ) << "{gap/2}" );
//LOG_DEBUG( "... " << rsutils::string::from( a ) << " != " << rsutils::string::from( b ) << " {diff}"
// << rsutils::string::from( abs( a - b ) ) << " >= " << rsutils::string::from( gap / 2 )
// << "{gap/2}" );
return false;
}

Expand All @@ -731,7 +750,7 @@ namespace librealsense
if (!composite)
{
std::vector<frame_holder> match;
std::stringstream frame_string_for_logging;
std::ostringstream frame_string_for_logging;
frame_string_for_logging << f; // Saving frame holder string before moving frame

match.push_back(std::move(f));
Expand Down
19 changes: 16 additions & 3 deletions src/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "types.h"
#include "archive.h"
#include "core/frame-holder.h"

#include <stdint.h>
#include <vector>
Expand Down Expand Up @@ -110,10 +111,22 @@ namespace librealsense
const frame_holder & f )
= 0;

std::map<matcher*, single_consumer_frame_queue<frame_holder>> _frames_queue;
struct matcher_queue
{
single_consumer_frame_queue< frame_holder > q;

matcher_queue();
};

std::map< matcher *, matcher_queue > _frames_queue;
std::map<stream_id, std::shared_ptr<matcher>> _matchers;
std::map<matcher*, double> _next_expected;
std::map<matcher*, rs2_timestamp_domain> _next_expected_domain;
struct next_expected_t
{
double value; // timestamp/frame-number/etc.
double fps;
rs2_timestamp_domain domain;
};
std::map< matcher *, next_expected_t > _next_expected;

std::mutex _mutex;
};
Expand Down
22 changes: 22 additions & 0 deletions src/to-string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,28 @@ const char * get_string( rs2_stream value )
#undef CASE
}

char const * get_abbr_string( rs2_stream value)
{
switch( value )
{
case RS2_STREAM_ANY: return "Any";
case RS2_STREAM_DEPTH: return "D";
case RS2_STREAM_COLOR: return "C";
case RS2_STREAM_INFRARED: return "IR";
case RS2_STREAM_FISHEYE: return "FE";
case RS2_STREAM_GYRO: return "G";
case RS2_STREAM_ACCEL: return "A";
case RS2_STREAM_GPIO: return "GPIO";
case RS2_STREAM_POSE: return "P";
case RS2_STREAM_CONFIDENCE: return "Conf";
case RS2_STREAM_MOTION: return "M";
default:
assert( !is_valid( value ) );
return "?";
}
}


const char * get_string( rs2_sr300_visual_preset value )
{
#define CASE( X ) STRCASE( SR300_VISUAL_PRESET, X )
Expand Down
11 changes: 11 additions & 0 deletions third-party/rsutils/include/rsutils/string/from.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ struct from
_ss << val;
}

// Specialty conversion: like std::to_string; fixed, high-precision (6)
// Trims ending 0s and reverts to non-fixed notation if '0.' is the result...
explicit from( double val, int precision = 6 );

template< class T >
from & operator<<( const T & val )
{
Expand All @@ -45,5 +49,12 @@ struct from
};


inline std::ostream & operator<<( std::ostream & os, from const & f )
{
// TODO c++20: use .rdbuf()->view()
return os << f.str();
}


} // namespace string
} // namespace rsutils
2 changes: 2 additions & 0 deletions third-party/rsutils/py/pyrsutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ PYBIND11_MODULE(NAME, m) {
m.def( "executable_path", &rsutils::os::executable_path );
m.def( "executable_name", &rsutils::os::executable_name, py::arg( "with_extension" ) = false );

m.def( "string_from_double", []( double d ) { return rsutils::string::from( d ).str(); } );

using rsutils::version;
py::class_< version >( m, "version" )
.def( py::init<>() )
Expand Down
Loading

0 comments on commit 8f97c09

Please sign in to comment.