Skip to content

Commit

Permalink
Fix sensors system parallel updates (#2201)
Browse files Browse the repository at this point in the history

---------

Signed-off-by: Ian Chen <ichen@openrobotics.org>
  • Loading branch information
iche033 authored Oct 16, 2023
1 parent ab0669c commit 5d1c505
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
67 changes: 51 additions & 16 deletions src/systems/sensors/Sensors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,23 @@ class gz::sim::systems::SensorsPrivate
/// \brief Mutex to protect rendering data
public: std::mutex renderMutex;

/// \brief Mutex to protect renderUtil changes
public: std::mutex renderUtilMutex;

/// \brief Condition variable to signal rendering thread
///
/// This variable is used to block/unblock operations in the rendering
/// thread. For a more detailed explanation on the flow refer to the
/// documentation on RenderThread.
public: std::condition_variable renderCv;

/// \brief Condition variable to signal update time applied
///
/// This variable is used to block/unblock operations in PostUpdate thread
/// to make sure renderUtil's ECM updates are applied to the scene first
/// before they are overriden by PostUpdate
public: std::condition_variable updateTimeCv;

/// \brief Connection to events::Stop event, used to stop thread
public: common::ConnectionPtr stopConn;

Expand All @@ -141,6 +151,12 @@ class gz::sim::systems::SensorsPrivate
/// \brief Update time for the next rendering iteration
public: std::chrono::steady_clock::duration updateTime;

/// \brief Update time applied in the rendering thread
public: std::chrono::steady_clock::duration updateTimeApplied;

/// \brief Update time to be appplied in the rendering thread
public: std::chrono::steady_clock::duration updateTimeToApply;

/// \brief Next sensors update time
public: std::chrono::steady_clock::duration nextUpdateTime;

Expand Down Expand Up @@ -297,13 +313,13 @@ void SensorsPrivate::RunOnce()
if (!this->scene)
return;

std::chrono::steady_clock::duration updateTimeApplied;
GZ_PROFILE("SensorsPrivate::RunOnce");
{
GZ_PROFILE("Update");
std::unique_lock<std::mutex> timeLock(this->renderMutex);
std::unique_lock<std::mutex> timeLock(this->renderUtilMutex);
this->renderUtil.Update();
updateTimeApplied = this->updateTime;
this->updateTimeApplied = this->updateTime;
this->updateTimeCv.notify_one();
}

bool activeSensorsEmpty = true;
Expand Down Expand Up @@ -334,7 +350,7 @@ void SensorsPrivate::RunOnce()
{
GZ_PROFILE("PreRender");
this->eventManager->Emit<events::PreRender>();
this->scene->SetTime(updateTimeApplied);
this->scene->SetTime(this->updateTimeApplied);
// Update the scene graph manually to improve performance
// We only need to do this once per frame It is important to call
// sensors::RenderingSensor::SetManualSceneUpdate and set it to true
Expand All @@ -361,11 +377,11 @@ void SensorsPrivate::RunOnce()
// safety check to see if reset occurred while we're rendering
// avoid publishing outdated data if reset occurred
std::unique_lock<std::mutex> timeLock(this->renderMutex);
if (updateTimeApplied <= this->updateTime)
if (this->updateTimeApplied <= this->updateTime)
{
// publish data
GZ_PROFILE("RunOnce");
this->sensorManager.RunOnce(updateTimeApplied);
this->sensorManager.RunOnce(this->updateTimeApplied);
this->eventManager->Emit<events::Render>();
}

Expand Down Expand Up @@ -605,8 +621,10 @@ void Sensors::Reset(const UpdateInfo &_info, EntityComponentManager &)
s->SetNextDataUpdateTime(_info.simTime);
}
this->dataPtr->nextUpdateTime = _info.simTime;
std::unique_lock<std::mutex> lock(this->dataPtr->renderMutex);
std::unique_lock<std::mutex> lock2(this->dataPtr->renderUtilMutex);
this->dataPtr->updateTime = _info.simTime;
this->dataPtr->updateTimeToApply = _info.simTime;
this->dataPtr->updateTimeApplied = _info.simTime;
}
}

Expand Down Expand Up @@ -668,7 +686,19 @@ void Sensors::PostUpdate(const UpdateInfo &_info,
if (this->dataPtr->running && this->dataPtr->initialized)
{
{
std::unique_lock<std::mutex> lock(this->dataPtr->renderMutex);
GZ_PROFILE("UpdateFromECM");
// Make sure we do not override the state in renderUtil if there are
// still ECM changes that still need to be propagated to the scene,
// i.e. wait until renderUtil.Update(), has taken place in the
// rendering thread
std::unique_lock<std::mutex> lock(this->dataPtr->renderUtilMutex);
this->dataPtr->updateTimeCv.wait(lock, [this]()
{
return !this->dataPtr->updateAvailable ||
(this->dataPtr->updateTimeToApply ==
this->dataPtr->updateTimeApplied);
});

this->dataPtr->renderUtil.UpdateFromECM(_info, _ecm);
this->dataPtr->updateTime = _info.simTime;
}
Expand Down Expand Up @@ -719,16 +749,21 @@ void Sensors::PostUpdate(const UpdateInfo &_info,
std::unique_lock<std::mutex> lockSensors(this->dataPtr->sensorsMutex);
this->dataPtr->activeSensors =
std::move(this->dataPtr->sensorsToUpdate);
}

this->dataPtr->nextUpdateTime = this->dataPtr->NextUpdateTime(
this->dataPtr->sensorsToUpdate, _info.simTime);
this->dataPtr->nextUpdateTime = this->dataPtr->NextUpdateTime(
this->dataPtr->sensorsToUpdate, _info.simTime);

// Force scene tree update if there are sensors to be created or
// subscribes to the render events. This does not necessary force
// sensors to update. Only active sensors will be updated
this->dataPtr->forceUpdate =
(this->dataPtr->renderUtil.PendingSensors() > 0) ||
hasRenderConnections;
// Force scene tree update if there are sensors to be created or
// subscribes to the render events. This does not necessary force
// sensors to update. Only active sensors will be updated
this->dataPtr->forceUpdate =
(this->dataPtr->renderUtil.PendingSensors() > 0) ||
hasRenderConnections;

{
std::unique_lock<std::mutex> timeLock(this->dataPtr->renderUtilMutex);
this->dataPtr->updateTimeToApply = this->dataPtr->updateTime;
}
{
std::unique_lock<std::mutex> cvLock(this->dataPtr->renderMutex);
Expand Down
6 changes: 6 additions & 0 deletions test/integration/reset_sensors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct MsgReceiver
transport::Node node;

std::atomic<bool> msgReceived = {false};
std::atomic<unsigned int> msgCount = 0;

void Start(const std::string &_topic) {
this->msgReceived = false;
Expand All @@ -94,6 +95,7 @@ struct MsgReceiver
std::lock_guard<std::mutex> lk(this->msgMutex);
this->lastMsg = _msg;
this->msgReceived = true;
this->msgCount++;
}

T Last() {
Expand Down Expand Up @@ -252,6 +254,10 @@ TEST_F(ResetFixture, GZ_UTILS_TEST_DISABLED_ON_MAC(HandleReset))
EXPECT_FLOAT_EQ(centerPix.G(), centerPix.B());
}

// wait until expected no. of messages are received.
// sim runs for 2000 iterations with camera at 10 Hz + 1 msg at t=0
while (imageReceiver.msgCount < 21)
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// Send command to reset to initial state
worldReset();
Expand Down

0 comments on commit 5d1c505

Please sign in to comment.