Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding optional service function #106

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bundles
.eggs
dist
**/*.egg-info
.vscode/
20 changes: 17 additions & 3 deletions adafruit_esp32spi/adafruit_esp32spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,17 @@ class ESP_SPIcontrol: # pylint: disable=too-many-public-methods, too-many-insta

# pylint: disable=too-many-arguments
def __init__(
self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin=None, *, debug=False
self,
spi,
cs_pin,
ready_pin,
reset_pin,
gpio0_pin=None,
*,
debug=False,
service_function=None
):
self._client_service_function = service_function
self._debug = debug
self.set_psk = False
self.set_crt = False
Expand All @@ -157,6 +166,10 @@ def __init__(
self.reset()

# pylint: enable=too-many-arguments
def service_client_fn(self):
"""If one was provided, call the client's service function while waiting"""
if self._client_service_function:
self._client_service_function()

def reset(self):
"""Hard reset the ESP32 using the reset pin"""
Expand Down Expand Up @@ -184,6 +197,7 @@ def _wait_for_ready(self):
if self._debug >= 3:
print(".", end="")
time.sleep(0.05)
self.service_client_fn()
else:
raise RuntimeError("ESP32 not responding")
if self._debug >= 3:
Expand Down Expand Up @@ -394,8 +408,8 @@ def get_scan_networks(self):

def scan_networks(self):
"""Scan for visible access points, returns a list of access point details.
Returns a list of dictionaries with 'ssid', 'rssi' and 'encryption' entries,
one for each AP found"""
Returns a list of dictionaries with 'ssid', 'rssi' and 'encryption' entries,
one for each AP found"""
self.start_scan_networks()
for _ in range(10): # attempts
time.sleep(2)
Expand Down
95 changes: 95 additions & 0 deletions examples/esp32spi_service_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# SPDX-FileCopyrightText: 2020 Bryan Siepert for Adafruit Industries
# SPDX-License-Identifier: MIT

import time
import board
from digitalio import DigitalInOut
import neopixel
from adafruit_io.adafruit_io import IO_HTTP
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager

# Get wifi details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise


class Timer:
"""A class to track timeouts, like an egg timer"""

def __init__(self, timeout=0.0):
self._timeout = None
self._start_time = None
if timeout:
self.rewind_to(timeout)

@property
def expired(self):
"""Returns the expiration status of the timer
Returns:
bool: True if more than `timeout` seconds has past since it was set
"""
return (time.monotonic() - self._start_time) > self._timeout

def rewind_to(self, new_timeout):
"""Re-wind the timer to a new timeout and start ticking"""
self._timeout = float(new_timeout)
self._start_time = time.monotonic()


class IntervalToggler:
def __init__(self, interval=0.3):
self._interval = interval
self._toggle_state = False
self._timer = Timer(self._interval)

def update_toggle(self):
if self._timer.expired:
self._toggle_state = not self._toggle_state
print("Toggle!", self._toggle_state)
self._timer.rewind_to(self._interval)


toggler = IntervalToggler(interval=0.5)

esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# Create the ESP32SPI class that communicates with the Airlift Module over SPI
# Pass in a service function to be called while waiting for the ESP32 status to update

esp = adafruit_esp32spi.ESP_SPIcontrol(
board.SPI(),
esp32_cs,
esp32_ready,
esp32_reset,
service_function=toggler.update_toggle,
)

status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)

wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
aio_username = secrets["aio_username"]
aio_key = secrets["aio_key"]

# Create an instance of the Adafruit IO HTTP client
io = IO_HTTP(aio_username, aio_key, wifi)

time_timer = Timer(15.0)
while True:
# You can remove the service function keyword argument to compare
toggler.update_toggle()

if time_timer.expired:
try:
print("Current time:", io.receive_time())
except RuntimeError as e:
print("whoops! An error recurred, trying again")
# rewind to 0 to try again immediately
time_timer.rewind_to(0.0)
continue
time_timer.rewind_to(15.0)