Skip to content

Commit

Permalink
Merge pull request adafruit#21 from brentru/update-for-ethernet
Browse files Browse the repository at this point in the history
Updates for Ethernet, Refactor
  • Loading branch information
brentru authored Mar 17, 2020
2 parents 6b3c2aa + ab1dfc8 commit ecfc57b
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 183 deletions.
184 changes: 69 additions & 115 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,31 @@
}


_the_interface = None # pylint: disable=invalid-name
_the_sock = None # pylint: disable=invalid-name

class MMQTTException(Exception):
"""MiniMQTT Exception class."""

# pylint: disable=unnecessary-pass
# pass


def set_socket(sock, iface=None):
"""Helper to set the global socket and optionally set the global network interface.
:param sock: socket object.
:param iface: internet interface object
"""
global _the_sock # pylint: disable=invalid-name, global-statement
_the_sock = sock
if iface:
global _the_interface # pylint: disable=invalid-name, global-statement
_the_interface = iface
_the_sock.set_interface(iface)

class MQTT:
"""MQTT Client for CircuitPython
:param socket: Socket object for provided network interface
:param str broker: MQTT Broker URL or IP Address.
:param int port: Optional port definition, defaults to 8883.
:param str username: Username for broker authentication.
Expand All @@ -95,33 +110,18 @@ class MQTT:
:param bool is_ssl: Sets a secure or insecure connection with the broker.
:param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
"""

# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
def __init__(
self,
socket,
broker,
port=None,
username=None,
password=None,
network_manager=None,
client_id=None,
is_ssl=True,
log=False,
keep_alive=60,
):
# network management
self._socket = socket
network_manager_type = str(type(network_manager))
if "ESPSPI_WiFiManager" in network_manager_type:
self._wifi = network_manager
else:
raise TypeError("This library requires a NetworkManager object.")
def __init__(self, broker, port=None, username=None,
password=None, client_id=None,
is_ssl=True, log=False, keep_alive=60):
self._sock = None
# broker
try: # set broker IP
self.broker = self._wifi.esp.unpretty_ip(broker)
except ValueError: # set broker URL
try: # set broker IP
self.broker = _the_interface.unpretty_ip(broker)
except ValueError: # set broker URL
self.broker = broker
# port/ssl
self.port = MQTT_TCP_PORT
Expand Down Expand Up @@ -181,6 +181,7 @@ def __exit__(self, exception_type, exception_value, traceback):
def deinit(self):
"""De-initializes the MQTT client and disconnects from
the mqtt broker.
"""
self.disconnect()

