From a7071ec49d0c136fef8d3157409b9d33a8116999 Mon Sep 17 00:00:00 2001 From: Aditya Pande Date: Wed, 24 Aug 2022 14:21:37 -0700 Subject: [PATCH] Acoustic comms plugin (#1608) * Added acoustic comms plugin Signed-off-by: Aditya --- examples/worlds/acoustic_comms.sdf | 254 ++++++++++++++++++ .../worlds/acoustic_comms_moving_targets.sdf | 250 +++++++++++++++++ src/systems/CMakeLists.txt | 1 + src/systems/acoustic_comms/AcousticComms.cc | 211 +++++++++++++++ src/systems/acoustic_comms/AcousticComms.hh | 86 ++++++ src/systems/acoustic_comms/CMakeLists.txt | 6 + test/integration/CMakeLists.txt | 1 + test/integration/acoustic_comms.cc | 157 +++++++++++ 8 files changed, 966 insertions(+) create mode 100644 examples/worlds/acoustic_comms.sdf create mode 100644 examples/worlds/acoustic_comms_moving_targets.sdf create mode 100644 src/systems/acoustic_comms/AcousticComms.cc create mode 100644 src/systems/acoustic_comms/AcousticComms.hh create mode 100644 src/systems/acoustic_comms/CMakeLists.txt create mode 100644 test/integration/acoustic_comms.cc diff --git a/examples/worlds/acoustic_comms.sdf b/examples/worlds/acoustic_comms.sdf new file mode 100644 index 0000000000..6b20c698fb --- /dev/null +++ b/examples/worlds/acoustic_comms.sdf @@ -0,0 +1,254 @@ + + + + + + + 0.001 + 1.0 + + + + + + + + + 6 + 1400 + + + + true + 0 0 10 0 0 0 + 1 1 1 1 + 0.5 0.5 0.5 1 + + 1000 + 0.9 + 0.01 + 0.001 + + -0.5 0.1 -0.9 + + + + true + + + + + 0 0 1 + 100 100 + + + + + + + 0 0 1 + 100 100 + + + + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + + + + + + + 2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr1
+ addr1/rx +
+
+ + + -2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 0 0 1 1 + 0 0 1 1 + 0 0 1 1 + + + + + +
addr2
+ addr2/rx + + /broker/bind + /broker/unbind + +
+ +
+ + + 5 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr3
+ addr3/rx +
+
+ + + -5 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr4
+ addr4/rx +
+
+ +
+
diff --git a/examples/worlds/acoustic_comms_moving_targets.sdf b/examples/worlds/acoustic_comms_moving_targets.sdf new file mode 100644 index 0000000000..1a474f1cbf --- /dev/null +++ b/examples/worlds/acoustic_comms_moving_targets.sdf @@ -0,0 +1,250 @@ + + + + + 0 1 -10 + + + 0.001 + 1.0 + + + + + + + + + 1400 + + + + true + 0 0 10 0 0 0 + 1 1 1 1 + 0.5 0.5 0.5 1 + + 1000 + 0.9 + 0.01 + 0.001 + + -0.5 0.1 -0.9 + + + + true + + + + + 0 0 1 + 100 100 + + + + + + + 0 0 1 + 100 100 + + + + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + + + + + + + 2 0 1 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + + + + + + + + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr1
+ addr1/rx +
+
+ + + -2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 0 0 1 1 + 0 0 1 1 + 0 0 1 1 + + + + + +
addr2
+ addr2/rx + + /broker/bind + /broker/unbind + +
+ +
+ + + 5 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr3
+ addr3/rx +
+
+ + + -5 0 1 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + + + + + + + + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr4
+ addr4/rx +
+
+ +
+
diff --git a/src/systems/CMakeLists.txt b/src/systems/CMakeLists.txt index 57e8050a75..6c0f5b402b 100644 --- a/src/systems/CMakeLists.txt +++ b/src/systems/CMakeLists.txt @@ -96,6 +96,7 @@ function(gz_add_system system_name) endfunction() add_subdirectory(ackermann_steering) +add_subdirectory(acoustic_comms) add_subdirectory(air_pressure) add_subdirectory(altimeter) add_subdirectory(apply_joint_force) diff --git a/src/systems/acoustic_comms/AcousticComms.cc b/src/systems/acoustic_comms/AcousticComms.cc new file mode 100644 index 0000000000..bab0c84781 --- /dev/null +++ b/src/systems/acoustic_comms/AcousticComms.cc @@ -0,0 +1,211 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Development of this module has been funded by the Monterey Bay Aquarium + * Research Institute (MBARI) and the David and Lucile Packard Foundation + */ +#include + +#include +#include +#include +#include "gz/sim/comms/Broker.hh" +#include "gz/sim/comms/MsgManager.hh" +#include "gz/sim/Util.hh" +#include "AcousticComms.hh" + +using namespace gz; +using namespace sim; +using namespace systems; + +/// \brief Private Acoustic comms data class. +class AcousticComms::Implementation +{ + /// \brief Default max range for acoustic comms in metres. + public: double maxRange = 1000.0; + + /// \brief Default speed of sound in air in metres/sec. + public: double speedOfSound = 343.0; + + /// \brief Position of the transmitter at the time the message was + /// sent, or first processed. + public: std::unordered_map + , math::Vector3d> + poseSrcAtMsgTimestamp; +}; + +////////////////////////////////////////////////// +AcousticComms::AcousticComms() + : dataPtr(gz::utils::MakeUniqueImpl()) +{ +} + +////////////////////////////////////////////////// +void AcousticComms::Load( + const Entity &_entity, + std::shared_ptr _sdf, + EntityComponentManager &_ecm, + EventManager &_eventMgr) +{ + if (_sdf->HasElement("max_range")) + { + this->dataPtr->maxRange = _sdf->Get("max_range"); + } + if (_sdf->HasElement("speed_of_sound")) + { + this->dataPtr->speedOfSound = _sdf->Get("speed_of_sound"); + } + + gzmsg << "AcousticComms configured with max range : " << + this->dataPtr->maxRange << " m and speed of sound : " << + this->dataPtr->speedOfSound << " m/s." << std::endl; +} + +////////////////////////////////////////////////// +void AcousticComms::Step( + const UpdateInfo &_info, + const comms::Registry &_currentRegistry, + comms::Registry &_newRegistry, + EntityComponentManager &_ecm) +{ + // Initialize entity if needed. + for (auto & [address, content] : _currentRegistry) + { + if (content.entity == kNullEntity) + { + auto entities = entitiesFromScopedName(content.modelName, _ecm); + if (entities.empty()) + continue; + + auto entityId = *(entities.begin()); + if (entityId == kNullEntity) + continue; + + _newRegistry[address].entity = entityId; + } + } + + for (auto & [address, content] : _currentRegistry) + { + // Reference to the outbound queue for this address. + auto &outbound = content.outboundMsgs; + + // Is the source address bound? + auto itSrc = _currentRegistry.find(address); + bool srcAddressBound = itSrc != _currentRegistry.end(); + + // Is the source address attached to a model? + bool srcAddressAttachedToModel = + srcAddressBound && itSrc->second.entity != kNullEntity; + + comms::DataQueue newOutbound; + + if (srcAddressAttachedToModel) + { + // All these messages need to be processed. + for (auto &msg : outbound) + { + // Is the destination address bound? + auto itDst = _currentRegistry.find(msg->dst_address()); + bool dstAddressBound = itDst != _currentRegistry.end(); + + // Is the destination address attached to a model? + bool dstAddressAttachedToModel = + dstAddressBound && itDst->second.entity != kNullEntity; + + if (!dstAddressAttachedToModel) + continue; + + // The plugin checks the distance travelled by the signal + // so far. If it is more than the maxRange, it is dropped + // and would never reach the destination. + // If it has already reached the destination but not as far + // as maxRange, it is processed. + // If it has reached neither the destination nor the maxRange, + // it is considered in transit. + + if (this->dataPtr->poseSrcAtMsgTimestamp.count(msg) == 0) + { + // This message is being processed for the first time. + // Record the current position of the sender and use it + // for distance calculations. + this->dataPtr->poseSrcAtMsgTimestamp[msg] = + worldPose(itSrc->second.entity, _ecm).Pos(); + } + + const auto& poseSrc = + this->dataPtr->poseSrcAtMsgTimestamp[msg]; + + // Calculate distance between the bodies. + const auto poseDst = worldPose(itDst->second.entity, _ecm).Pos(); + const auto distanceToTransmitter = (poseSrc - poseDst).Length(); + + // Calculate distance covered by the message. + const std::chrono::steady_clock::time_point currTime(_info.simTime); + const auto timeOfTransmission = msg->mutable_header()->stamp(); + + const auto currTimestamp = + std::chrono::nanoseconds(currTime.time_since_epoch()); + const auto packetTimestamp = + std::chrono::seconds(timeOfTransmission.sec()) + + std::chrono::nanoseconds(timeOfTransmission.nsec()); + + const std::chrono::duration deltaT = + currTimestamp - packetTimestamp; + const double distanceCoveredByMessage = deltaT.count() * + this->dataPtr->speedOfSound; + + // Check the msgs that haven't exceeded the maxRange. + if (distanceCoveredByMessage <= this->dataPtr->maxRange) + { + if (distanceCoveredByMessage >= distanceToTransmitter) + { + // This message needs to be processed. + _newRegistry[msg->dst_address()].inboundMsgs.push_back(msg); + this->dataPtr->poseSrcAtMsgTimestamp.erase(msg); + } + else + { + // This message is still in transit, should be kept in the + // outbound buffer of source and not moved to inbound of + // the destination. + newOutbound.push_back(msg); + } + } + else + { + // This message exceeded the maxRange. + // Stop keeping track of the position of its source. + this->dataPtr->poseSrcAtMsgTimestamp.erase(msg); + } + } + } + + // Clear the outbound queue, except for messages that were + // in transit. + _newRegistry[address].outboundMsgs = newOutbound; + } +} + +GZ_ADD_PLUGIN(AcousticComms, + System, + comms::ICommsModel::ISystemConfigure, + comms::ICommsModel::ISystemPreUpdate) + +GZ_ADD_PLUGIN_ALIAS(AcousticComms, + "gz::sim::systems::AcousticComms") diff --git a/src/systems/acoustic_comms/AcousticComms.hh b/src/systems/acoustic_comms/AcousticComms.hh new file mode 100644 index 0000000000..a4e6adedd3 --- /dev/null +++ b/src/systems/acoustic_comms/AcousticComms.hh @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Development of this module has been funded by the Monterey Bay Aquarium + * Research Institute (MBARI) and the David and Lucile Packard Foundation + */ +#ifndef GZ_SIM_SYSTEMS_ACOUSTICCOMMS_HH_ +#define GZ_SIM_SYSTEMS_ACOUSTICCOMMS_HH_ + +#include + +#include +#include "gz/sim/comms/ICommsModel.hh" +#include +#include +#include +#include + +namespace gz +{ +namespace sim +{ +// Inline bracket to help doxygen filtering. +inline namespace GZ_SIM_VERSION_NAMESPACE { +namespace systems +{ + /// \brief A comms model that simulates communication using acoustic + /// devices. The model uses simple distance based acoustics model. + /// + /// This system can be configured with the following SDF parameters: + /// + /// * Optional parameters: + /// * : Hard limit on range (meters). No communication will + /// happen beyond this range. Default is 1000. + /// * : Speed of sound in the medium (meters/sec). + /// Default is 343.0 + /// + /// Here's an example: + /// + /// 6 + /// 1400 + /// + + class AcousticComms: + public gz::sim::comms::ICommsModel + { + public: explicit AcousticComms(); + + // Documentation inherited. + public: void Load(const gz::sim::Entity &_entity, + std::shared_ptr _sdf, + gz::sim::EntityComponentManager &_ecm, + gz::sim::EventManager &_eventMgr) override; + + // Documentation inherited. + public: void Step(const gz::sim::UpdateInfo &_info, + const gz::sim::comms::Registry &_currentRegistry, + gz::sim::comms::Registry &_newRegistry, + gz::sim::EntityComponentManager &_ecm) override; + + // Impl pointer + GZ_UTILS_UNIQUE_IMPL_PTR(dataPtr) + }; +} +} +} +} + +#endif diff --git a/src/systems/acoustic_comms/CMakeLists.txt b/src/systems/acoustic_comms/CMakeLists.txt new file mode 100644 index 0000000000..537f4e2551 --- /dev/null +++ b/src/systems/acoustic_comms/CMakeLists.txt @@ -0,0 +1,6 @@ +gz_add_system(acoustic-comms + SOURCES + AcousticComms.cc + PUBLIC_LINK_LIBS + gz-common${GZ_COMMON_VER}::gz-common${GZ_COMMON_VER} +) diff --git a/test/integration/CMakeLists.txt b/test/integration/CMakeLists.txt index 2d5babb5ae..b2c99f0fee 100644 --- a/test/integration/CMakeLists.txt +++ b/test/integration/CMakeLists.txt @@ -2,6 +2,7 @@ set(TEST_TYPE "INTEGRATION") set(tests ackermann_steering_system.cc + acoustic_comms.cc air_pressure_system.cc altimeter_system.cc apply_joint_force_system.cc diff --git a/test/integration/acoustic_comms.cc b/test/integration/acoustic_comms.cc new file mode 100644 index 0000000000..f7ea59db3b --- /dev/null +++ b/test/integration/acoustic_comms.cc @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include "gz/sim/Server.hh" +#include "test_config.hh" // NOLINT(build/include) +#include "../helpers/EnvTestFixture.hh" + +using namespace gz; +using namespace sim; + +///////////////////////////////////////////////// +class AcousticCommsTestDefinition +{ +public: + std::string worldFile; + std::string srcAddr; + std::string dstAddr; + int numMsgs; + + AcousticCommsTestDefinition( + std::string worldFile_, std::string srcAddr_, + std::string dstAddr_, int numMsgs_) : + worldFile(worldFile_), srcAddr(srcAddr_), + dstAddr(dstAddr_), numMsgs(numMsgs_) {} +}; + +///////////////////////////////////////////////// +class AcousticCommsTestFixture : + public ::testing::TestWithParam +{ +}; + +TEST_P(AcousticCommsTestFixture, + GZ_UTILS_TEST_DISABLED_ON_WIN32(AcousticCommsTestTemplate)) +{ + auto param = GetParam(); + + // Start server + ServerConfig serverConfig; + const auto sdfFile = + gz::common::joinPaths(std::string(PROJECT_SOURCE_PATH), + "examples", "worlds", param.worldFile); + serverConfig.SetSdfFile(sdfFile); + + Server server(serverConfig); + EXPECT_FALSE(server.Running()); + EXPECT_FALSE(*server.Running(0)); + + // Run server + size_t iters = 1000; + server.Run(true, iters, false); + + int msgCounter = 0; + std::mutex mutex; + auto cb = [&](const msgs::Dataframe &_msg) -> void + { + // Verify msg content + std::lock_guard lock(mutex); + std::string expected = "hello world " + std::to_string(msgCounter); + + gz::msgs::StringMsg receivedMsg; + receivedMsg.ParseFromString(_msg.data()); + EXPECT_EQ(expected, receivedMsg.data()); + msgCounter++; + }; + + // Create subscriber. + gz::transport::Node node; + std::string addr = param.dstAddr; + std::string subscriptionTopic = param.dstAddr + "/rx"; + + // Subscribe to a topic by registering a callback. + auto cbFunc = std::function(cb); + EXPECT_TRUE(node.Subscribe(subscriptionTopic, cbFunc)) + << "Error subscribing to topic [" << subscriptionTopic << "]"; + + // Create publisher. + std::string publicationTopic = "/broker/msgs"; + auto pub = node.Advertise(publicationTopic); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // Prepare the message. + gz::msgs::Dataframe msg; + msg.set_src_address(param.srcAddr); + msg.set_dst_address(addr); + + // Publish some messages. + gz::msgs::StringMsg payload; + int pubCount = param.numMsgs; + for (int i = 0; i < pubCount; ++i) + { + // Prepare the payload. + payload.set_data("hello world " + std::to_string(i)); + std::string serializedData; + EXPECT_TRUE(payload.SerializeToString(&serializedData)) + << payload.DebugString(); + msg.set_data(serializedData); + EXPECT_TRUE(pub.Publish(msg)); + server.Run(true, 100, false); + } + + // Verify subscriber received all msgs. + bool done = false; + for (int sleep = 0; !done && sleep < 3; ++sleep) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + { + std::lock_guard lock(mutex); + done = (msgCounter == pubCount) && (pubCount != 0); + } + } + if (param.numMsgs != 0) + { + EXPECT_TRUE(done); + } + EXPECT_EQ(pubCount, msgCounter); +} + +INSTANTIATE_TEST_SUITE_P( + AcousticCommsTests, + AcousticCommsTestFixture, + ::testing::Values( + AcousticCommsTestDefinition( + "acoustic_comms.sdf", "addr2", "addr1", 3), + AcousticCommsTestDefinition( + "acoustic_comms.sdf", "addr4", "addr3", 0), + // The source is moving and the destination is stationary. + AcousticCommsTestDefinition( + "acoustic_comms_moving_targets.sdf", "addr2", "addr1", 3), + // The source is stationary and the destnation is moving. + AcousticCommsTestDefinition( + "acoustic_comms_moving_targets.sdf", "addr4", "addr3", 3) + ) +);