From b05c17c80497fd9b8d1fa3a84af27d46631d6fad Mon Sep 17 00:00:00 2001 From: brentru Date: Wed, 14 Aug 2019 18:22:24 -0400 Subject: [PATCH 1/2] add keepalive 60s default --- adafruit_minimqtt.py | 136 +++++++++++++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 44 deletions(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index ef7f3f6..b6a04e2 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -90,10 +90,13 @@ class MQTT: :param str client_id: Optional client identifier, defaults to a unique, generated string. :param bool is_ssl: Sets a secure or insecure connection with the broker. :param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO. + :param int keep_alive: KeepAlive interval between the broker and the + MiniMQTT client, in seconds. """ # pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member def __init__(self, socket, broker, port=None, username=None, - password=None, network_manager=None, client_id=None, is_ssl=True, log=False): + password=None, network_manager=None, client_id=None, + is_ssl=True, log=False, keep_alive=60): # network management self._socket = socket network_manager_type = str(type(network_manager)) @@ -137,9 +140,11 @@ def __init__(self, socket, broker, port=None, username=None, self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self.packet_id = 0 - self._keep_alive = 60 + self._keep_alive = keep_alive self._pid = 0 self._user_data = None + self._timestamp = 0 + # List of subscribed topics, used for tracking self._subscribed_topics = [] # Server callbacks self.on_message = None @@ -180,34 +185,6 @@ def last_will(self, topic=None, message=None, qos=0, retain=False): self._lw_msg = message self._lw_retain = retain - def reconnect(self, retries=30, resub_topics=True): - """Attempts to reconnect to the MQTT broker. - :param int retries: Amount of retries before resetting the network interface. - :param bool resub_topics: Resubscribe to previously subscribed topics. - """ - retries = 0 - while not self._is_connected: - if self._logger is not None: - self._logger.debug('Attempting to reconnect to broker') - try: - self.connect() - if self._logger is not None: - self._logger.debug('Reconnected to broker') - if resub_topics: - if self._logger is not None: - self._logger.debug('Attempting to resubscribe to prv. subscribed topics.') - while self._subscribed_topics: - feed = self._subscribed_topics.pop() - self.subscribe(feed) - except OSError as e: - if self._logger is not None: - self._logger.debug('Lost connection, reconnecting and resubscribing...', e) - retries += 1 - if retries >= 30: - retries = 0 - time.sleep(1) - continue - # pylint: disable=too-many-branches, too-many-statements def connect(self, clean_session=True): """Initiates connection with the MQTT Broker. @@ -564,29 +541,100 @@ def unsubscribe(self, topic): self._subscribed_topics.remove(t) return + @property + def is_wifi_connected(self): + """Returns if the ESP module is connected to + an access point, resets module if False""" + if self._wifi: + return self._wifi.esp.is_connected + raise MMQTTException("MiniMQTT Client does not use a WiFi NetworkManager.") + + # pylint: disable=line-too-long, protected-access + @property + def is_sock_connected(self): + """Returns if the socket is connected.""" + return self.is_wifi_connected and self._sock and self._wifi.esp.socket_connected(self._sock._socknum) + + def reconnect_socket(self): + """Re-establishes the socket's connection with the MQTT broker. + """ + try: + if self._logger is not None: + self._logger.debug("Attempting to reconnect with MQTT Broker...") + self.reconnect() + except RuntimeError as err: + if self._logger is not None: + self._logger.debug('Failed to reconnect with MQTT Broker, retrying...', err) + time.sleep(1) + self.reconnect_socket() + + def reconnect_wifi(self): + """Reconnects to WiFi Access Point and socket, if disconnected. + """ + while not self.is_wifi_connected: + try: + if self._logger is not None: + self._logger.debug('Connecting to WiFi AP...') + self._wifi.connect() + except (RuntimeError, ValueError): + if self._logger is not None: + self._logger.debug('Failed to reset WiFi module, retrying...') + time.sleep(1) + # we just reconnected, is the socket still connected? + if not self.is_sock_connected: + self.reconnect_socket() + + def reconnect(self, resub_topics=True): + """Attempts to reconnect to the MQTT broker. + :param bool resub_topics: Resubscribe to previously subscribed topics. + """ + if self._logger is not None: + self._logger.debug('Attempting to reconnect with MQTT broker') + self.connect() + if self._logger is not None: + self._logger.debug('Reconnected with broker') + if resub_topics: + if self._logger is not None: + self._logger.debug('Attempting to resubscribe to previously subscribed topics.') + while self._subscribed_topics: + feed = self._subscribed_topics.pop() + self.subscribe(feed) + def loop_forever(self): """Starts a blocking message loop. Use this method if you want to run a program forever. + Code below a call to this method will NOT execute. Network reconnection is handled within this call. - Your code will not execute anything below this call. + """ - run = True - while run: - if self._is_connected: - self._wait_for_msg(0.0) - else: - if self._logger is not None: - self._logger.debug('Lost connection, reconnecting and resubscribing...') - self.reconnect(resub_topics=True) - if self._logger is not None: - self._logger.debug('Connection restored, continuing to loop forever...') + while True: + # Check WiFi and socket status + if self.is_sock_connected: + try: + self.loop() + except (RuntimeError, ValueError): + if self._wifi: + # Reconnect the WiFi module and the socket + self.reconnect_wifi() + continue def loop(self): """Non-blocking message loop. Use this method to - check incoming subscription messages. Does not handle - network reconnection like loop_forever - reconnection must - be handled within your code. + check incoming subscription messages. + + This method does NOT handle networking or + network hardware management, use loop_forever + or handle in code instead. """ + if self._timestamp == 0: + self._timestamp = time.monotonic() + current_time = time.monotonic() + if current_time - self._timestamp >= self._keep_alive: + # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server + if self._logger is not None: + self._logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...') + self.ping() + self._timestamp = 0 self._sock.settimeout(0.1) return self._wait_for_msg() From 4d4d6a59e20c38a4b7a9c03695596473a33fe8ef Mon Sep 17 00:00:00 2001 From: brentru Date: Thu, 15 Aug 2019 09:58:36 -0400 Subject: [PATCH 2/2] fix docstring --- adafruit_minimqtt.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index b6a04e2..bf245b1 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -90,8 +90,7 @@ class MQTT: :param str client_id: Optional client identifier, defaults to a unique, generated string. :param bool is_ssl: Sets a secure or insecure connection with the broker. :param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO. - :param int keep_alive: KeepAlive interval between the broker and the - MiniMQTT client, in seconds. + :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client. """ # pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member def __init__(self, socket, broker, port=None, username=None,