Skip to content

Commit

Permalink
Merge pull request #4 from brentru/master
Browse files Browse the repository at this point in the history
Improve network handling and implement time-based KeepAlive
  • Loading branch information
brentru authored Aug 15, 2019
2 parents f3c0cdf + 4d4d6a5 commit 801b3a3
Showing 1 changed file with 91 additions and 44 deletions.
135 changes: 91 additions & 44 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ class MQTT:
:param str client_id: Optional client identifier, defaults to a unique, generated string.
:param bool is_ssl: Sets a secure or insecure connection with the broker.
:param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
"""
# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
def __init__(self, socket, broker, port=None, username=None,
password=None, network_manager=None, client_id=None, is_ssl=True, log=False):
password=None, network_manager=None, client_id=None,
is_ssl=True, log=False, keep_alive=60):
# network management
self._socket = socket
network_manager_type = str(type(network_manager))
Expand Down Expand Up @@ -137,9 +139,11 @@ def __init__(self, socket, broker, port=None, username=None,
self._is_connected = False
self._msg_size_lim = MQTT_MSG_SZ_LIM
self.packet_id = 0
self._keep_alive = 60
self._keep_alive = keep_alive
self._pid = 0
self._user_data = None
self._timestamp = 0
# List of subscribed topics, used for tracking
self._subscribed_topics = []
# Server callbacks
self.on_message = None
Expand Down Expand Up @@ -180,34 +184,6 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
self._lw_msg = message
self._lw_retain = retain

def reconnect(self, retries=30, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param int retries: Amount of retries before resetting the network interface.
:param bool resub_topics: Resubscribe to previously subscribed topics.
"""
retries = 0
while not self._is_connected:
if self._logger is not None:
self._logger.debug('Attempting to reconnect to broker')
try:
self.connect()
if self._logger is not None:
self._logger.debug('Reconnected to broker')
if resub_topics:
if self._logger is not None:
self._logger.debug('Attempting to resubscribe to prv. subscribed topics.')
while self._subscribed_topics:
feed = self._subscribed_topics.pop()
self.subscribe(feed)
except OSError as e:
if self._logger is not None:
self._logger.debug('Lost connection, reconnecting and resubscribing...', e)
retries += 1
if retries >= 30:
retries = 0
time.sleep(1)
continue

# pylint: disable=too-many-branches, too-many-statements
def connect(self, clean_session=True):
"""Initiates connection with the MQTT Broker.
Expand Down Expand Up @@ -564,29 +540,100 @@ def unsubscribe(self, topic):
self._subscribed_topics.remove(t)
return

@property
def is_wifi_connected(self):
"""Returns if the ESP module is connected to
an access point, resets module if False"""
if self._wifi:
return self._wifi.esp.is_connected
raise MMQTTException("MiniMQTT Client does not use a WiFi NetworkManager.")

# pylint: disable=line-too-long, protected-access
@property
def is_sock_connected(self):
"""Returns if the socket is connected."""
return self.is_wifi_connected and self._sock and self._wifi.esp.socket_connected(self._sock._socknum)

def reconnect_socket(self):
"""Re-establishes the socket's connection with the MQTT broker.
"""
try:
if self._logger is not None:
self._logger.debug("Attempting to reconnect with MQTT Broker...")
self.reconnect()
except RuntimeError as err:
if self._logger is not None:
self._logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
time.sleep(1)
self.reconnect_socket()

def reconnect_wifi(self):
"""Reconnects to WiFi Access Point and socket, if disconnected.
"""
while not self.is_wifi_connected:
try:
if self._logger is not None:
self._logger.debug('Connecting to WiFi AP...')
self._wifi.connect()
except (RuntimeError, ValueError):
if self._logger is not None:
self._logger.debug('Failed to reset WiFi module, retrying...')
time.sleep(1)
# we just reconnected, is the socket still connected?
if not self.is_sock_connected:
self.reconnect_socket()

def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param bool resub_topics: Resubscribe to previously subscribed topics.
"""
if self._logger is not None:
self._logger.debug('Attempting to reconnect with MQTT broker')
self.connect()
if self._logger is not None:
self._logger.debug('Reconnected with broker')
if resub_topics:
if self._logger is not None:
self._logger.debug('Attempting to resubscribe to previously subscribed topics.')
while self._subscribed_topics:
feed = self._subscribed_topics.pop()
self.subscribe(feed)

def loop_forever(self):
"""Starts a blocking message loop. Use this
method if you want to run a program forever.
Code below a call to this method will NOT execute.
Network reconnection is handled within this call.
Your code will not execute anything below this call.
"""
run = True
while run:
if self._is_connected:
self._wait_for_msg(0.0)
else:
if self._logger is not None:
self._logger.debug('Lost connection, reconnecting and resubscribing...')
self.reconnect(resub_topics=True)
if self._logger is not None:
self._logger.debug('Connection restored, continuing to loop forever...')
while True:
# Check WiFi and socket status
if self.is_sock_connected:
try:
self.loop()
except (RuntimeError, ValueError):
if self._wifi:
# Reconnect the WiFi module and the socket
self.reconnect_wifi()
continue

def loop(self):
"""Non-blocking message loop. Use this method to
check incoming subscription messages. Does not handle
network reconnection like loop_forever - reconnection must
be handled within your code.
check incoming subscription messages.
This method does NOT handle networking or
network hardware management, use loop_forever
or handle in code instead.
"""
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
if current_time - self._timestamp >= self._keep_alive:
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
if self._logger is not None:
self._logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
self.ping()
self._timestamp = 0
self._sock.settimeout(0.1)
return self._wait_for_msg()

Expand Down

0 comments on commit 801b3a3

Please sign in to comment.