Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve network handling and implement time-based KeepAlive #4

Merged
merged 2 commits into from
Aug 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 91 additions & 44 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ class MQTT:
:param str client_id: Optional client identifier, defaults to a unique, generated string.
:param bool is_ssl: Sets a secure or insecure connection with the broker.
:param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
"""
# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
def __init__(self, socket, broker, port=None, username=None,
password=None, network_manager=None, client_id=None, is_ssl=True, log=False):
password=None, network_manager=None, client_id=None,
is_ssl=True, log=False, keep_alive=60):
# network management
self._socket = socket
network_manager_type = str(type(network_manager))
Expand Down Expand Up @@ -137,9 +139,11 @@ def __init__(self, socket, broker, port=None, username=None,
self._is_connected = False
self._msg_size_lim = MQTT_MSG_SZ_LIM
self.packet_id = 0
self._keep_alive = 60
self._keep_alive = keep_alive
self._pid = 0
self._user_data = None
self._timestamp = 0
# List of subscribed topics, used for tracking
self._subscribed_topics = []
# Server callbacks
self.on_message = None
Expand Down Expand Up @@ -180,34 +184,6 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
self._lw_msg = message
self._lw_retain = retain

def reconnect(self, retries=30, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param int retries: Amount of retries before resetting the network interface.
:param bool resub_topics: Resubscribe to previously subscribed topics.
"""
retries = 0
while not self._is_connected:
if self._logger is not None:
self._logger.debug('Attempting to reconnect to broker')
try:
self.connect()
if self._logger is not None:
self._logger.debug('Reconnected to broker')
if resub_topics:
if self._logger is not None:
self._logger.debug('Attempting to resubscribe to prv. subscribed topics.')
while self._subscribed_topics:
feed = self._subscribed_topics.pop()
self.subscribe(feed)
except OSError as e:
if self._logger is not None:
self._logger.debug('Lost connection, reconnecting and resubscribing...', e)
retries += 1
if retries >= 30:
retries = 0
time.sleep(1)
continue

# pylint: disable=too-many-branches, too-many-statements
def connect(self, clean_session=True):
"""Initiates connection with the MQTT Broker.
Expand Down Expand Up @@ -564,29 +540,100 @@ def unsubscribe(self, topic):
self._subscribed_topics.remove(t)
return

@property
def is_wifi_connected(self):
"""Returns if the ESP module is connected to
an access point, resets module if False"""
if self._wifi:
return self._wifi.esp.is_connected
raise MMQTTException("MiniMQTT Client does not use a WiFi NetworkManager.")

# pylint: disable=line-too-long, protected-access
@property
def is_sock_connected(self):
"""Returns if the socket is connected."""
return self.is_wifi_connected and self._sock and self._wifi.esp.socket_connected(self._sock._socknum)

def reconnect_socket(self):
"""Re-establishes the socket's connection with the MQTT broker.
"""
try:
if self._logger is not None:
self._logger.debug("Attempting to reconnect with MQTT Broker...")
self.reconnect()
except RuntimeError as err:
if self._logger is not None:
self._logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
time.sleep(1)
self.reconnect_socket()

def reconnect_wifi(self):
"""Reconnects to WiFi Access Point and socket, if disconnected.
"""
while not self.is_wifi_connected:
try:
if self._logger is not None:
self._logger.debug('Connecting to WiFi AP...')
self._wifi.connect()
except (RuntimeError, ValueError):
if self._logger is not None:
self._logger.debug('Failed to reset WiFi module, retrying...')
time.sleep(1)
# we just reconnected, is the socket still connected?
if not self.is_sock_connected:
self.reconnect_socket()

def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param bool resub_topics: Resubscribe to previously subscribed topics.
"""
if self._logger is not None:
self._logger.debug('Attempting to reconnect with MQTT broker')
self.connect()
if self._logger is not None:
self._logger.debug('Reconnected with broker')
if resub_topics:
if self._logger is not None:
self._logger.debug('Attempting to resubscribe to previously subscribed topics.')
while self._subscribed_topics:
feed = self._subscribed_topics.pop()
self.subscribe(feed)

def loop_forever(self):
"""Starts a blocking message loop. Use this
method if you want to run a program forever.
Code below a call to this method will NOT execute.
Network reconnection is handled within this call.
Your code will not execute anything below this call.

"""
run = True
while run:
if self._is_connected:
self._wait_for_msg(0.0)
else:
if self._logger is not None:
self._logger.debug('Lost connection, reconnecting and resubscribing...')
self.reconnect(resub_topics=True)
if self._logger is not None:
self._logger.debug('Connection restored, continuing to loop forever...')
while True:
# Check WiFi and socket status
if self.is_sock_connected:
try:
self.loop()
except (RuntimeError, ValueError):
if self._wifi:
# Reconnect the WiFi module and the socket
self.reconnect_wifi()
continue

def loop(self):
"""Non-blocking message loop. Use this method to
check incoming subscription messages. Does not handle
network reconnection like loop_forever - reconnection must
be handled within your code.
check incoming subscription messages.

This method does NOT handle networking or
network hardware management, use loop_forever
or handle in code instead.
"""
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
if current_time - self._timestamp >= self._keep_alive:
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
if self._logger is not None:
self._logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
self.ping()
self._timestamp = 0
self._sock.settimeout(0.1)
return self._wait_for_msg()

Expand Down