Expand All @@ -190,6 +191,7 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
:param str message: Last will disconnection message.
:param int qos: Quality of Service level.
:param bool retain: Specifies if the message is to be retained when it is published.
"""
if self._is_connected:
raise MMQTTException(
Expand All @@ -204,37 +206,45 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
self._lw_msg = message
self._lw_retain = retain

# pylint: disable=too-many-branches, too-many-statements
# pylint: disable=too-many-branches, too-many-statements, too-many-locals
def connect(self, clean_session=True):
"""Initiates connection with the MQTT Broker.
:param bool clean_session: Establishes a persistent session.
"""
self._set_interface()
if self.logger is not None:
self.logger.debug("Creating new socket")
self._sock = self._socket.socket()
self._sock.settimeout(10)
try:
proto, dummy, self.broker, path = self.broker.split("/", 3)
# replace spaces in path
path = path.replace(" ", "%20")
except ValueError:
proto, dummy, self.broker = self.broker.split("/", 2)
path = ""
if proto == "http:":
self.port = MQTT_TCP_PORT
elif proto == "https:":
self.port = MQTT_TLS_PORT
else:
raise ValueError("Unsupported protocol: " + proto)

if ":" in self.broker:
self.broker, port = self.broker.split(":", 1)
port = int(port)

addr = _the_sock.getaddrinfo(self.broker, self.port, 0, _the_sock.SOCK_STREAM)[0]
self._sock = _the_sock.socket(addr[0], addr[1], addr[2])
self._sock.settimeout(15)
if self.port == 8883:
try:
if self.logger is not None:
self.logger.debug(
"Attempting to establish secure MQTT connection..."
)
self._sock.connect((self.broker, self.port), TLS_MODE)
except RuntimeError:
raise MMQTTException("Invalid broker address defined.")
self.logger.debug('Attempting to establish secure MQTT connection...')
self._sock.connect((self.broker, self.port), _the_interface.TLS_MODE)
except RuntimeError as e:
raise MMQTTException("Invalid broker address defined.", e)
else:
if isinstance(self.broker, str):
addr = self._socket.getaddrinfo(self.broker, self.port)[0][-1]
else:
addr = (self.broker, self.port)
try:
if self.logger is not None:
self.logger.debug(
"Attempting to establish insecure MQTT connection..."
)
# self._sock.connect((self.broker, self.port), TCP_MODE)
self._sock.connect(addr, TCP_MODE)
self.logger.debug('Attempting to establish insecure MQTT connection...')
self._sock.connect(addr[-1], TCP_MODE)
except RuntimeError as e:
raise MMQTTException("Invalid broker address defined.", e)

Expand Down Expand Up @@ -376,9 +386,9 @@ def publish(self, topic, msg, retain=False, qos=0):
raise MMQTTException("Publish topic can not contain wildcards.")
# check msg/qos kwargs
if msg is None:
raise MMQTTException("Message can not be None.")
raise MMQTTException('Message can not be None.')
if isinstance(msg, (int, float)):
msg = str(msg).encode("ascii")
msg = str(msg).encode('ascii')
elif isinstance(msg, str):
msg = str(msg).encode("utf-8")
else:
Expand Down Expand Up @@ -574,55 +584,6 @@ def unsubscribe(self, topic):
self._subscribed_topics.remove(t)
return

@property
def is_wifi_connected(self):
"""Returns if the ESP module is connected to
an access point, resets module if False"""
if self._wifi:
return self._wifi.esp.is_connected
raise MMQTTException("MiniMQTT Client does not use a WiFi NetworkManager.")

# pylint: disable=line-too-long, protected-access
@property
def is_sock_connected(self):
"""Returns if the socket is connected."""
return (
self.is_wifi_connected
and self._sock
and self._wifi.esp.socket_connected(self._sock._socknum)
)

def reconnect_socket(self):
"""Re-establishes the socket's connection with the MQTT broker.
"""
try:
if self.logger is not None:
self.logger.debug("Attempting to reconnect with MQTT Broker...")
self.reconnect()
except RuntimeError as err:
if self.logger is not None:
self.logger.debug(
"Failed to reconnect with MQTT Broker, retrying...", err
)
time.sleep(1)
self.reconnect_socket()

def reconnect_wifi(self):
"""Reconnects to WiFi Access Point and socket, if disconnected.
"""
while not self.is_wifi_connected:
try:
if self.logger is not None:
self.logger.debug("Connecting to WiFi AP...")
self._wifi.connect()
except (RuntimeError, ValueError):
if self.logger is not None:
self.logger.debug("Failed to reset WiFi module, retrying...")
time.sleep(1)
# we just reconnected, is the socket still connected?
if not self.is_sock_connected:
self.reconnect_socket()

def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param bool resub_topics: Resubscribe to previously subscribed topics.
Expand All @@ -645,37 +606,30 @@ def loop_forever(self):
"""Starts a blocking message loop. Use this
method if you want to run a program forever.
Code below a call to this method will NOT execute.
Network reconnection is handled within this call.
NOTE: This method is depreciated and will be removed in the
next major release. Please see examples/minimqtt_pub_sub_blocking.py
for an example of creating a blocking loop which can handle wireless
network events.
"""
while True:
# Check WiFi and socket status
if self.is_sock_connected:
try:
self.loop()
except (RuntimeError, ValueError):
if self._wifi:
# Reconnect the WiFi module and the socket
self.reconnect_wifi()
continue
if self._sock.connected:
self.loop()

def loop(self):
"""Non-blocking message loop. Use this method to
check incoming subscription messages.
This method does NOT handle networking or
network hardware management, use loop_forever
or handle in code instead.
"""
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
if current_time - self._timestamp >= self.keep_alive:
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
if self.logger is not None:
self.logger.debug(
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
)
self.logger.debug('KeepAlive period elapsed - \
requesting a PINGRESP from the server...')
self.ping()
self._timestamp = 0
self._sock.settimeout(0.1)
Expand Down Expand Up @@ -745,10 +699,10 @@ def _check_topic(topic):
raise MMQTTException("Topic may not be NoneType")
# [MQTT-4.7.3-1]
if not topic:
raise MMQTTException("Topic may not be empty.")
raise MMQTTException('Topic may not be empty.')
# [MQTT-4.7.3-3]
if len(topic.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT:
raise MMQTTException("Topic length is too large.")
if len(topic.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
raise MMQTTException('Topic length is too large.')

@staticmethod
def _check_qos(qos_level):
Expand Down
86 changes: 86 additions & 0 deletions examples/minimqtt_adafruitio_eth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Adafruit MiniMQTT Pub/Sub Example
# Written by Tony DiCola for Adafruit Industries
# Modified by Brent Rubell for Adafruit Industries
import time
import board
import busio
from digitalio import DigitalInOut

from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket

import adafruit_minimqtt as MQTT

# Get Adafruit IO details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("Adafruit IO secrets are kept in secrets.py, please add them there!")
raise

cs = DigitalInOut(board.D10)
spi_bus = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)

# Initialize ethernet interface with DHCP
eth = WIZNET5K(spi_bus, cs)

### Feeds ###

# Setup a feed named 'photocell' for publishing to a feed
photocell_feed = secrets['aio_username'] + '/feeds/photocell'

# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = secrets['aio_username'] + '/feeds/onoff'

### Code ###

# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connected(client, userdata, flags, rc):
# This function will be called when the client is connected
# successfully to the broker.
print('Connected to Adafruit IO! Listening for topic changes on %s' % onoff_feed)
# Subscribe to all changes on the onoff_feed.
client.subscribe(onoff_feed)


def disconnected(client, userdata, rc):
# This method is called when the client is disconnected
print('Disconnected from Adafruit IO!')


def message(client, topic, message):
# This method is called when a topic the client is subscribed to
# has a new message.
print('New message on topic {0}: {1}'.format(topic, message))


# Initialize MQTT interface with the ethernet interface
MQTT.set_socket(socket, eth)

# Set up a MiniMQTT Client
# NOTE: We'll need to connect insecurely for ethernet configurations.
mqtt_client = MQTT.MQTT(broker = 'http://io.adafruit.com',
username = secrets['aio_username'],
password = secrets['aio_key'])

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print('Connecting to Adafruit IO...')
mqtt_client.connect()

photocell_val = 0
while True:
# Poll the message queue
mqtt_client.loop()

# Send a new message
print('Sending photocell value: %d...' % photocell_val)
mqtt_client.publish(photocell_feed, photocell_val)
print('Sent!')
photocell_val += 1
time.sleep(5)
Loading

0 comments on commit ecfc57b

Please sign in to comment.