From df82ee43787e96739dc2f434d3dd6f5e5a9e2b64 Mon Sep 17 00:00:00 2001 From: fpagliughi <fpagliughi@mindspring.com> Date: Sat, 4 Jan 2025 19:29:12 -0500 Subject: [PATCH] #410 Added 'shutdown_event' and reworked consumer to prevent propagating exceptions on shutdown. --- CHANGELOG.md | 1 + examples/multithr_pub_sub.cpp | 43 +++++++-- include/mqtt/async_client.h | 170 +++++++++++++++++++--------------- include/mqtt/event.h | 25 ++++- src/async_client.cpp | 29 +++++- 5 files changed, 180 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e2cdcce..925eacb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [#503](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/503) Fixed issue that generated docs were empty. - [#518](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/518) Add function for checking async consumer event queue size - [#519](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/519) Fix potential deadlock in set_callback +- [#524](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/524) Fixed copy and move operations for 'subscribe_options'. Added unit tests. ## [Version 1.4.1](https://github.com/eclipse/paho.mqtt.cpp/compare/v1.4.0..v1.4.1) - (2024-07-09) diff --git a/examples/multithr_pub_sub.cpp b/examples/multithr_pub_sub.cpp index c182e227..663cf4fa 100644 --- a/examples/multithr_pub_sub.cpp +++ b/examples/multithr_pub_sub.cpp @@ -21,18 +21,20 @@ // processing, perhaps based on the topics. It could be common, however, to // want to have multiple threads for publishing. // -// The sample demonstrates: -// - Creating a client and accessing it from a shared_ptr<> +// This example demonstrates: +// - Creating a client and sharing it across threads using a shared_ptr<> // - Using one thread to receive incoming messages from the broker and // another thread to publish messages to it. // - Connecting to an MQTT server/broker. -// - Subscribing to a topic -// - Using the asynchronous consumer -// - Publishing messages. +// - Automatic reconnect +// - Publishing messages +// - Subscribing to multiple topics +// - Using the asynchronous message consumer +// - Signaling consumer from another thread // /******************************************************************************* - * Copyright (c) 2020-2023 Frank Pagliughi <fpagliughi@mindspring.com> + * Copyright (c) 2020-2025 Frank Pagliughi <fpagliughi@mindspring.com> * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 @@ -61,8 +63,8 @@ using namespace std; using namespace std::chrono; -const std::string DFLT_SERVER_ADDRESS("mqtt://localhost:1883"); -const std::string CLIENT_ID("multithr_pub_sub_cpp"); +const std::string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"}; +const std::string CLIENT_ID{"multithr_pub_sub_cpp"}; ///////////////////////////////////////////////////////////////////////////// @@ -172,6 +174,10 @@ int main(int argc, char* argv[]) auto rsp = cli->connect(connOpts)->get_connect_response(); cout << "OK\n" << endl; + cout << "Now start an application such as 'async_publish_time'\n" + << "that publishes to a 'data/' topic...\n" + << endl; + // Subscribe if this is a new session with the server if (!rsp.is_session_present()) cli->subscribe(TOPICS, QOS); @@ -180,13 +186,32 @@ int main(int argc, char* argv[]) std::thread publisher(publisher_func, cli, counter); + // Start another thread to shut us down after a minute + + std::thread{[cli] { + this_thread::sleep_for(30s); + cout << "Signaling the consumer to stop." << endl; + cli->stop_consuming(); + }}.detach(); + // Consume messages in this thread + // Remember that with the message consumer, we can't detect a + // reconnect We would need to register a connect callback or use the + // event consumer. + while (true) { auto msg = cli->consume_message(); - if (!msg) + if (!msg) { + // Exit if the consumer was shut down + if (cli->consumer_closed()) + break; + + // Otherwise let auto-reconnect deal with it. + cout << "Disconnect detected. Attempting an auto-reconnect." << endl; continue; + } if (msg->get_topic() == "command" && msg->to_string() == "exit") { cout << "Exit command received" << endl; diff --git a/include/mqtt/async_client.h b/include/mqtt/async_client.h index be20638d..999f3cc2 100644 --- a/include/mqtt/async_client.h +++ b/include/mqtt/async_client.h @@ -826,8 +826,102 @@ class async_client : public virtual iasync_client return (que_) ? que_->size() : 0; } /** - * Read the next message from the queue. + * Read the next client event from the queue. * This blocks until a new message arrives. + * If the consumer queue is closed, this returns a shutdown event. + * @return The client event. + */ + event consume_event() override; + /** + * Try to read the next client event without blocking. + * @param evt Pointer to the value to receive the event + * @return @em true if an event was read, @em false if no + * event was available. + */ + bool try_consume_event(event* evt) override; + /** + * Waits a limited time for a client event to appear. + * @param evt Pointer to the value to receive the event. + * @param relTime The maximum amount of time to wait for an event. + * @return @em true if an event was read, @em false if a timeout + * occurred. + */ + template <typename Rep, class Period> + bool try_consume_event_for( + event* evt, const std::chrono::duration<Rep, Period>& relTime + ) { + if (!que_) + throw mqtt::exception(-1, "Consumer not started"); + + try { + return que_->try_get_for(evt, relTime); + } + catch (queue_closed&) { + *evt = event{shutdown_event{}}; + return true; + } + } + /** + * Waits a limited time for a client event to arrive. + * @param relTime The maximum amount of time to wait for an event. + * @return The event that was received. It will contain empty message on + * timeout. + */ + template <typename Rep, class Period> + event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) { + event evt; + try { + que_->try_get_for(&evt, relTime); + } + catch (queue_closed&) { + evt = event{shutdown_event{}}; + } + return evt; + } + /** + * Waits until a specific time for a client event to appear. + * @param evt Pointer to the value to receive the event. + * @param absTime The time point to wait until, before timing out. + * @return @em true if an event was recceived, @em false if a timeout + * occurred. + */ + template <class Clock, class Duration> + bool try_consume_event_until( + event* evt, const std::chrono::time_point<Clock, Duration>& absTime + ) { + if (!que_) + throw mqtt::exception(-1, "Consumer not started"); + + try { + return que_->try_get_until(evt, absTime); + } + catch (queue_closed&) { + *evt = event{shutdown_event{}}; + return true; + } + } + /** + * Waits until a specific time for a client event to appear. + * @param absTime The time point to wait until, before timing out. + * @return The event that was received. It will contain empty message on + * timeout. + */ + template <class Clock, class Duration> + event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime + ) { + event evt; + try { + que_->try_get_until(&evt, absTime); + } + catch (queue_closed&) { + evt = event{shutdown_event{}}; + } + return evt; + } + /** + * Read the next message from the queue. + * This blocks until a new message arrives or until a disconnect or + * shutdown occurs. * @return The message and topic. */ const_message_ptr consume_message() override; @@ -855,7 +949,7 @@ class async_client : public virtual iasync_client event evt; while (true) { - if (!que_->try_get_for(&evt, relTime)) + if (!try_consume_event_for(&evt, relTime)) return false; if (const auto* pval = evt.get_message_if()) { @@ -901,7 +995,7 @@ class async_client : public virtual iasync_client event evt; while (true) { - if (!que_->try_get_until(&evt, absTime)) + if (!try_consume_event_until(&evt, absTime)) return false; if (const auto* pval = evt.get_message_if()) { @@ -930,76 +1024,6 @@ class async_client : public virtual iasync_client this->try_consume_message_until(&msg, absTime); return msg; } - /** - * Read the next message from the queue. - * This blocks until a new message arrives. - * @return The message and topic. - */ - event consume_event() override { return que_->get(); } - /** - * Try to read the next message from the queue without blocking. - * @param evt Pointer to the value to receive the event - * @return @em true if an event was read, @em false if no - * event was available. - */ - bool try_consume_event(event* evt) override { return que_->try_get(evt); } - /** - * Waits a limited time for a message to arrive. - * @param evt Pointer to the value to receive the event. - * @param relTime The maximum amount of time to wait for an event. - * @return @em true if an event was read, @em false if a timeout - * occurred. - */ - template <typename Rep, class Period> - bool try_consume_event_for( - event* evt, const std::chrono::duration<Rep, Period>& relTime - ) { - if (!que_) - throw mqtt::exception(-1, "Consumer not started"); - - return que_->try_get_for(evt, relTime); - } - /** - * Waits a limited time for an event to arrive. - * @param relTime The maximum amount of time to wait for an event. - * @return The event that was received. It will contain empty message on - * timeout. - */ - template <typename Rep, class Period> - event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) { - event evt; - que_->try_get_for(&evt, relTime); - return evt; - } - /** - * Waits until a specific time for an event to appear. - * @param evt Pointer to the value to receive the event. - * @param absTime The time point to wait until, before timing out. - * @return @em true if an event was recceived, @em false if a timeout - * occurred. - */ - template <class Clock, class Duration> - bool try_consume_event_until( - event* evt, const std::chrono::time_point<Clock, Duration>& absTime - ) { - if (!que_) - throw mqtt::exception(-1, "Consumer not started"); - - return que_->try_get_until(evt, absTime); - } - /** - * Waits until a specific time for an event to appear. - * @param absTime The time point to wait until, before timing out. - * @return The event that was received. It will contain empty message on - * timeout. - */ - template <class Clock, class Duration> - event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime - ) { - event evt; - que_->try_get_until(&evt, absTime); - return evt; - } }; /** Smart/shared pointer to an asynchronous MQTT client object */ diff --git a/include/mqtt/event.h b/include/mqtt/event.h index 8b45dd47..bcb003c4 100644 --- a/include/mqtt/event.h +++ b/include/mqtt/event.h @@ -53,6 +53,9 @@ struct disconnected_event ReasonCode reasonCode; }; +/** Event for when the consumer queue is shutdown from another thread */ +struct shutdown_event { }; + /* Event for when a message arrives is just a message pointer */ @@ -83,7 +86,7 @@ class event public: /** The variant type for any possible event. */ using event_type = std::variant< - const_message_ptr, connected_event, connection_lost_event, disconnected_event>; + const_message_ptr, connected_event, connection_lost_event, disconnected_event, shutdown_event>; private: event_type evt_{}; @@ -124,6 +127,11 @@ class event * @param evt A disconnected event. */ event(disconnected_event evt) : evt_{std::move(evt)} {} + /** + * Constructs a 'shutdown' event. + * @param evt A shutdown event. + */ + event(shutdown_event evt) : evt_{std::move(evt)} {} /** * Copy constructor. * @param evt The event to copy. @@ -196,13 +204,22 @@ class event return std::holds_alternative<disconnected_event>(evt_); } /** - * Determines if this is any type of client disconnect. + * Determines if this event is an internal shutdown request. + * @return @em true if this event is a shutdown request, @em false + * otherwise. + */ + bool is_shutdown() const { + return std::holds_alternative<disconnected_event>(evt_); + } + /** + * Determines if this is any type of client disconnect or shutdown. * @return @em true if this event is any type of client disconnect such - * as a 'connection lost' or 'disconnected' event. + * as a 'connection lost', 'disconnected', or shutdown event. */ bool is_any_disconnect() const { return std::holds_alternative<connection_lost_event>(evt_) - || std::holds_alternative<disconnected_event>(evt_); + || std::holds_alternative<disconnected_event>(evt_) + || std::holds_alternative<shutdown_event>(evt_); } /** * Gets the message from the event, iff this is a message event. diff --git a/src/async_client.cpp b/src/async_client.cpp index 79ecbe10..e4f4b4b4 100644 --- a/src/async_client.cpp +++ b/src/async_client.cpp @@ -879,6 +879,31 @@ void async_client::stop_consuming() } } +event async_client::consume_event() +{ + event evt; + try { + evt = que_->get(); + } + catch (queue_closed&) { + evt = event{shutdown_event{}}; + } + return evt; +} + +bool async_client::try_consume_event(event* evt) +{ + bool res = false; + try { + res = que_->try_get(evt); + } + catch (queue_closed&) { + *evt = event{shutdown_event{}}; + res = true; + } + return res; +} + const_message_ptr async_client::consume_message() { if (!que_) @@ -887,7 +912,7 @@ const_message_ptr async_client::consume_message() // For backward compatibility we ignore the 'connected' events, // whereas disconnected/lost return an empty pointer. while (true) { - auto evt = que_->get(); + auto evt = consume_event(); if (const auto* pval = evt.get_message_if()) return *pval; @@ -905,7 +930,7 @@ bool async_client::try_consume_message(const_message_ptr* msg) event evt; while (true) { - if (!que_->try_get(&evt)) + if (!try_consume_event(&evt)) return false; if (const auto* pval = evt.get_message_if()) {