diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 03b1517abf4..07eb00f15be 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -19,6 +19,7 @@ add_subdirectory(cpp/content_filter) add_subdirectory(cpp/custom_payload_pool) add_subdirectory(cpp/dds) add_subdirectory(cpp/delivery_mechanisms) +add_subdirectory(cpp/discovery_server) add_subdirectory(cpp/hello_world) add_subdirectory(cpp/rtps) add_subdirectory(cpp/xtypes) diff --git a/examples/cpp/dds/CMakeLists.txt b/examples/cpp/dds/CMakeLists.txt index 8585624b003..32ea44c1f70 100644 --- a/examples/cpp/dds/CMakeLists.txt +++ b/examples/cpp/dds/CMakeLists.txt @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_subdirectory(DiscoveryServerExample) add_subdirectory(DynamicHelloWorldExample) add_subdirectory(FlowControlExample) add_subdirectory(HelloWorldExampleDataSharing) diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerPublisher.h b/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerPublisher.h deleted file mode 100644 index 8397dd5472b..00000000000 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerPublisher.h +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file DiscoveryServerPublisher.h - * - */ - -#ifndef _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERPUBLISHER_H_ -#define _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERPUBLISHER_H_ - -#include - -#include -#include -#include - -#include "types/HelloWorldPubSubTypes.hpp" -#include "common.h" - -/** - * Class used to group into a single working unit a Publisher with a DataWriter, its listener, and a TypeSupport member - * corresponding to the HelloWorld datatype - */ -class HelloWorldPublisher -{ -public: - - HelloWorldPublisher(); - - virtual ~HelloWorldPublisher(); - - //! Initialize the publisher - bool init( - const std::string& topic_name, - const std::string& server_address, - unsigned short server_port, - TransportKind transport); - - //! Publish a sample - void publish(); - - //! Run for number samples, publish every sleep seconds - void run( - uint32_t number, - uint32_t sleep); - - //! Return the current state of execution - static bool is_stopped(); - - //! Trigger the end of execution - static void stop(); - -private: - - HelloWorld hello_; - - eprosima::fastdds::dds::DomainParticipant* participant_; - - eprosima::fastdds::dds::Publisher* publisher_; - - eprosima::fastdds::dds::Topic* topic_; - - eprosima::fastdds::dds::DataWriter* writer_; - - eprosima::fastdds::dds::TypeSupport type_; - - /** - * Class handling discovery events - */ - class PubListener : public eprosima::fastdds::dds::DomainParticipantListener - { - public: - - PubListener() - : matched_(0) - { - } - - ~PubListener() override - { - } - - //! Callback executed when a DataReader is matched or unmatched - void on_publication_matched( - eprosima::fastdds::dds::DataWriter* writer, - const eprosima::fastdds::dds::PublicationMatchedStatus& info) override; - - //! Callback executed when a DomainParticipant is discovered, dropped or removed - void on_participant_discovery( - eprosima::fastdds::dds::DomainParticipant* /*participant*/, - eprosima::fastdds::rtps::ParticipantDiscoveryInfo&& info, - bool& should_be_ignored) override; - - private: - - using eprosima::fastdds::dds::DomainParticipantListener::on_participant_discovery; - - //! Number of DataReaders matched to the associated DataWriter - std::atomic matched_; - } - listener_; - - //! Run thread for number samples, publish every sleep seconds - void runThread( - uint32_t number, - uint32_t sleep); - - //! Member used for control flow purposes - static std::atomic stop_; -}; - - - -#endif /* _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERPUBLISHER_H_ */ diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerServer.h b/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerServer.h deleted file mode 100644 index 0520eb2dc00..00000000000 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerServer.h +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file DiscoveryServerServer.h - * - */ - -#ifndef _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERSERVER_H_ -#define _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERSERVER_H_ - -#include -#include -#include - -#include -#include - -#include "common.h" - -/** - * Class with a partipant configured to function as server in the Discovery Server mechanism - */ -class DiscoveryServer -{ -public: - - DiscoveryServer(); - - virtual ~DiscoveryServer(); - - //! Initialize the server - bool init( - const std::string& server_address, - unsigned short server_port, - TransportKind transport, - bool has_connection_server, - const std::string& connection_server_address, - unsigned short connection_server_port); - - //! Run - void run( - unsigned int timeout); - - //! Return the current state of execution - static bool is_stopped(); - - //! Trigger the end of execution - static void stop(); - -private: - - eprosima::fastdds::dds::DomainParticipant* participant_; - - /** - * Class handling discovery events - */ - class ServerListener : public eprosima::fastdds::dds::DomainParticipantListener - { - public: - - ServerListener() - { - } - - ~ServerListener() override - { - } - - //! Callback executed when a DomainParticipant is discovered, dropped or removed - void on_participant_discovery( - eprosima::fastdds::dds::DomainParticipant* /*participant*/, - eprosima::fastdds::rtps::ParticipantDiscoveryInfo&& info, - bool& should_be_ignored) override; - - private: - - using eprosima::fastdds::dds::DomainParticipantListener::on_participant_discovery; - } - listener_; - - //! Member used for control flow purposes - static std::atomic stop_; - - //! Protects terminate condition variable - static std::mutex terminate_cv_mtx_; - - //! Waits during execution until SIGINT or max_messages_ samples are received - static std::condition_variable terminate_cv_; -}; - - - -#endif /* _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERSERVER_H_ */ diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerSubscriber.h b/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerSubscriber.h deleted file mode 100644 index cc81aad5f6a..00000000000 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerSubscriber.h +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file DiscoveryServerSubscriber.h - * - */ - -#ifndef _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERSUBSCRIBER_H_ -#define _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERSUBSCRIBER_H_ - -#include -#include -#include - -#include -#include -#include - -#include "types/HelloWorldPubSubTypes.hpp" -#include "common.h" - -/** - * Class used to group into a single working unit a Subscriber with a DataReader, its listener, and a TypeSupport member - * corresponding to the HelloWorld datatype - */ -class HelloWorldSubscriber -{ -public: - - HelloWorldSubscriber(); - - virtual ~HelloWorldSubscriber(); - - //! Initialize the subscriber - bool init( - const std::string& topic_name, - uint32_t max_messages, - const std::string& server_address, - unsigned short server_port, - TransportKind transport); - - //! RUN the subscriber until number samples are received - void run( - uint32_t number); - - //! Return the current state of execution - static bool is_stopped(); - - //! Trigger the end of execution - static void stop(); - -private: - - eprosima::fastdds::dds::DomainParticipant* participant_; - - eprosima::fastdds::dds::Subscriber* subscriber_; - - eprosima::fastdds::dds::Topic* topic_; - - eprosima::fastdds::dds::DataReader* reader_; - - eprosima::fastdds::dds::TypeSupport type_; - - /** - * Class handling discovery and dataflow events - */ - class SubListener : public eprosima::fastdds::dds::DomainParticipantListener - { - public: - - SubListener() - : matched_(0) - , samples_(0) - , max_messages_(0) - { - } - - ~SubListener() override - { - } - - //! Set the maximum number of messages to receive before exiting - void set_max_messages( - uint32_t max_messages); - - //! Callback executed when a new sample is received - void on_data_available( - eprosima::fastdds::dds::DataReader* reader) override; - - //! Callback executed when a DataWriter is matched or unmatched - void on_subscription_matched( - eprosima::fastdds::dds::DataReader* reader, - const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override; - - //! Callback executed when a DomainParticipant is discovered, dropped or removed - void on_participant_discovery( - eprosima::fastdds::dds::DomainParticipant* /*participant*/, - eprosima::fastdds::rtps::ParticipantDiscoveryInfo&& info, - bool& should_be_ignored) override; - - private: - - using eprosima::fastdds::dds::DomainParticipantListener::on_participant_discovery; - - HelloWorld hello_; - - //! Number of DataWriters matched to the associated DataReader - int matched_; - - //! Number of samples received - uint32_t samples_; - - //! Number of messages to be received before triggering termination of execution - uint32_t max_messages_; - } - listener_; - - //! Member used for control flow purposes - static std::atomic stop_; - - //! Protects terminate condition variable - static std::mutex terminate_cv_mtx_; - - //! Waits during execution until SIGINT or max_messages_ samples are received - static std::condition_variable terminate_cv_; -}; - -#endif /* _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_DISCOVERYSERVERSUBSCRIBER_H_ */ diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServer_main.cpp b/examples/cpp/dds/DiscoveryServerExample/DiscoveryServer_main.cpp deleted file mode 100644 index e71d6b04fe0..00000000000 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServer_main.cpp +++ /dev/null @@ -1,331 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file DiscoveryServer_main.cpp - * - */ - -#include - -#include "arg_configuration.h" -#include "DiscoveryServerPublisher.h" -#include "DiscoveryServerServer.h" -#include "DiscoveryServerSubscriber.h" - -enum class EntityKind -{ - PUBLISHER, - SUBSCRIBER, - SERVER, -}; - -int main( - int argc, - char** argv) -{ - int columns; - -#if defined(_WIN32) - char* buf = nullptr; - size_t sz = 0; - if (_dupenv_s(&buf, &sz, "COLUMNS") == 0 && buf != nullptr) - { - columns = strtol(buf, nullptr, 10); - free(buf); - } - else - { - columns = 80; - } -#else - columns = getenv("COLUMNS") ? atoi(getenv("COLUMNS")) : 80; -#endif // if defined(_WIN32) - - EntityKind type = EntityKind::PUBLISHER; - std::string topic_name = "HelloWorldTopic"; - int count = 0; - long sleep = 100; - - // Transport - TransportKind transport = TransportKind::UDPv4; - - // Discovery Server connection - std::string connection_address = "127.0.0.1"; // default ip address - uint16_t connection_port = 16166; // default physical port - bool id_ds_set = false; - - // Discovery Server listening - std::string listening_address = "127.0.0.1"; // default ip address - uint16_t listening_port = 16166; // default physical port - uint32_t timeout = 0; // default DS id - - if (argc > 1) - { - if (!strcmp(argv[1], "publisher")) - { - type = EntityKind::PUBLISHER; - } - else if (!strcmp(argv[1], "subscriber")) - { - type = EntityKind::SUBSCRIBER; - } - else if (!strcmp(argv[1], "server")) - { - type = EntityKind::SERVER; - } - // check if first argument is help, needed because we skip it when parsing - else if (!(strcmp(argv[1], "-h") && strcmp(argv[1], "--help"))) - { - option::printUsage(fwrite, stdout, usage, columns); - return 0; - } - else - { - std::cerr << "ERROR: first argument can only be " << std::endl; - option::printUsage(fwrite, stdout, usage, columns); - return 1; - } - - argc -= (argc > 0); - argv += (argc > 0); // skip program name argv[0] if present - --argc; ++argv; // skip pub/sub argument - option::Stats stats(usage, argc, argv); - std::vector options(stats.options_max); - std::vector buffer(stats.buffer_max); - option::Parser parse(usage, argc, argv, &options[0], &buffer[0]); - - if (parse.error()) - { - option::printUsage(fwrite, stdout, usage, columns); - return 1; - } - - if (options[HELP]) - { - option::printUsage(fwrite, stdout, usage, columns); - return 0; - } - - for (int i = 0; i < parse.optionsCount(); ++i) - { - option::Option& opt = buffer[i]; - switch (opt.index()) - { - case optionIndex::HELP: - // not possible, because handled further above and exits the program - break; - - case optionIndex::TOPIC: - if (type == EntityKind::SERVER) - { - print_warning("publisher|subscriber", opt.name); - } - else - { - topic_name = std::string(opt.arg); - } - break; - - case optionIndex::SAMPLES: - if (type == EntityKind::SERVER) - { - print_warning("publisher|subscriber", opt.name); - } - else - { - count = strtol(opt.arg, nullptr, 10); - } - break; - - case optionIndex::INTERVAL: - if (type == EntityKind::PUBLISHER) - { - sleep = strtol(opt.arg, nullptr, 10); - } - else - { - print_warning("publisher", opt.name); - } - break; - - case optionIndex::TRANSPORT: - { - std::string transport_str(opt.arg); - if (transport_str == "udpv4") - { - transport = TransportKind::UDPv4; - } - else if (transport_str == "udpv6") - { - transport = TransportKind::UDPv6; - } - else if (transport_str == "tcpv4") - { - transport = TransportKind::TCPv4; - } - else if (transport_str == "tcpv6") - { - transport = TransportKind::TCPv6; - } - else - { - print_warning("udpv4|udpv6|tcpv4|tcpv6", opt.name); - } - - break; - } - - case optionIndex::CONNECTION_PORT: - id_ds_set = true; - connection_port = static_cast(strtol(opt.arg, nullptr, 10)); - break; - - case optionIndex::CONNECTION_ADDRESS: - id_ds_set = true; - connection_address = opt.arg; - break; - - case optionIndex::LISTENING_PORT: - if (type != EntityKind::SERVER) - { - print_warning("server", opt.name); - break; - } - listening_port = static_cast(strtol(opt.arg, nullptr, 10)); - break; - - case optionIndex::LISTENING_ADDRESS: - if (type != EntityKind::SERVER) - { - print_warning("server", opt.name); - break; - } - listening_address = opt.arg; - - break; - - case optionIndex::TIMEOUT: - if (type != EntityKind::SERVER) - { - print_warning("server", opt.name); - break; - } - timeout = strtol(opt.arg, nullptr, 10); - break; - - case optionIndex::UNKNOWN_OPT: - std::cerr << "ERROR: " << opt.name << " is not a valid argument." << std::endl; - option::printUsage(fwrite, stdout, usage, columns); - return 1; - break; - } - } - } - else - { - std::cerr << "ERROR: argument is required." << std::endl; - option::printUsage(fwrite, stdout, usage, columns); - return 1; - } - - // Check that ip matches transport - // if (transport == TransportKind::UDPv4 && !eprosima::fastdds::rtps::IPLocator::isIPv4(listening_address)) - // { - // std::cerr << "ERROR: IPv4 is needed to use UDPv4. Wrong IP address: " << listening_address << std::endl; - // option::printUsage(fwrite, stdout, usage, columns); - // return 1; - // } - // else if (transport == TransportKind::UDPv6 && !eprosima::fastdds::rtps::IPLocator::isIPv6(listening_address)) - // { - // std::cerr << "ERROR: IPv6 is needed to use UDPv6. Wrong IP address: " << listening_address << std::endl; - // option::printUsage(fwrite, stdout, usage, columns); - // return 1; - // } - - // Check that a DS has not same ip and port in listening and connection - if (id_ds_set && - type == EntityKind::SERVER && - listening_address == connection_address && - listening_port == connection_port) - { - std::cerr << "ERROR: Discovery Servers ports must be different, " - << " cannot connect to a server with same listening address " - << listening_address << "(" << listening_port << ")" << std::endl; - option::printUsage(fwrite, stdout, usage, columns); - return 1; - } - - switch (type) - { - case EntityKind::PUBLISHER: - { - HelloWorldPublisher mypub; - if (mypub.init( - topic_name, - connection_address, - connection_port, - transport)) - { - mypub.run(static_cast(count), static_cast(sleep)); - } - else - { - std::cerr << "ERROR: when initializing Publisher." << std::endl; - return 1; - } - break; - } - case EntityKind::SUBSCRIBER: - { - HelloWorldSubscriber mysub; - if (mysub.init( - topic_name, - static_cast(count), - connection_address, - connection_port, - transport)) - { - mysub.run(static_cast(count)); - } - else - { - std::cerr << "ERROR: when initializing Subscriber." << std::endl; - return 1; - } - break; - } - case EntityKind::SERVER: - { - DiscoveryServer myserver; - if (myserver.init( - listening_address, - listening_port, - transport, - id_ds_set, - connection_address, - connection_port)) - { - myserver.run(timeout); - } - else - { - std::cerr << "ERROR: when initializing Server." << std::endl; - return 1; - } - break; - } - } - return 0; -} diff --git a/examples/cpp/dds/DiscoveryServerExample/README.md b/examples/cpp/dds/DiscoveryServerExample/README.md deleted file mode 100644 index ef236617f4e..00000000000 --- a/examples/cpp/dds/DiscoveryServerExample/README.md +++ /dev/null @@ -1,65 +0,0 @@ -# Discovery Server Example - -This example demonstrates how communication between a publisher and subscriber can be established through the Discovery -Server mechanism. - -## Execution instructions - -To launch this test open three different consoles: - -In the first one launch: ./DiscoveryServerExample publisher (or DiscoveryServerExample.exe publisher on windows). -In the second one: ./DiscoveryServerExample subscriber (or DiscoveryServerExample.exe subscriber on windows). -In the third one: ./DiscoveryServerExample server (or DiscoveryServerExample.exe server on windows). - -## Arguments - -First argument is `publisher`, `subscriber` or `server` and then the rest of arguments are read unordered - -```sh -Usage: DiscoveryServerExample - -General options: - -h --help - Produce help message. - -Publisher options: - -t --topic= - Topic name (Default: HelloWorldTopic). - -s --samples= - Number of samples to send (Default: 0 => infinite samples). - -i --interval= - Time between samples in milliseconds (Default: 100). - -c --connection-address= - Server address (Default address: 127.0.0.1). - -p --connection-port= - Server listening port (Default port: 16166). - --transport= - Use Transport Protocol [udpv4|udpv6|tcpv4|tcpv6] (Default: udpv4). - -Subscriber options: - -t --topic= - Topic name (Default: HelloWorldTopic). - -s --samples= - Number of samples to wait for (Default: 0 => infinite - samples). - -c --connection-address= - Server address (Default address: 127.0.0.1). - -p --connection-port= - Server listening port (Default port: 16166). - --transport= - Use Transport Protocol [udpv4|udpv6|tcpv4|tcpv6] (Default: udpv4). - -DiscoveryServer options: - --listening-address= - Server address (Default address: 127.0.0.1). - --listening-port= - Server listening port (Default port: 16166). - --transport= - Use Transport Protocol [udpv4|udpv6|tcpv4|tcpv6] (Default: udpv4). - -c --connection-address= - Server address (Default address: 127.0.0.1). - -p --connection-port= - Server listening port (Default port: 16166). - -z --timeout - Number of seconds before finish the process (Default: 0 = till ^C). -``` diff --git a/examples/cpp/dds/DiscoveryServerExample/arg_configuration.h b/examples/cpp/dds/DiscoveryServerExample/arg_configuration.h deleted file mode 100644 index d7a888df0af..00000000000 --- a/examples/cpp/dds/DiscoveryServerExample/arg_configuration.h +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file arg_configuration.h - * - */ - -#ifndef _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_ARG_CONFIGURATION_H_ -#define _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_ARG_CONFIGURATION_H_ - -#include -#include -#include - -#include -#include - -#include "common.h" - -namespace option = eprosima::option; - -const std::regex IPv4_REGEX(R"(^((?:[0-9]{1,3}\.){3}[0-9]{1,3})?:?(?:(\d+))?$)"); - -struct Arg : public option::Arg -{ - static void print_error( - const char* msg1, - const option::Option& opt, - const char* msg2) - { - fprintf(stderr, "%s", msg1); - fwrite(opt.name, opt.namelen, 1, stderr); - fprintf(stderr, "%s", msg2); - } - - static option::ArgStatus Unknown( - const option::Option& option, - bool msg) - { - if (msg) - { - print_error("Unknown option '", option, "'\n"); - } - return option::ARG_ILLEGAL; - } - - static option::ArgStatus Required( - const option::Option& option, - bool msg) - { - if (option.arg != 0 && option.arg[0] != 0) - { - return option::ARG_OK; - } - - if (msg) - { - print_error("Option '", option, "' requires an argument\n"); - } - return option::ARG_ILLEGAL; - } - - static option::ArgStatus Numeric( - const option::Option& option, - bool msg) - { - char* endptr = 0; - if (option.arg != 0 && strtol(option.arg, &endptr, 10)) - { - } - if (endptr != option.arg && *endptr == 0) - { - return option::ARG_OK; - } - - if (msg) - { - print_error("Option '", option, "' requires a numeric argument\n"); - } - return option::ARG_ILLEGAL; - } - - static option::ArgStatus String( - const option::Option& option, - bool msg) - { - if (option.arg != 0) - { - return option::ARG_OK; - } - if (msg) - { - print_error("Option '", option, "' requires a string argument\n"); - } - return option::ARG_ILLEGAL; - } - - static option::ArgStatus Locator( - const option::Option& option, - bool msg) - { - if (option.arg != 0) - { - // we must check if it is a correct ip address plus port number - std::string ip_str(option.arg); - if ( - eprosima::fastdds::rtps::IPLocator::isIPv4(ip_str) || - eprosima::fastdds::rtps::IPLocator::isIPv6(ip_str)) - { - return option::ARG_OK; - } - } - if (msg) - { - print_error("Option '", option, "' requires an v4 or v6 argument\n"); - } - return option::ARG_ILLEGAL; - } - - static option::ArgStatus Transport( - const option::Option& option, - bool msg) - { - if (option.arg != 0) - { - // we must check if it is a correct ip address plus port number - std::string transport = std::string(option.arg); - if ( - // transport == "shm" || - transport == "udpv4" || - transport == "udpv6" || - transport == "tcpv4" || - transport == "tcpv6" - ) - { - return option::ARG_OK; - } - } - if (msg) - { - print_error("Option '", option, "' requires a string argument\n"); - } - return option::ARG_ILLEGAL; - } - -}; - -enum optionIndex -{ - UNKNOWN_OPT, - HELP, - - TOPIC, - SAMPLES, - INTERVAL, - TRANSPORT, - - CONNECTION_ADDRESS, - CONNECTION_PORT, - - LISTENING_ADDRESS, - LISTENING_PORT, - TIMEOUT, -}; - -const option::Descriptor usage[] = { - {UNKNOWN_OPT, 0, "", "", Arg::None, - "Usage: DiscoveryServerExample \n\nGeneral options:" }, - { - HELP, - 0, - "h", - "help", - Arg::None, - " -h \t--help \tProduce help message." - }, - - /// PUBLISHER OPTIONS - {UNKNOWN_OPT, 0, "", "", Arg::None, "\nPublisher options:"}, - { - TOPIC, - 0, - "t", - "topic", - Arg::String, - " -t \t--topic= \tTopic name (Default: HelloWorldTopic)." - }, - { - SAMPLES, - 0, - "s", - "samples", - Arg::Numeric, - " -s \t--samples= \tNumber of samples to send (Default: 0 => infinite samples)." - }, - { - INTERVAL, - 0, - "i", - "interval", - Arg::Numeric, - " -i \t--interval= \tTime between samples in milliseconds (Default: 100)." - }, - { - CONNECTION_ADDRESS, - 0, - "c", - "connection-address", - Arg::String, - " -c \t--connection-address= \tServer address (Default address: 127.0.0.1)." - }, - { - CONNECTION_PORT, - 0, - "p", - "connection-port", - Arg::Numeric, - " -p \t--connection-port= \tServer listening port (Default port: 16166)." - }, - { - TRANSPORT, - 0, - "", - "transport", - Arg::Transport, - " \t--transport \tUse Transport Protocol [udpv4|udpv6|tcpv4|tcpv6] (UDPv4 by default)." - }, - - /// SUBSCRIBER OPTIONS - {UNKNOWN_OPT, 0, "", "", Arg::None, "\nSubscriber options:"}, - { - TOPIC, - 0, - "t", - "topic", - Arg::String, - " -t \t--topic= \tTopic name (Default: HelloWorldTopic)." - }, - { - SAMPLES, - 0, - "s", - "samples", - Arg::Numeric, - " -s \t--samples= \tNumber of samples to send (Default: 0 => infinite samples)." - }, - { - CONNECTION_ADDRESS, - 0, - "c", - "connection-address", - Arg::String, - " -c \t--connection-address= \tServer address (Default address: 127.0.0.1)." - }, - { - CONNECTION_PORT, - 0, - "p", - "connection-port", - Arg::Numeric, - " -p \t--connection-port= \tServer listening port (Default port: 16166)." - }, - { - TRANSPORT, - 0, - "", - "transport", - Arg::Transport, - " \t--transport \tUse Transport Protocol [udpv4|udpv6|tcpv4|tcpv6] (UDPv4 by default)." - }, - - /// SERVER OPTIONS - {UNKNOWN_OPT, 0, "", "", Arg::None, "\nDiscovery Server options:"}, - { - LISTENING_ADDRESS, - 0, - "", - "listening-address", - Arg::String, - " \t--listening-address= \tServer address (Default address: 127.0.0.1)." - }, - { - LISTENING_PORT, - 0, - "", - "listening-port", - Arg::Numeric, - " \t--listening-port= \tServer listening port (Default port: 16166)." - }, - { - TRANSPORT, - 0, - "", - "transport", - Arg::Transport, - " \t--transport \tUse Transport Protocol [udpv4|udpv6|tcpv4|tcpv6] (UDPv4 by default)." - }, - { - CONNECTION_PORT, - 0, - "p", - "connection-port", - Arg::Numeric, - " -p \t--connection-port= \tServer listening port (Default port: 16166)." - }, - { - CONNECTION_ADDRESS, - 0, - "c", - "connection-address", - Arg::String, - " -c \t--connection-address= \tServer address (Default address: 127.0.0.1)." - }, - { - TIMEOUT, - 0, - "z", - "timeout", - Arg::Numeric, - " -z \t--timeout \tNumber of seconds before finish the process (Default: 0 = till ^C). " - }, - - { 0, 0, 0, 0, 0, 0 } -}; - -void print_warning( - std::string type, - const char* opt) -{ - std::cerr << "WARNING: " << opt << " is a " << type << " option, ignoring argument." << std::endl; -} - -#endif /* _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_ARG_CONFIGURATION_H_ */ diff --git a/examples/cpp/discovery_server/Application.cpp b/examples/cpp/discovery_server/Application.cpp new file mode 100644 index 00000000000..00fb74a6863 --- /dev/null +++ b/examples/cpp/discovery_server/Application.cpp @@ -0,0 +1,61 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Application.cpp + * + */ + +#include "Application.hpp" + +#include "ClientPublisherApp.hpp" +#include "ClientSubscriberApp.hpp" +#include "CLIParser.hpp" +#include "ServerApp.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { + +//! Factory method to create a publisher or subscriber +std::shared_ptr Application::make_app( + const CLIParser::ds_example_config& config) +{ + std::shared_ptr entity; + switch (config.entity) + { + case CLIParser::EntityKind::CLIENT_PUBLISHER: + entity = std::make_shared(config.pub_config); + break; + case CLIParser::EntityKind::CLIENT_SUBSCRIBER: + entity = std::make_shared(config.sub_config); + break; + case CLIParser::EntityKind::SERVER: + entity = std::make_shared(config.srv_config); + break; + case CLIParser::EntityKind::UNDEFINED: + default: + throw std::runtime_error("Entity initialization failed"); + break; + } + return entity; +} + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/discovery_server/Application.hpp b/examples/cpp/discovery_server/Application.hpp new file mode 100644 index 00000000000..14df294260e --- /dev/null +++ b/examples/cpp/discovery_server/Application.hpp @@ -0,0 +1,55 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Application.hpp + * + */ + +#ifndef FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__APPLICATION_HPP +#define FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__APPLICATION_HPP + +#include + +#include "CLIParser.hpp" + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { + +class Application +{ +public: + + //! Virtual destructor + virtual ~Application() = default; + + //! Run application + virtual void run() = 0; + + //! Trigger the end of execution + virtual void stop() = 0; + + //! Factory method to create applications based on configuration + static std::shared_ptr make_app( + const CLIParser::ds_example_config& config); +}; + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif /* FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__APPLICATION_HPP */ diff --git a/examples/cpp/discovery_server/CLIParser.hpp b/examples/cpp/discovery_server/CLIParser.hpp new file mode 100644 index 00000000000..89a9ffb5073 --- /dev/null +++ b/examples/cpp/discovery_server/CLIParser.hpp @@ -0,0 +1,683 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CLIParser.hpp + * + */ + +#include +#include +#include + +#include + +#include "Helpers.hpp" + +#ifndef FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIPARSER_HPP +#define FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIPARSER_HPP + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { + +using dds::Log; + +class CLIParser +{ +public: + + CLIParser() = delete; + + //! Entity kind enumeration + enum class EntityKind : uint8_t + { + CLIENT_PUBLISHER, + CLIENT_SUBSCRIBER, + SERVER, + UNDEFINED + }; + + //! Clients common configuration + struct client_config + { + uint16_t connection_port{16166}; + std::string connection_address{"127.0.0.1"}; + }; + + //! Configuration options for both publisher and subscriber clients + struct pubsub_config : public client_config + { + bool reliable{false}; + bool transient_local{false}; + uint16_t samples{0}; + std::string topic_name{"discovery_server_topic"}; + }; + + //! Publisher client configuration structure + struct client_publisher_config : public pubsub_config + { + TransportKind transport_kind{TransportKind::UDPv4}; + uint16_t interval{100}; + }; + + //! Subscriber client configuration structure + struct client_subscriber_config : public pubsub_config + { + TransportKind transport_kind{TransportKind::UDPv4}; + }; + + //! Server configuration structure + //! A server can, in turn, act as a client + struct server_config : public client_config + { + bool is_also_client{false}; + TransportKind transport_kind{TransportKind::UDPv4}; + uint16_t listening_port{16166}; + uint16_t timeout{0}; + std::string listening_address{"127.0.0.1"}; + }; + + //! Configuration structure for the example + struct ds_example_config + { + CLIParser::EntityKind entity = CLIParser::EntityKind::UNDEFINED; + client_publisher_config pub_config; + client_subscriber_config sub_config; + server_config srv_config; + + friend std::ostream& operator << ( + std::ostream& os, + const ds_example_config& config) + { + os << "Entity: " << parse_entity_kind(config.entity) << std::endl; + os << "Common options:" << std::endl; + os << " Transport: " << static_cast(config.pub_config.transport_kind) << std::endl; + + if (config.entity != CLIParser::EntityKind::SERVER || + (config.entity == CLIParser::EntityKind::SERVER && config.srv_config.is_also_client)) + { + os << "Client options:" << std::endl; + os << " Connection address: " << config.pub_config.connection_address << std::endl; + os << " Connection port: " << config.pub_config.connection_port << std::endl; + } + + if (config.entity == CLIParser::EntityKind::CLIENT_PUBLISHER) + { + os << "Publisher options:" << std::endl; + os << " Topic name: " << config.pub_config.topic_name << std::endl; + os << " Samples: " << config.pub_config.samples << std::endl; + os << " Interval: " << config.pub_config.interval << std::endl; + } + else if (config.entity == CLIParser::EntityKind::CLIENT_SUBSCRIBER) + { + os << "Subscriber options:" << std::endl; + os << " Topic name: " << config.sub_config.topic_name << std::endl; + os << " Samples: " << config.sub_config.samples << std::endl; + } + else if (config.entity == CLIParser::EntityKind::SERVER) + { + os << "Server options:" << std::endl; + os << " Listening address: " << config.srv_config.listening_address << std::endl; + os << " Listening port: " << config.srv_config.listening_port << std::endl; + os << " Timeout: " << config.srv_config.timeout << std::endl; + } + + return os; + } + + }; + + /** + * @brief Print usage help message and exit with the given return code + * + * @param return_code return code to exit with + * + * @warning This method finishes the execution of the program with the input return code + */ + static void print_help( + uint8_t return_code) + { + std::cout << "Usage: discovery_server [options]" << std::endl; + std::cout << "" << std::endl; + std::cout << "Entities:" << std::endl; + std::cout << " publisher Run a client publisher entity." << std::endl; + std::cout << " subscriber Run a client subscriber entity." << std::endl; + std::cout << " server Run a server entity." << std::endl; + std::cout << "" << std::endl; + std::cout << " -h, --help Print this help message." << std::endl; + std::cout << "Client options (common to Publisher, Subscriber and Server acting as Client):" << std::endl; + std::cout << " -c , --connection-address Address of the Server to connect to" << std::endl; + std::cout << " (Default address: 127.0.0.1)." << std::endl; + std::cout << " -p , --connection-port Port of the Server to connect to" << std::endl; + std::cout << " (Default port: 16166)." << std::endl; + std::cout << " (0 by default)." << std::endl; + std::cout << " --transport [udpv4|udpv6|tcpv4|tcpv6|shm] " << std::endl; + std::cout << " (udpv4 by default)." << std::endl; + std::cout << "" << std::endl; + std::cout << "Publisher options:" << std::endl; + std::cout << " -t , --topic Topic name" << std::endl; + std::cout << " (Default: discovery_server_topic)." << std::endl; + std::cout << " -r, --reliable Set Reliability QoS as reliable" << std::endl; + std::cout << " (Default: best effort)" << std::endl; + std::cout << " --transient-local Set Durability QoS as transient local" << std::endl; + std::cout << " (Default: volatile)" << std::endl; + std::cout << " -s , --samples Number of samples to send " << std::endl; + std::cout << " (Default: 0 => infinite samples)." << std::endl; + std::cout << " -i , --interval Time between samples in milliseconds" << std::endl; + std::cout << " (Default: 100)." << std::endl; + std::cout << "" << std::endl; + std::cout << "Subscriber options:" << std::endl; + std::cout << " -t , --topic Topic name" << std::endl; + std::cout << " (Default: discovery_server_topic)." << std::endl; + std::cout << " -s , --samples Number of samples to receive" << std::endl; + std::cout << " (Default: 0 => infinite samples)." << std::endl; + std::cout << " -r, --reliable Set Reliability QoS as reliable" << std::endl; + std::cout << " (Default: best effort)" << std::endl; + std::cout << " --transient-local Set Durability QoS as transient local" << std::endl; + std::cout << " (Default: volatile)" << std::endl; + std::cout << "" << std::endl; + std::cout << "Server options:" << std::endl; + std::cout << " --listening-address Server listening address" << std::endl; + std::cout << " (Default address: 127.0.0.1)" << std::endl; + std::cout << " --listening-port Server listening port" << std::endl; + std::cout << " (Default port: 16166)" << std::endl; + std::cout << " --timeout Number of seconds before finish" << std::endl; + std::cout << " the process (Default: 0 = till ^C)." << std::endl; + std::exit(return_code); + } + + /** + * @brief Parse the command line options and return the configuration_config object + * + * @param argc number of arguments + * @param argv array of arguments + * @return configuration_config object with the parsed options + * + * @warning This method finishes the execution of the program if the input arguments are invalid + */ + static ds_example_config parse_cli_options( + int argc, + char* argv[]) + { + ds_example_config config; + + if (argc < 2) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "missing entity argument"); + print_help(EXIT_FAILURE); + } + + std::string first_argument = argv[1]; + + if (first_argument == "publisher" ) + { + config.entity = CLIParser::EntityKind::CLIENT_PUBLISHER; + } + else if (first_argument == "subscriber") + { + config.entity = CLIParser::EntityKind::CLIENT_SUBSCRIBER; + } + else if (first_argument == "server") + { + config.entity = CLIParser::EntityKind::SERVER; + } + else if (first_argument == "-h" || first_argument == "--help") + { + print_help(EXIT_SUCCESS); + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing entity argument " + first_argument); + print_help(EXIT_FAILURE); + } + + bool uses_ipv6 = false; + bool listening_address_was_set = false; + bool connection_address_was_set = false; + + for (int i = 2; i < argc; ++i) + { + std::string arg = argv[i]; + + if (arg == "-h" || arg == "--help") + { + print_help(EXIT_SUCCESS); + } + // Common options + else if (arg == "--transport") + { + if (++i < argc) + { + std::string input = argv[i]; + + if (input == "udpv4") + { + config.pub_config.transport_kind = TransportKind::UDPv4; + config.sub_config.transport_kind = TransportKind::UDPv4; + config.srv_config.transport_kind = TransportKind::UDPv4; + } + else if (input == "udpv6") + { + config.pub_config.transport_kind = TransportKind::UDPv6; + config.sub_config.transport_kind = TransportKind::UDPv6; + config.srv_config.transport_kind = TransportKind::UDPv6; + uses_ipv6 = true; + } + else if (input == "tcpv4") + { + config.pub_config.transport_kind = TransportKind::TCPv4; + config.sub_config.transport_kind = TransportKind::TCPv4; + config.srv_config.transport_kind = TransportKind::TCPv4; + } + else if (input == "tcpv6") + { + config.pub_config.transport_kind = TransportKind::TCPv6; + config.sub_config.transport_kind = TransportKind::TCPv6; + config.srv_config.transport_kind = TransportKind::TCPv6; + uses_ipv6 = true; + } + else if (input == "shm") + { + config.pub_config.transport_kind = TransportKind::SHM; + config.sub_config.transport_kind = TransportKind::SHM; + config.srv_config.transport_kind = TransportKind::SHM; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "Unkown transport argument: " + input); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing transport argument"); + print_help(EXIT_FAILURE); + } + } + // Client options + else if (arg == "-c" || arg == "--connection-address") + { + if (++i < argc) + { + config.pub_config.connection_address = argv[i]; + config.sub_config.connection_address = argv[i]; + config.srv_config.connection_address = argv[i]; + if (config.entity == CLIParser::EntityKind::SERVER) + { + config.srv_config.is_also_client = true; + } + connection_address_was_set = true; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing connection-address argument"); + print_help(EXIT_FAILURE); + } + } + else if (arg == "-p" || arg == "--connection-port") + { + if (++i < argc) + { + try + { + int input = std::stoi(argv[i]); + if (input < std::numeric_limits::min() || + input > std::numeric_limits::max()) + { + throw std::out_of_range("port argument " + std::string( + argv[i]) + " out of range [0, 65535]."); + } + else + { + config.pub_config.connection_port = static_cast(input); + config.sub_config.connection_port = static_cast(input); + config.srv_config.connection_port = static_cast(input); + if (config.entity == CLIParser::EntityKind::SERVER) + { + config.srv_config.is_also_client = true; + } + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid port argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing port argument"); + print_help(EXIT_FAILURE); + } + } + // PubSub options + else if (arg == "-t" || arg == "--topic") + { + if (config.entity == CLIParser::EntityKind::CLIENT_PUBLISHER || + config.entity == CLIParser::EntityKind::CLIENT_SUBSCRIBER) + { + config.pub_config.topic_name = argv[i]; + config.sub_config.topic_name = argv[i]; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, + "wrong or missing entity for --topic argument: only available for publisher and subscriber"); + print_help(EXIT_FAILURE); + } + } + else if (arg == "-s" || arg == "--samples") + { + if (i + 1 < argc) + { + try + { + int input = std::stoi(argv[++i]); + if (input < std::numeric_limits::min() || + input > std::numeric_limits::max()) + { + throw std::out_of_range("samples argument out of range"); + } + else + { + if (config.entity == CLIParser::EntityKind::CLIENT_PUBLISHER || + config.entity == CLIParser::EntityKind::CLIENT_SUBSCRIBER) + { + config.pub_config.samples = static_cast(input); + config.sub_config.samples = static_cast(input); + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "entity error or not specified for --samples argument"); + print_help(EXIT_FAILURE); + } + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid sample argument for " + arg + ": " + e.what()); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "samples argument out of range for " + arg + ": " + e.what()); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "missing argument for " + arg); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--reliable") + { + config.pub_config.reliable = true; + config.sub_config.reliable = true; + } + else if (arg == "--transient-local") + { + config.pub_config.transient_local = true; + config.sub_config.transient_local = true; + } + // Publisher options + else if (arg == "-i" || arg == "--interval") + { + if (config.entity == CLIParser::EntityKind::CLIENT_PUBLISHER) + { + if (++i < argc) + { + try + { + int input = std::stoi(argv[i]); + if (input < std::numeric_limits::min() || + input > std::numeric_limits::max()) + { + throw std::out_of_range("interval argument out of range"); + } + else + { + config.pub_config.interval = static_cast(input); + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid interval argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing interval argument"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "interval argument is only valid for client publisher entity"); + print_help(EXIT_FAILURE); + } + } + // Server options + else if (arg == "--listening-address") + { + if (++i < argc) + { + if (config.entity == CLIParser::EntityKind::SERVER ) + { + config.srv_config.listening_address = argv[i]; + listening_address_was_set = true; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "listening address argument is only valid for server entity"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing connection-address argument"); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--listening-port") + { + if (++i < argc) + { + if (config.entity == CLIParser::EntityKind::SERVER) + { + try + { + int input = std::stoi(argv[i]); + if (input < std::numeric_limits::min() || + input > std::numeric_limits::max()) + { + throw std::out_of_range("listening-port argument " + std::string( + argv[i]) + " out of range [0, 65535]."); + } + else + { + config.srv_config.listening_port = static_cast(input); + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid listening-port argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "--listening-port argument is only valid for server entity"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing port argument"); + print_help(EXIT_FAILURE); + } + } + else if (arg == "--timeout") + { + if (++i < argc) + { + if (config.entity == CLIParser::EntityKind::SERVER) + { + try + { + int input = std::stoi(argv[i]); + if (input < std::numeric_limits::min() || + input > std::numeric_limits::max()) + { + throw std::out_of_range("timeout argument " + std::string( + argv[i]) + " out of range [0, 65535]."); + } + else + { + config.srv_config.timeout = static_cast(input); + } + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "invalid timeout argument " + std::string( + argv[i]) + ": " + std::string(e.what())); + print_help(EXIT_FAILURE); + } + catch (const std::out_of_range& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSER, std::string(e.what())); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "--listening-port argument is only valid for server entity"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "parsing port argument"); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSER, "unknown option " + arg); + print_help(EXIT_FAILURE); + } + } + + // change default values if IPv6 is used + // and user did not specified ones + if (uses_ipv6) + { + if (config.entity == CLIParser::EntityKind::SERVER && + !listening_address_was_set) + { + config.srv_config.listening_address = "::1"; + } + + if (!connection_address_was_set) + { + config.pub_config.connection_address = "::1"; + config.sub_config.connection_address = "::1"; + if (config.srv_config.is_also_client) + { + config.srv_config.connection_address = "::1"; + } + } + } + + return config; + } + + /** + * @brief Parse the signal number into the signal name + * + * @param signum signal number + * @return std::string signal name + */ + static std::string parse_signal( + const int& signum) + { + switch (signum) + { + case SIGINT: + return "SIGINT"; + case SIGTERM: + return "SIGTERM"; +#ifndef _WIN32 + case SIGQUIT: + return "SIGQUIT"; + case SIGHUP: + return "SIGHUP"; +#endif // _WIN32 + default: + return "UNKNOWN SIGNAL"; + } + } + + /** + * @brief Parse the entity kind into std::string + * + * @param entity entity kind + * @return std::string entity kind + */ + static std::string parse_entity_kind( + const EntityKind& entity) + { + switch (entity) + { + case EntityKind::CLIENT_PUBLISHER: + return "Client Publisher"; + case EntityKind::CLIENT_SUBSCRIBER: + return "Client Subscriber"; + case EntityKind::SERVER: + return "Discovery Server"; + case EntityKind::UNDEFINED: + default: + return "Undefined entity"; + } + } + +}; + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIPARSER_HPP diff --git a/examples/cpp/dds/DiscoveryServerExample/CMakeLists.txt b/examples/cpp/discovery_server/CMakeLists.txt similarity index 61% rename from examples/cpp/dds/DiscoveryServerExample/CMakeLists.txt rename to examples/cpp/discovery_server/CMakeLists.txt index 035ff741189..44d985226ac 100644 --- a/examples/cpp/dds/DiscoveryServerExample/CMakeLists.txt +++ b/examples/cpp/discovery_server/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ cmake_minimum_required(VERSION 3.20) -project(DiscoveryServerExample VERSION 1 LANGUAGES CXX) +project(fastdds_discovery_server_example VERSION 1 LANGUAGES CXX) # Find requirements if(NOT fastcdr_FOUND) @@ -28,22 +28,23 @@ endif() #Check C++11 include(CheckCXXCompilerFlag) if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") - check_cxx_compiler_flag(-std=c++11 SUPPORTS_CXX11) check_cxx_compiler_flag(-std=c++11 SUPPORTS_CXX11) if(NOT SUPPORTS_CXX11) message(FATAL_ERROR "Compiler doesn't support C++11") endif() endif() -message(STATUS "Configuring DiscoveryServerExample example...") -file(GLOB DISCOVERY_SERVER_EXAMPLE_SOURCES_CXX "types/*.cxx") -file(GLOB DISCOVERY_SERVER_EXAMPLE_SOURCES_CPP "*.cpp") +message(STATUS "Configuring discovery server example...") +file(GLOB DISCOVERY_SERVER_SOURCES_CXX "*.cxx") +file(GLOB DISCOVERY_SERVER_SOURCES_CPP "*.cpp") -add_executable(${PROJECT_NAME} ${DISCOVERY_SERVER_EXAMPLE_SOURCES_CXX} ${DISCOVERY_SERVER_EXAMPLE_SOURCES_CPP}) -target_compile_definitions(${PROJECT_NAME} PRIVATE +add_executable(discovery_server ${DISCOVERY_SERVER_SOURCES_CXX} ${DISCOVERY_SERVER_SOURCES_CPP}) +target_compile_definitions(discovery_server PRIVATE $<$>,$>:__DEBUG> $<$:__INTERNALDEBUG> # Internal debug activated. + $<$:SHM_TRANSPORT_BUILTIN> # Enable SHM as built-in transport ) -target_link_libraries(${PROJECT_NAME} fastdds fastcdr fastdds::optionparser) -install(TARGETS ${PROJECT_NAME} - RUNTIME DESTINATION examples/cpp/dds/${PROJECT_NAME}/${BIN_INSTALL_DIR}) +target_link_libraries(discovery_server fastdds fastcdr) +install(TARGETS discovery_server + RUNTIME DESTINATION ${DATA_INSTALL_DIR}/fastdds/examples/cpp/discovery_server/${BIN_INSTALL_DIR}) + diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerPublisher.cpp b/examples/cpp/discovery_server/ClientPublisherApp.cpp similarity index 54% rename from examples/cpp/dds/DiscoveryServerExample/DiscoveryServerPublisher.cpp rename to examples/cpp/discovery_server/ClientPublisherApp.cpp index ae9e061e35b..d705aeb21cd 100644 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerPublisher.cpp +++ b/examples/cpp/discovery_server/ClientPublisherApp.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,14 +13,14 @@ // limitations under the License. /** - * @file DiscoveryServerPublisher.cpp + * @file ClientPublisherApp.cpp * */ -#include "DiscoveryServerPublisher.h" +#include "ClientPublisherApp.hpp" -#include -#include +#include +#include #include #include @@ -32,63 +32,58 @@ #include #include #include -#include using namespace eprosima::fastdds::dds; -using namespace eprosima::fastdds::rtps; -std::atomic HelloWorldPublisher::stop_(false); +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { -HelloWorldPublisher::HelloWorldPublisher() +ClientPublisherApp::ClientPublisherApp( + const CLIParser::client_publisher_config& config) : participant_(nullptr) , publisher_(nullptr) , topic_(nullptr) , writer_(nullptr) , type_(new HelloWorldPubSubType()) + , matched_(0) + , samples_(config.samples) + , stop_(false) + , period_ms_(config.interval) { -} - -bool HelloWorldPublisher::is_stopped() -{ - return stop_; -} - -void HelloWorldPublisher::stop() -{ - stop_ = true; -} - -bool HelloWorldPublisher::init( - const std::string& topic_name, - const std::string& server_address, - unsigned short server_port, - TransportKind transport) -{ + // Set up the data type with initial values hello_.index(0); - hello_.message("HelloWorld"); + hello_.message("Hello world"); + + // Configure Participant QoS DomainParticipantQos pqos; + pqos.name("DS-Client_pub"); pqos.transport().use_builtin_transports = false; - std::string ip_server_address(server_address); + uint16_t server_port = config.connection_port; + + std::string ip_server_address(config.connection_address); + // Check if DNS is required - if (!is_ip(server_address)) + if (!is_ip(config.connection_address)) { - ip_server_address = get_ip_from_dns(server_address, transport); + ip_server_address = get_ip_from_dns(config.connection_address, config.transport_kind); } if (ip_server_address.empty()) { - return false; + throw std::runtime_error("Invalid connection address"); } - // Create DS SERVER locator + // Create DS locator eprosima::fastdds::rtps::Locator server_locator; eprosima::fastdds::rtps::IPLocator::setPhysicalPort(server_locator, server_port); std::shared_ptr descriptor; - switch (transport) + switch (config.transport_kind) { case TransportKind::SHM: descriptor = std::make_shared(); @@ -120,8 +115,6 @@ bool HelloWorldPublisher::init( case TransportKind::TCPv4: { auto descriptor_tmp = std::make_shared(); - // descriptor_tmp->interfaceWhiteList.push_back(ip_server_address); - // One listening port must be added either in the pub or the sub descriptor_tmp->add_listener_port(0); descriptor = descriptor_tmp; @@ -134,21 +127,12 @@ bool HelloWorldPublisher::init( case TransportKind::TCPv6: { auto descriptor_tmp = std::make_shared(); - // descriptor_tmp->interfaceWhiteList.push_back(ip_server_address); - // One listening port must be added either in the pub or the sub descriptor_tmp->add_listener_port(0); descriptor = descriptor_tmp; server_locator.kind = LOCATOR_KIND_TCPv6; eprosima::fastdds::rtps::IPLocator::setLogicalPort(server_locator, server_port); - if (eprosima::fastdds::rtps::IPLocator::isIPv6(ip_server_address)) - { - eprosima::fastdds::rtps::IPLocator::setIPv6(server_locator, ip_server_address); - } - else - { - eprosima::fastdds::rtps::IPLocator::setIPv6(server_locator, "::1"); - } + eprosima::fastdds::rtps::IPLocator::setIPv6(server_locator, ip_server_address); break; } @@ -166,12 +150,12 @@ bool HelloWorldPublisher::init( // Add descriptor pqos.transport().user_transports.push_back(descriptor); - // CREATE THE PARTICIPANT - participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos, &listener_); + // Create Domainparticipant + participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos, nullptr); if (participant_ == nullptr) { - return false; + throw std::runtime_error("Participant initialization failed"); } std::cout << @@ -180,68 +164,71 @@ bool HelloWorldPublisher::init( " connecting to server <" << server_locator << "> " << std::endl; - // REGISTER THE TYPE + // Regsiter type type_.register_type(participant_); - // CREATE THE PUBLISHER + // Create the publisher publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr); if (publisher_ == nullptr) { - return false; + throw std::runtime_error("Publisher initialization failed"); } - // CREATE THE TOPIC - topic_ = participant_->create_topic(topic_name, "HelloWorld", TOPIC_QOS_DEFAULT); + // Create the topic + topic_ = participant_->create_topic(config.topic_name, type_.get_type_name(), TOPIC_QOS_DEFAULT); if (topic_ == nullptr) { - return false; + throw std::runtime_error("Topic initialization failed"); } - // CREATE THE WRITER + // Create de data writer DataWriterQos wqos = DATAWRITER_QOS_DEFAULT; - writer_ = publisher_->create_datawriter(topic_, wqos, &listener_); + + if (!config.reliable) + { + wqos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS; + } + + if (!config.transient_local) + { + wqos.durability().kind = VOLATILE_DURABILITY_QOS; + } + + writer_ = publisher_->create_datawriter(topic_, wqos, this); if (writer_ == nullptr) { - return false; + throw std::runtime_error("DataWriter initialization failed"); } - return true; } -HelloWorldPublisher::~HelloWorldPublisher() +ClientPublisherApp::~ClientPublisherApp() { - if (participant_ != nullptr) + if (nullptr != participant_) { - if (publisher_ != nullptr) - { - if (writer_ != nullptr) - { - publisher_->delete_datawriter(writer_); - } - participant_->delete_publisher(publisher_); - } - if (topic_ != nullptr) - { - participant_->delete_topic(topic_); - } + // Delete DDS entities contained within the DomainParticipant + participant_->delete_contained_entities(); + + // Delete DomainParticipant DomainParticipantFactory::get_instance()->delete_participant(participant_); } } -void HelloWorldPublisher::PubListener::on_publication_matched( - eprosima::fastdds::dds::DataWriter*, - const eprosima::fastdds::dds::PublicationMatchedStatus& info) +void ClientPublisherApp::on_publication_matched( + DataWriter* /*writer*/, + const PublicationMatchedStatus& info) { if (info.current_count_change == 1) { - matched_ = info.current_count; + matched_ = static_cast(info.current_count); std::cout << "Publisher matched." << std::endl; + cv_.notify_one(); } else if (info.current_count_change == -1) { - matched_ = info.current_count; + matched_ = static_cast(info.current_count); std::cout << "Publisher unmatched." << std::endl; } else @@ -251,61 +238,55 @@ void HelloWorldPublisher::PubListener::on_publication_matched( } } -void HelloWorldPublisher::PubListener::on_participant_discovery( - eprosima::fastdds::dds::DomainParticipant* /*participant*/, - eprosima::fastdds::rtps::ParticipantDiscoveryInfo&& info, - bool& should_be_ignored) +void ClientPublisherApp::run() { - static_cast(should_be_ignored); - if (info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) - { - std::cout << "Discovered Participant with GUID " << info.info.m_guid << std::endl; - } - else if (info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT || - info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT) + while (!is_stopped() && ((samples_ == 0) || (hello_.index() < samples_))) { - std::cout << "Dropped Participant with GUID " << info.info.m_guid << std::endl; + if (publish()) + { + std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index() + << "' SENT" << std::endl; + } + // Wait for period or stop event + std::unique_lock period_lock(mutex_); + cv_.wait_for(period_lock, std::chrono::milliseconds(period_ms_), [&]() + { + return is_stopped(); + }); } } -void HelloWorldPublisher::runThread( - uint32_t samples, - uint32_t sleep) +bool ClientPublisherApp::publish() { - while (!is_stopped() && (samples == 0 || hello_.index() < samples)) + bool ret = false; + // Wait for the data endpoints discovery + std::unique_lock matched_lock(mutex_); + cv_.wait(matched_lock, [&]() + { + // at least one has been discovered + return ((matched_ > 0) || is_stopped()); + }); + + if (!is_stopped()) { - publish(); - std::cout << "Message: " << hello_.message() << " with index: " << hello_.index() - << " SENT" << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(sleep)); + hello_.index(hello_.index() + 1); + ret = writer_->write(&hello_); } + return ret; } -void HelloWorldPublisher::run( - uint32_t samples, - uint32_t sleep) +bool ClientPublisherApp::is_stopped() { - stop_ = false; - std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep); - if (samples == 0) - { - std::cout << "Publisher running. Please press CTRL+C to stop the Publisher at any time." << std::endl; - } - else - { - std::cout << "Publisher running " << samples << - " samples. Please press CTRL+C to stop the Publisher at any time." << std::endl; - } - signal(SIGINT, [](int signum) - { - std::cout << "SIGINT received, stopping Publisher execution." << std::endl; - static_cast(signum); HelloWorldPublisher::stop(); - }); - thread.join(); + return stop_.load(); } -void HelloWorldPublisher::publish() +void ClientPublisherApp::stop() { - hello_.index(hello_.index() + 1); - writer_->write(&hello_); + stop_.store(true); + cv_.notify_one(); } + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/discovery_server/ClientPublisherApp.hpp b/examples/cpp/discovery_server/ClientPublisherApp.hpp new file mode 100644 index 00000000000..bfdd291e2b4 --- /dev/null +++ b/examples/cpp/discovery_server/ClientPublisherApp.hpp @@ -0,0 +1,98 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ClientPublisherApp.hpp + * + */ + +#ifndef FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIENTPUBLISHERAPP_HPP +#define FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIENTPUBLISHERAPP_HPP + +#include + +#include +#include +#include + +#include "Application.hpp" +#include "CLIParser.hpp" +#include "HelloWorldPubSubTypes.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { + +class ClientPublisherApp : public Application, public DataWriterListener +{ +public: + + ClientPublisherApp( + const CLIParser::client_publisher_config& config); + + ~ClientPublisherApp(); + + //! Publisher matched method + void on_publication_matched( + DataWriter* writer, + const PublicationMatchedStatus& info) override; + + //! Run publisher + void run() override; + + //! Stop publisher + void stop() override; + +private: + + //! Return the current state of execution + bool is_stopped(); + + //! Publish a sample + bool publish(); + + HelloWorld hello_; + + DomainParticipant* participant_; + + Publisher* publisher_; + + Topic* topic_; + + DataWriter* writer_; + + TypeSupport type_; + + int16_t matched_; + + uint16_t samples_; + + std::mutex mutex_; + + std::condition_variable cv_; + + std::atomic stop_; + + const uint16_t period_ms_{100}; // in ms +}; + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif /* FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIENTPUBLISHERAPP_HPP */ diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerSubscriber.cpp b/examples/cpp/discovery_server/ClientSubscriberApp.cpp similarity index 59% rename from examples/cpp/dds/DiscoveryServerExample/DiscoveryServerSubscriber.cpp rename to examples/cpp/discovery_server/ClientSubscriberApp.cpp index 8cfb7f3d194..8918f708df7 100644 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerSubscriber.cpp +++ b/examples/cpp/discovery_server/ClientSubscriberApp.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,19 +13,20 @@ // limitations under the License. /** - * @file DiscoveryServerSubscriber.cpp + * @file ClientSubscriberApp.cpp * */ -#include "DiscoveryServerSubscriber.h" +#include "ClientSubscriberApp.hpp" #include -#include -#include +#include +#include #include #include #include +#include #include #include #include @@ -33,56 +34,44 @@ #include #include #include -#include + +#include "CLIParser.hpp" +#include "Application.hpp" using namespace eprosima::fastdds::dds; -using namespace eprosima::fastdds::rtps; -std::atomic HelloWorldSubscriber::stop_(false); -std::mutex HelloWorldSubscriber::terminate_cv_mtx_; -std::condition_variable HelloWorldSubscriber::terminate_cv_; +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { -HelloWorldSubscriber::HelloWorldSubscriber() +ClientSubscriberApp::ClientSubscriberApp( + const CLIParser::client_subscriber_config& config) : participant_(nullptr) , subscriber_(nullptr) , topic_(nullptr) , reader_(nullptr) , type_(new HelloWorldPubSubType()) -{ -} - -bool HelloWorldSubscriber::is_stopped() -{ - return stop_; -} - -void HelloWorldSubscriber::stop() -{ - stop_ = true; - terminate_cv_.notify_all(); -} - -bool HelloWorldSubscriber::init( - const std::string& topic_name, - uint32_t max_messages, - const std::string& server_address, - unsigned short server_port, - TransportKind transport) + , samples_(config.samples) + , received_samples_(0) + , stop_(false) { DomainParticipantQos pqos; pqos.name("DS-Client_sub"); pqos.transport().use_builtin_transports = false; - std::string ip_server_address(server_address); + uint16_t server_port = config.connection_port; + + std::string ip_server_address(config.connection_address); // Check if DNS is required - if (!is_ip(server_address)) + if (!is_ip(config.connection_address)) { - ip_server_address = get_ip_from_dns(server_address, transport); + ip_server_address = get_ip_from_dns(config.connection_address, config.transport_kind); } if (ip_server_address.empty()) { - return false; + throw std::runtime_error("Invalid connection address"); } // Create DS SERVER locator @@ -91,7 +80,7 @@ bool HelloWorldSubscriber::init( std::shared_ptr descriptor; - switch (transport) + switch (config.transport_kind) { case TransportKind::SHM: descriptor = std::make_shared(); @@ -144,14 +133,7 @@ bool HelloWorldSubscriber::init( server_locator.kind = LOCATOR_KIND_TCPv6; eprosima::fastdds::rtps::IPLocator::setLogicalPort(server_locator, server_port); - if (eprosima::fastdds::rtps::IPLocator::isIPv6(ip_server_address)) - { - eprosima::fastdds::rtps::IPLocator::setIPv6(server_locator, ip_server_address); - } - else - { - eprosima::fastdds::rtps::IPLocator::setIPv6(server_locator, "::1"); - } + eprosima::fastdds::rtps::IPLocator::setIPv6(server_locator, ip_server_address); break; } @@ -169,13 +151,13 @@ bool HelloWorldSubscriber::init( // Add descriptor pqos.transport().user_transports.push_back(descriptor); - // CREATE THE PARTICIPANT - participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos, &listener_, + // Create the Domainparticipant + participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos, nullptr, StatusMask::all() >> StatusMask::data_on_readers()); if (participant_ == nullptr) { - return false; + throw std::runtime_error("Participant initialization failed"); } std::cout << @@ -184,82 +166,71 @@ bool HelloWorldSubscriber::init( " connecting to server <" << server_locator << "> " << std::endl; - // REGISTER THE TYPE + // Register the type type_.register_type(participant_); - // CREATE THE SUBSCRIBER + // Create the subscriber subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr); if (subscriber_ == nullptr) { - return false; + throw std::runtime_error("Subscriber initialization failed"); } - // CREATE THE TOPIC + // Create the topic topic_ = participant_->create_topic( - topic_name, - "HelloWorld", + config.topic_name, + type_.get_type_name(), TOPIC_QOS_DEFAULT); if (topic_ == nullptr) { - return false; + throw std::runtime_error("Topic initialization failed"); } - // CREATE THE READER - if (max_messages > 0) + // Create the data reader + DataReaderQos rqos = DATAREADER_QOS_DEFAULT; + + if (config.reliable) { - listener_.set_max_messages(max_messages); + rqos.reliability().kind = RELIABLE_RELIABILITY_QOS; } - DataReaderQos rqos = DATAREADER_QOS_DEFAULT; - reader_ = subscriber_->create_datareader(topic_, rqos, &listener_); - if (reader_ == nullptr) + if (config.transient_local) { - return false; + rqos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; } - return true; -} + reader_ = subscriber_->create_datareader(topic_, rqos, this); -HelloWorldSubscriber::~HelloWorldSubscriber() -{ - if (participant_ != nullptr) + if (reader_ == nullptr) { - if (topic_ != nullptr) - { - participant_->delete_topic(topic_); - } - if (subscriber_ != nullptr) - { - if (reader_ != nullptr) - { - subscriber_->delete_datareader(reader_); - } - participant_->delete_subscriber(subscriber_); - } - DomainParticipantFactory::get_instance()->delete_participant(participant_); + throw std::runtime_error("DataWriter initialization failed"); } } -void HelloWorldSubscriber::SubListener::set_max_messages( - uint32_t max_messages) +ClientSubscriberApp::~ClientSubscriberApp() { - max_messages_ = max_messages; + if (nullptr != participant_) + { + // Delete DDS entities contained within the DomainParticipant + participant_->delete_contained_entities(); + + // Delete DomainParticipant + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } } -void HelloWorldSubscriber::SubListener::on_subscription_matched( - DataReader*, +void ClientSubscriberApp::on_subscription_matched( + DataReader* /*reader*/, const SubscriptionMatchedStatus& info) { if (info.current_count_change == 1) { - matched_ = info.current_count; std::cout << "Subscriber matched." << std::endl; } else if (info.current_count_change == -1) { - matched_ = info.current_count; std::cout << "Subscriber unmatched." << std::endl; } else @@ -269,18 +240,19 @@ void HelloWorldSubscriber::SubListener::on_subscription_matched( } } -void HelloWorldSubscriber::SubListener::on_data_available( +void ClientSubscriberApp::on_data_available( DataReader* reader) { SampleInfo info; - while ((reader->take_next_sample(&hello_, &info) == RETCODE_OK) && !is_stopped()) + while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&hello_, &info))) { - if (info.instance_state == ALIVE_INSTANCE_STATE) + if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data) { - samples_++; - // Print your structure data here. - std::cout << "Message " << hello_.message() << " " << hello_.index() << " RECEIVED" << std::endl; - if (max_messages_ > 0 && (samples_ >= max_messages_)) + received_samples_++; + // Print Hello world message data + std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index() + << "' RECEIVED" << std::endl; + if (samples_ > 0 && (received_samples_ >= samples_)) { stop(); } @@ -288,44 +260,27 @@ void HelloWorldSubscriber::SubListener::on_data_available( } } -void HelloWorldSubscriber::SubListener::on_participant_discovery( - eprosima::fastdds::dds::DomainParticipant* /*participant*/, - eprosima::fastdds::rtps::ParticipantDiscoveryInfo&& info, - bool& should_be_ignored) -{ - static_cast(should_be_ignored); - if (info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) - { - std::cout << "Discovered Participant with GUID " << info.info.m_guid << std::endl; - } - else if (info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT || - info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT) - { - std::cout << "Dropped Participant with GUID " << info.info.m_guid << std::endl; - } -} - -void HelloWorldSubscriber::run( - uint32_t samples) +void ClientSubscriberApp::run() { - stop_ = false; - if (samples > 0) - { - std::cout << "Subscriber running until " << samples << - " samples have been received. Please press CTRL+C to stop the Subscriber at any time." << std::endl; - } - else - { - std::cout << "Subscriber running. Please press CTRL+C to stop the Subscriber." << std::endl; - } - signal(SIGINT, [](int signum) - { - std::cout << "SIGINT received, stopping Subscriber execution." << std::endl; - static_cast(signum); HelloWorldSubscriber::stop(); - }); std::unique_lock lck(terminate_cv_mtx_); - terminate_cv_.wait(lck, [] + terminate_cv_.wait(lck, [&] { return is_stopped(); }); } + +bool ClientSubscriberApp::is_stopped() +{ + return stop_.load(); +} + +void ClientSubscriberApp::stop() +{ + stop_.store(true); + terminate_cv_.notify_all(); +} + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/discovery_server/ClientSubscriberApp.hpp b/examples/cpp/discovery_server/ClientSubscriberApp.hpp new file mode 100644 index 00000000000..2d1ddae1fca --- /dev/null +++ b/examples/cpp/discovery_server/ClientSubscriberApp.hpp @@ -0,0 +1,97 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ClientSubscriberApp.hpp + * + */ + +#ifndef FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIENTSUBSCRIBERAPP_HPP +#define FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIENTSUBSCRIBERAPP_HPP + +#include + +#include +#include +#include + +#include "CLIParser.hpp" +#include "HelloWorldPubSubTypes.hpp" +#include "Application.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { + +class ClientSubscriberApp : public Application, public DataReaderListener +{ +public: + + ClientSubscriberApp( + const CLIParser::client_subscriber_config& config); + + ~ClientSubscriberApp(); + + //! Subscription callback + void on_data_available( + DataReader* reader) override; + + //! Subscriber matched method + void on_subscription_matched( + DataReader* reader, + const SubscriptionMatchedStatus& info) override; + + //! Run subscriber + void run() override; + + //! Trigger the end of execution + void stop() override; + +private: + + //! Return the current state of execution + bool is_stopped(); + + HelloWorld hello_; + + DomainParticipant* participant_; + + Subscriber* subscriber_; + + Topic* topic_; + + DataReader* reader_; + + TypeSupport type_; + + uint16_t samples_; + + uint16_t received_samples_; + + std::atomic stop_; + + mutable std::mutex terminate_cv_mtx_; + + std::condition_variable terminate_cv_; +}; + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif /* FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__CLIENTSUBSCRIBERAPP_HPP */ diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorld.hpp b/examples/cpp/discovery_server/HelloWorld.hpp similarity index 100% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorld.hpp rename to examples/cpp/discovery_server/HelloWorld.hpp diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorld.idl b/examples/cpp/discovery_server/HelloWorld.idl similarity index 69% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorld.idl rename to examples/cpp/discovery_server/HelloWorld.idl index 0fd2c355aee..192f8f9d487 100644 --- a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorld.idl +++ b/examples/cpp/discovery_server/HelloWorld.idl @@ -1,3 +1,4 @@ +@extensibility(APPENDABLE) struct HelloWorld { unsigned long index; diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldCdrAux.hpp b/examples/cpp/discovery_server/HelloWorldCdrAux.hpp similarity index 100% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorldCdrAux.hpp rename to examples/cpp/discovery_server/HelloWorldCdrAux.hpp diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldCdrAux.ipp b/examples/cpp/discovery_server/HelloWorldCdrAux.ipp similarity index 100% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorldCdrAux.ipp rename to examples/cpp/discovery_server/HelloWorldCdrAux.ipp diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldPubSubTypes.cxx b/examples/cpp/discovery_server/HelloWorldPubSubTypes.cxx similarity index 100% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorldPubSubTypes.cxx rename to examples/cpp/discovery_server/HelloWorldPubSubTypes.cxx diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldPubSubTypes.hpp b/examples/cpp/discovery_server/HelloWorldPubSubTypes.hpp similarity index 100% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorldPubSubTypes.hpp rename to examples/cpp/discovery_server/HelloWorldPubSubTypes.hpp diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldTypeObjectSupport.cxx b/examples/cpp/discovery_server/HelloWorldTypeObjectSupport.cxx similarity index 96% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorldTypeObjectSupport.cxx rename to examples/cpp/discovery_server/HelloWorldTypeObjectSupport.cxx index 58928b6f057..aaf391eeed3 100644 --- a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldTypeObjectSupport.cxx +++ b/examples/cpp/discovery_server/HelloWorldTypeObjectSupport.cxx @@ -54,6 +54,13 @@ void register_HelloWorld_type_identifier( QualifiedTypeName type_name_HelloWorld = "HelloWorld"; eprosima::fastcdr::optional type_ann_builtin_HelloWorld; eprosima::fastcdr::optional ann_custom_HelloWorld; + AppliedAnnotationSeq tmp_ann_custom_HelloWorld; + eprosima::fastcdr::optional verbatim_HelloWorld; + if (!tmp_ann_custom_HelloWorld.empty()) + { + ann_custom_HelloWorld = tmp_ann_custom_HelloWorld; + } + CompleteTypeDetail detail_HelloWorld = TypeObjectUtils::build_complete_type_detail(type_ann_builtin_HelloWorld, ann_custom_HelloWorld, type_name_HelloWorld.to_string()); CompleteStructHeader header_HelloWorld; header_HelloWorld = TypeObjectUtils::build_complete_struct_header(TypeIdentifier(), detail_HelloWorld); diff --git a/examples/cpp/dds/DiscoveryServerExample/types/HelloWorldTypeObjectSupport.hpp b/examples/cpp/discovery_server/HelloWorldTypeObjectSupport.hpp similarity index 100% rename from examples/cpp/dds/DiscoveryServerExample/types/HelloWorldTypeObjectSupport.hpp rename to examples/cpp/discovery_server/HelloWorldTypeObjectSupport.hpp diff --git a/examples/cpp/dds/DiscoveryServerExample/common.h b/examples/cpp/discovery_server/Helpers.hpp similarity index 88% rename from examples/cpp/dds/DiscoveryServerExample/common.h rename to examples/cpp/discovery_server/Helpers.hpp index 2012555902d..c4603c69a9a 100644 --- a/examples/cpp/dds/DiscoveryServerExample/common.h +++ b/examples/cpp/discovery_server/Helpers.hpp @@ -13,17 +13,18 @@ // limitations under the License. /** - * @file common.h + * @file Helpers.hpp * */ -#ifndef _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_COMMON_H_ -#define _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_COMMON_H_ +#ifndef FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__HELPERS_HPP +#define FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__HELPERS_HPP #include #include -enum class TransportKind +//! Transport kind enumeration +enum class TransportKind : uint8_t { UDPv4, UDPv6, @@ -77,4 +78,4 @@ inline std::string get_ip_from_dns( return domain_name; } -#endif /* _EPROSIMA_FASTDDS_EXAMPLES_CPP_DDS_DISCOVERYSERVEREXAMPLE_COMMON_H_ */ +#endif /* FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__HELPERS_HPP */ diff --git a/examples/cpp/discovery_server/README.md b/examples/cpp/discovery_server/README.md new file mode 100644 index 00000000000..15071fc7cde --- /dev/null +++ b/examples/cpp/discovery_server/README.md @@ -0,0 +1,115 @@ +# Discovery Server example + +The *eProsima Fast DDS discovery server* example is an application intended to demonstrate the use of *Discovery Server*. + +This example is part of the suite of examples designed by eProsima that aims to illustrate the features and possible configurations of DDS deployments through *eProsima Fast DDS*. + +In this case, the *discovery server* example describes the use of *Discovery Server* as a discovery hub among different publishers and subscribers. + +* [Description of the example](#description-of-the-example) +* [Run the example](#run-the-example) +* [Wait-set subscriber](#wait-set-subscriber) +* [XML profile playground](#xml-profile-playground) + +## Description of the example + +Within the example, user can create three different entities: a publisher, a subscriber, or a discovery server. +The publisher and subscriber entities are discovery server clients designed to send and receive messages, respectively, while the server is intended to act as the centralized discovery entity among them. + +A server can also act as a client of another server, allowing the creation of a network of servers that can be used to distribute the discovery information among them. +For further information about discovery server, please refer to the [Discovery Server documentation](https://fast-dds.docs.eprosima.com/en/latest/fastdds/discovery/discovery_server.html). + +## Run the example + +To launch this example, three different terminals are required. +One of them will run the publisher example application, another one will run the subscriber application and the third one will run the discovery server application. + +### Discovery Server publisher client + +* Ubuntu ( / MacOS ) + + ```shell + user@machine:example_path$ ./discovery_server publisher + Publisher running. Please press Ctrl+C to stop the Publisher at any time. + ``` + +* Windows + + ```powershell + example_path> discovery_server.exe publisher + Publisher running. Please press Ctrl+C to stop the Publisher at any time. + ``` + +### Discovery Server subscriber client + +* Ubuntu ( / MacOS ) + + ```shell + user@machine:example_path$ ./discovery_server subscriber + Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. + ``` + +* Windows + + ```powershell + example_path> discovery_server.exe subscriber + Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. + ``` + +### Discovery Server server + +* Ubuntu ( / MacOS ) + + ```shell + user@machine:example_path$ ./discovery_server server + Subscriber running. Please press Ctrl+C to stop the Server at any time. + ``` + +* Windows + + ```powershell + example_path> discovery_server.exe server + Subscriber running. Please press Ctrl+C to stop the Server at any time. + ``` + +All the example available flags can be queried running the executable with the ``-h`` or ``--help`` flag. + +### Expected output + +Regardless of which application is run first, the publisher will not start sending data until the server is up and the subscriber is discovered. +The expected output both for publisher and subscriber is a first displayed message acknowledging the match, followed by the amount of samples sent or received until Ctrl+C is pressed. + +### Discovery Server publisher client + +```shell +Publisher running. Please press Ctrl+C to stop the Publisher at any time. +Publisher matched. +Message: 'Hello world' with index: '1' SENT +Message: 'Hello world' with index: '2' SENT +Message: 'Hello world' with index: '3' SENT +... +``` + +### Discovery Server subscriber client + +```shell +Subscriber running. Please press Ctrl+C to stop the Subscriber at any time. +Subscriber matched. +Message: 'Hello world' with index: '1' RECEIVED +Message: 'Hello world' with index: '2' RECEIVED +Message: 'Hello world' with index: '3' RECEIVED +... +``` + +When Ctrl+C is pressed to stop one of the applications, the other one will show the unmatched status, displaying an informative message, and it will stop sending / receiving messages. +The following is a possible output of the publisher application when stopping the subscriber app. + +```shell +... +Message: 'Hello world' with index: '8' SENT +Message: 'Hello world' with index: '9' SENT +Message: 'Hello world' with index: '10' SENT +Message: 'Hello world' with index: '11' SENT +Publisher unmatched. +``` + diff --git a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerServer.cpp b/examples/cpp/discovery_server/ServerApp.cpp similarity index 61% rename from examples/cpp/dds/DiscoveryServerExample/DiscoveryServerServer.cpp rename to examples/cpp/discovery_server/ServerApp.cpp index e659bd8ad39..3df45a1ba5c 100644 --- a/examples/cpp/dds/DiscoveryServerExample/DiscoveryServerServer.cpp +++ b/examples/cpp/discovery_server/ServerApp.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,18 +13,20 @@ // limitations under the License. /** - * @file DiscoveryServerServer.cpp + * @file ServerApp.cpp * */ -#include "DiscoveryServerServer.h" +#include "ServerApp.hpp" #include -#include -#include -#include +#include #include +#include +#include +#include +#include #include #include #include @@ -32,77 +34,58 @@ #include using namespace eprosima::fastdds::dds; -using namespace eprosima::fastdds::rtps; -std::atomic DiscoveryServer::stop_(false); -std::mutex DiscoveryServer::terminate_cv_mtx_; -std::condition_variable DiscoveryServer::terminate_cv_; +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { -DiscoveryServer::DiscoveryServer() +ServerApp::ServerApp( + const CLIParser::server_config& config) : participant_(nullptr) -{ -} - -bool DiscoveryServer::is_stopped() -{ - return stop_; -} - -void DiscoveryServer::stop() -{ - stop_ = true; - terminate_cv_.notify_all(); -} - -bool DiscoveryServer::init( - const std::string& server_address, - unsigned short server_port, - TransportKind transport, - bool has_connection_server, - const std::string& connection_server_address, - unsigned short connection_server_port) + , matched_(0) + , timeout_(config.timeout) + , start_time_(std::chrono::steady_clock::now()) + , stop_(false) { DomainParticipantQos pqos; pqos.name("DS-Server"); pqos.transport().use_builtin_transports = false; - std::string ip_listening_address(server_address); - std::string ip_connection_address(connection_server_address); + std::string ip_listening_address(config.listening_address); + std::string ip_connection_address(config.connection_address); // Check if DNS is required - if (!is_ip(server_address)) + if (!is_ip(config.listening_address)) { - ip_listening_address = get_ip_from_dns(server_address, transport); + ip_listening_address = get_ip_from_dns(config.listening_address, config.transport_kind); } if (ip_listening_address.empty()) { - return false; + throw std::runtime_error("Invalid listening address"); } // Do the same for connection - if (has_connection_server && !is_ip(connection_server_address)) + if (config.is_also_client && !is_ip(config.connection_address)) { - ip_connection_address = get_ip_from_dns(connection_server_address, transport); + ip_connection_address = get_ip_from_dns(config.connection_address, config.transport_kind); } - if (has_connection_server && ip_connection_address.empty()) + if (config.is_also_client && ip_connection_address.empty()) { - return false; + throw std::runtime_error("Invalid connection address"); } - /////////////////////////////// // Configure Listening address - /////////////////////////////// - // Create DS SERVER locator eprosima::fastdds::rtps::Locator listening_locator; eprosima::fastdds::rtps::Locator connection_locator; - eprosima::fastdds::rtps::IPLocator::setPhysicalPort(listening_locator, server_port); - eprosima::fastdds::rtps::IPLocator::setPhysicalPort(connection_locator, connection_server_port); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(listening_locator, config.listening_port); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(connection_locator, config.connection_port); std::shared_ptr descriptor; - switch (transport) + switch (config.transport_kind) { case TransportKind::SHM: descriptor = std::make_shared(); @@ -113,7 +96,6 @@ bool DiscoveryServer::init( case TransportKind::UDPv4: { auto descriptor_tmp = std::make_shared(); - // descriptor_tmp->interfaceWhiteList.push_back(ip_listening_address); descriptor = descriptor_tmp; listening_locator.kind = LOCATOR_KIND_UDPv4; @@ -126,7 +108,6 @@ bool DiscoveryServer::init( case TransportKind::UDPv6: { auto descriptor_tmp = std::make_shared(); - // descriptor_tmp->interfaceWhiteList.push_back(ip_listening_address); descriptor = descriptor_tmp; listening_locator.kind = LOCATOR_KIND_UDPv6; @@ -139,39 +120,30 @@ bool DiscoveryServer::init( case TransportKind::TCPv4: { auto descriptor_tmp = std::make_shared(); - // descriptor_tmp->interfaceWhiteList.push_back(ip_listening_address); - descriptor_tmp->add_listener_port(server_port); + descriptor_tmp->add_listener_port(config.listening_port); descriptor = descriptor_tmp; listening_locator.kind = LOCATOR_KIND_TCPv4; - eprosima::fastdds::rtps::IPLocator::setLogicalPort(listening_locator, server_port); + eprosima::fastdds::rtps::IPLocator::setLogicalPort(listening_locator, config.listening_port); eprosima::fastdds::rtps::IPLocator::setIPv4(listening_locator, ip_listening_address); connection_locator.kind = LOCATOR_KIND_TCPv4; eprosima::fastdds::rtps::IPLocator::setIPv4(connection_locator, ip_connection_address); - eprosima::fastdds::rtps::IPLocator::setLogicalPort(connection_locator, connection_server_port); + eprosima::fastdds::rtps::IPLocator::setLogicalPort(connection_locator, config.connection_port); break; } case TransportKind::TCPv6: { auto descriptor_tmp = std::make_shared(); - // descriptor_tmp->interfaceWhiteList.push_back(ip_listening_address); - descriptor_tmp->add_listener_port(server_port); + descriptor_tmp->add_listener_port(config.listening_port); descriptor = descriptor_tmp; listening_locator.kind = LOCATOR_KIND_TCPv6; - eprosima::fastdds::rtps::IPLocator::setLogicalPort(listening_locator, server_port); - if (eprosima::fastdds::rtps::IPLocator::isIPv6(ip_listening_address)) - { - eprosima::fastdds::rtps::IPLocator::setIPv6(listening_locator, ip_listening_address); - } - else - { - eprosima::fastdds::rtps::IPLocator::setIPv6(listening_locator, "::1"); - } + eprosima::fastdds::rtps::IPLocator::setLogicalPort(listening_locator, config.listening_port); + eprosima::fastdds::rtps::IPLocator::setIPv6(listening_locator, ip_listening_address); connection_locator.kind = LOCATOR_KIND_TCPv6; eprosima::fastdds::rtps::IPLocator::setIPv6(connection_locator, ip_connection_address); - eprosima::fastdds::rtps::IPLocator::setLogicalPort(connection_locator, connection_server_port); + eprosima::fastdds::rtps::IPLocator::setLogicalPort(connection_locator, config.connection_port); break; } @@ -189,37 +161,29 @@ bool DiscoveryServer::init( // Set SERVER's listening locator for PDP pqos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(listening_locator); - /////////////////////////////// // Configure Connection address - /////////////////////////////// - - if (has_connection_server) + if (config.is_also_client) { // Add remote SERVER to CLIENT's list of SERVERs pqos.wire_protocol().builtin.discovery_config.m_DiscoveryServers.push_back(connection_locator); } - /////////////////////////////// // Create Participant - /////////////////////////////// - - // CREATE THE PARTICIPANT - participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos, &listener_); + participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos, this); if (participant_ == nullptr) { - return false; + throw std::runtime_error("Participant initialization failed"); } - - if (has_connection_server) + if (config.is_also_client) { std::cout << "Server Participant " << pqos.name() << " created with GUID " << participant_->guid() << " listening in address <" << listening_locator << "> " << - " connecting with Discovery Server <" << connection_locator << "> " << + " connected to address <" << connection_locator << "> " << std::endl; } else @@ -230,63 +194,84 @@ bool DiscoveryServer::init( " listening in address <" << listening_locator << "> " << std::endl; } - - return true; } -DiscoveryServer::~DiscoveryServer() +ServerApp::~ServerApp() { - if (participant_ != nullptr) + if (nullptr != participant_) { + // Delete DDS entities contained within the DomainParticipant + participant_->delete_contained_entities(); + + // Delete DomainParticipant DomainParticipantFactory::get_instance()->delete_participant(participant_); } } -void DiscoveryServer::ServerListener::on_participant_discovery( - eprosima::fastdds::dds::DomainParticipant* /*participant*/, - eprosima::fastdds::rtps::ParticipantDiscoveryInfo&& info, +void ServerApp::on_participant_discovery( + DomainParticipant*, + fastdds::rtps::ParticipantDiscoveryInfo&& info, bool& should_be_ignored) { static_cast(should_be_ignored); if (info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) { std::cout << "Discovered Participant with GUID " << info.info.m_guid << std::endl; + ++matched_; } else if (info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT || info.status == eprosima::fastdds::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT) { std::cout << "Dropped Participant with GUID " << info.info.m_guid << std::endl; + --matched_; } } -void DiscoveryServer::run( - unsigned int timeout) +void ServerApp::run() { - stop_ = false; - std::cout << "Server running. Please press CTRL+C to stop the Server." << std::endl; - signal(SIGINT, [](int signum) - { - std::cout << "SIGINT received, stopping Server execution." << std::endl; - static_cast(signum); DiscoveryServer::stop(); - }); - - if (timeout > 0) + while (!is_stopped()) { - // Create a thread that will stop this process after timeout - std::thread t( - [=] - () + // Wait for period or stop event + std::unique_lock period_lock(mutex_); + + if (timeout_ != 0) + { + bool timeout = false; + cv_.wait_for(period_lock, std::chrono::seconds(timeout_), [&]() + { + timeout = + ((std::chrono::steady_clock::now() - start_time_) >= + std::chrono::milliseconds(timeout_ * 1000)); + return is_stopped() || timeout; + }); + + if (timeout) { - std::this_thread::sleep_for(std::chrono::seconds(timeout)); - std::cout << "Stopping Server execution due to timeout." << std::endl; - DiscoveryServer::stop(); - }); - t.detach(); + stop(); + } + } + else + { + cv_.wait(period_lock, [&]() + { + return is_stopped(); + }); + } } +} - std::unique_lock lck(terminate_cv_mtx_); - terminate_cv_.wait(lck, [] - { - return is_stopped(); - }); +bool ServerApp::is_stopped() +{ + return stop_.load(); } + +void ServerApp::stop() +{ + stop_.store(true); + cv_.notify_one(); +} + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima diff --git a/examples/cpp/discovery_server/ServerApp.hpp b/examples/cpp/discovery_server/ServerApp.hpp new file mode 100644 index 00000000000..f9eca825559 --- /dev/null +++ b/examples/cpp/discovery_server/ServerApp.hpp @@ -0,0 +1,86 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ServerApp.hpp + * + */ + +#ifndef FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__SERVERAPP_HPP +#define FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__SERVERAPP_HPP + +#include + +#include +#include +#include + +#include "Application.hpp" +#include "CLIParser.hpp" +#include "HelloWorldPubSubTypes.hpp" + +using namespace eprosima::fastdds::dds; + +namespace eprosima { +namespace fastdds { +namespace examples { +namespace discovery_server { + +class ServerApp : public Application, public DomainParticipantListener +{ +public: + + ServerApp( + const CLIParser::server_config& config); + + ~ServerApp(); + + //! Publisher matched method + void on_participant_discovery( + DomainParticipant* participant, + fastdds::rtps::ParticipantDiscoveryInfo&& info, + bool& should_be_ignored) override; + + //! Run publisher + void run() override; + + //! Stop publisher + void stop() override; + +private: + + //! Return the current state of execution + bool is_stopped(); + + DomainParticipant* participant_; + + int16_t matched_; + + std::mutex mutex_; + + uint16_t timeout_; + + std::chrono::steady_clock::time_point start_time_; + + std::condition_variable cv_; + + std::atomic stop_; +}; + +} // namespace discovery_server +} // namespace examples +} // namespace fastdds +} // namespace eprosima + +#endif /* FASTDDS_EXAMPLES_CPP_DISCOVERY_SERVER__SERVERAPP_HPP */ diff --git a/examples/cpp/discovery_server/main.cpp b/examples/cpp/discovery_server/main.cpp new file mode 100644 index 00000000000..d6cd7bfb050 --- /dev/null +++ b/examples/cpp/discovery_server/main.cpp @@ -0,0 +1,107 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file main.cpp + * + */ + +#include +#include +#include + +#include +#include + +#include "Application.hpp" +#include "CLIParser.hpp" + +using eprosima::fastdds::dds::Log; + +using namespace eprosima::fastdds::examples::discovery_server; + +std::function stop_app_handler; +void signal_handler( + int signum) +{ + stop_app_handler(signum); +} + +int main( + int argc, + char** argv) +{ + auto ret = EXIT_SUCCESS; + CLIParser::ds_example_config config = CLIParser::parse_cli_options(argc, argv); + uint16_t samples = 0; + switch (config.entity) + { + case CLIParser::EntityKind::CLIENT_PUBLISHER: + samples = config.pub_config.samples; + break; + case CLIParser::EntityKind::CLIENT_SUBSCRIBER: + samples = config.sub_config.samples; + break; + default: + break; + } + + std::string app_name = CLIParser::parse_entity_kind(config.entity); + std::shared_ptr app; + + try + { + app = Application::make_app(config); + } + catch (const std::runtime_error& e) + { + EPROSIMA_LOG_ERROR(app_name, e.what()); + ret = EXIT_FAILURE; + } + + if (EXIT_FAILURE != ret) + { + std::thread thread(&Application::run, app); + + if (samples == 0) + { + std::cout << app_name << " running. Please press Ctrl+C to stop the " + << app_name << " at any time." << std::endl; + } + else + { + std::cout << app_name << " running for " << samples << " samples. Please press Ctrl+C to stop the " + << app_name << " at any time." << std::endl; + } + + stop_app_handler = [&](int signum) + { + std::cout << "\n" << CLIParser::parse_signal(signum) << " received, stopping " << app_name + << " execution." << std::endl; + app->stop(); + }; + + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + #ifndef _WIN32 + signal(SIGQUIT, signal_handler); + signal(SIGHUP, signal_handler); + #endif // _WIN32 + + thread.join(); + } + + Log::Reset(); + return ret; +} diff --git a/test/examples/discovery_server.compose.yml b/test/examples/discovery_server.compose.yml new file mode 100644 index 00000000000..127e034b336 --- /dev/null +++ b/test/examples/discovery_server.compose.yml @@ -0,0 +1,80 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3" + +services: + subscriber: + image: @DOCKER_IMAGE_NAME@ + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + # TODO(eduponz): LD_LIBRARY_PATH is not the correct variable for Windows + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/discovery_server@FILE_EXTENSION@ + SUBSCRIBER_ADDITIONAL_ARGUMENTS: ${SUB_ARGS} + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/discovery_server@FILE_EXTENSION@ subscriber --reliable --transient-local $${SUBSCRIBER_ADDITIONAL_ARGUMENTS}" + network_mode: host + ipc: host + depends_on: + - server_1 + + publisher: + image: @DOCKER_IMAGE_NAME@ + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + # TODO(eduponz): LD_LIBRARY_PATH is not the correct variable for Windows + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/discovery_server@FILE_EXTENSION@ + PUBLISHER_ADDITIONAL_ARGUMENTS: ${PUB_ARGS} + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/discovery_server@FILE_EXTENSION@ publisher --reliable --transient-local $${PUBLISHER_ADDITIONAL_ARGUMENTS}" + network_mode: host + ipc: host + depends_on: + - server_1 + + server_1: + image: @DOCKER_IMAGE_NAME@ + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + # TODO(eduponz): LD_LIBRARY_PATH is not the correct variable for Windows + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/discovery_server@FILE_EXTENSION@ + SERVER1_ADDITIONAL_ARGUMENTS: ${SERVER1_ARGS} + network_mode: host + ipc: host + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/discovery_server@FILE_EXTENSION@ server $${SERVER1_ADDITIONAL_ARGUMENTS}" + + server_2: + image: @DOCKER_IMAGE_NAME@ + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + # TODO(eduponz): LD_LIBRARY_PATH is not the correct variable for Windows + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/discovery_server@FILE_EXTENSION@ + SERVER2_ADDITIONAL_ARGUMENTS: ${SERVER2_ARGS} + network_mode: host + ipc: host + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/discovery_server@FILE_EXTENSION@ server $${SERVER2_ADDITIONAL_ARGUMENTS}" diff --git a/test/examples/test_discovery_server.py b/test/examples/test_discovery_server.py new file mode 100644 index 00000000000..2a1167f4d1c --- /dev/null +++ b/test/examples/test_discovery_server.py @@ -0,0 +1,71 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import pytest + +""" + Expected output test cases for Discovery Server example. + Each element is a ternary (pub_args, sub_args, server_args) +""" +discovery_server_test_cases = [ + ('--samples 10', '--samples 10', '--timeout 5', '--listening-port 11400 --timeout 1'), + ('--connection-port 11500 --samples 10', '--connection-port 11500 --samples 10', '--listening-port 11500 --timeout 5', '--listening-port 11400 --timeout 1'), + ('--transport tcpv4 --samples 10', '--transport tcpv4 --samples 10', '--transport tcpv4 --timeout 5', '--listening-port 11400 --timeout 1'), + ('--samples 10', '--samples 10', '--timeout 5', '--listening-port 11400 --timeout 1'), + ('--samples 10', '--connection-port 11400 --samples 10', '--timeout 5 --connection-port 11400', '--listening-port 11400 --timeout 5'), + ('--transport udpv6 --samples 10', '--transport udpv6 --connection-port 18000 --samples 10', '--transport udpv6 --timeout 5 --connection-port 18000', '--transport udpv6 --listening-port 18000 --timeout 5'), + ('--transport tcpv4 --samples 10', '--transport tcpv4 --connection-port 18000 --samples 10', '--transport tcpv4 --timeout 5 --connection-port 18000', '--transport tcpv4 --listening-port 18000 --timeout 5'), + ('--transport tcpv6 --samples 10', '--transport tcpv6 --connection-port 18000 --samples 10', '--transport tcpv6 --timeout 5 --connection-port 18000', '--transport tcpv6 --listening-port 18000 --timeout 5') +] + +@pytest.mark.parametrize("pub_args, sub_args, server1_args, server2_args", discovery_server_test_cases) +def test_discovery_server(pub_args, sub_args, server1_args, server2_args): + """.""" + ret = False + out = '' + command_args = 'PUB_ARGS="' + pub_args + '" SUB_ARGS="' + sub_args + '" SERVER1_ARGS="' + server1_args + '" SERVER2_ARGS="' + server2_args + '" ' + try: + out = subprocess.check_output( + command_args + '@DOCKER_EXECUTABLE@ compose -f discovery_server.compose.yml up', + stderr=subprocess.STDOUT, + shell=True, + timeout=30 + ).decode().split('\n') + + sent = 0 + received = 0 + for line in out: + if 'SENT' in line: + sent += 1 + continue + + if 'RECEIVED' in line: + received += 1 + continue + + if sent != 0 and received != 0 and sent == received: + ret = True + else: + print('ERROR: sent: ' + str(sent) + ', but received: ' + str(received)) + raise subprocess.CalledProcessError(1, '') + + except subprocess.CalledProcessError: + for l in out: + print(l) + except subprocess.TimeoutExpired: + print('TIMEOUT') + print(out) + + assert(ret) diff --git a/versions.md b/versions.md index 53415be40a0..c2300f5be08 100644 --- a/versions.md +++ b/versions.md @@ -50,6 +50,7 @@ Forthcoming * X-Types example with dynamic type discovery and Hello world example compatibility. * Custom Content filter example * Delivery mechanisms example with SHM, UDP, TCP, data-sharing and intra-process mechanisms. + * Discovery server example. * Removed `TypeConsistencyQos` from DataReader, and included `TypeConsistencyEnforcementQosPolicy` and `DataRepresentationQosPolicy` * Added new `flow_controller_descriptor_list` XML configuration, remove `ThroughtputController`. * Migrate `#define`s within `BuiltinEndpoints.hpp` to namespaced `constexpr` variables.