Skip to content

Commit

Permalink
Update Steam API Example with Connection Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
DJDevon3 committed Mar 18, 2024
1 parent 7106d2f commit eda9969
Showing 1 changed file with 87 additions and 76 deletions.
163 changes: 87 additions & 76 deletions examples/wifi/expanded/requests_wifi_api_steam.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
# SPDX-FileCopyrightText: 2022 DJDevon3 (Neradoc & Deshipu helped) for Adafruit Industries
# SPDX-FileCopyrightText: 2024 DJDevon3
# SPDX-License-Identifier: MIT
# Coded for Circuit Python 8.0
"""DJDevon3 Adafruit Feather ESP32-S2 api_steam Example"""
import gc
import json
# Coded for Circuit Python 8.2.x
"""Steam API Get Owned Games Example"""
# pylint: disable=import-error

import os
import ssl
import time

import socketpool
import adafruit_connection_manager
import wifi

import adafruit_requests

# Steam API Docs: https://steamcommunity.com/dev
# Steam API Key: https://steamcommunity.com/dev/apikey
# Steam Usernumber: Visit https://steamcommunity.com
# click on your profile icon, your usernumber will be in the browser url.
# Numerical Steam ID: Visit https://store.steampowered.com/account/
# Your account name will be in big bold letters.
# Your numerical STEAM ID will be below in a very small font.

# Get WiFi details, ensure these are setup in settings.toml
ssid = os.getenv("CIRCUITPY_WIFI_SSID")
password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
# Requires Steam Developer API key
steam_usernumber = os.getenv("steam_id")
steam_apikey = os.getenv("steam_api_key")

# Initialize WiFi Pool (There can be only 1 pool & top of script)
pool = socketpool.SocketPool(wifi.radio)
steam_usernumber = os.getenv("STEAM_ID")
steam_apikey = os.getenv("STEAM_API_KEY")

# Time between API refreshes
# API Polling Rate
# 900 = 15 mins, 1800 = 30 mins, 3600 = 1 hour
sleep_time = 900
SLEEP_TIME = 3600

# Set debug to True for full JSON response.
# WARNING: Steam's full response will overload most microcontrollers
# SET TO TRUE IF YOU FEEL BRAVE =)
DEBUG = False

# Initalize Wifi, Socket Pool, Request Session
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# Deconstruct URL (pylint hates long lines)
# http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/
# ?key=XXXXXXXXXXXXXXXXXXXXX&include_played_free_games=1&steamid=XXXXXXXXXXXXXXXX&format=json
Steam_OwnedGames_URL = (
STEAM_SOURCE = (
"http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?"
+ "key="
+ steam_apikey
Expand All @@ -45,75 +52,79 @@
+ "&format=json"
)

if sleep_time < 60:
sleep_time_conversion = "seconds"
sleep_int = sleep_time
elif 60 <= sleep_time < 3600:
sleep_int = sleep_time / 60
sleep_time_conversion = "minutes"
elif 3600 <= sleep_time < 86400:
sleep_int = sleep_time / 60 / 60
sleep_time_conversion = "hours"
else:
sleep_int = sleep_time / 60 / 60 / 24
sleep_time_conversion = "days"

# Connect to Wi-Fi
print("\n===============================")
print("Connecting to WiFi...")
requests = adafruit_requests.Session(pool, ssl.create_default_context())
while not wifi.radio.ipv4_address:
try:
wifi.radio.connect(ssid, password)
except ConnectionError as e:
print("Connection Error:", e)
print("Retrying in 10 seconds")
time.sleep(10)
gc.collect()
print("Connected!\n")

def time_calc(input_time):
"""Converts seconds to minutes/hours/days"""
if input_time < 60:
return f"{input_time:.0f} seconds"
if input_time < 3600:
return f"{input_time / 60:.0f} minutes"
if input_time < 86400:
return f"{input_time / 60 / 60:.0f} hours"
return f"{input_time / 60 / 60 / 24:.1f} days"


def _format_datetime(datetime):
"""F-String formatted struct time conversion"""
return (
f"{datetime.tm_mon:02}/"
+ f"{datetime.tm_mday:02}/"
+ f"{datetime.tm_year:02} "
+ f"{datetime.tm_hour:02}:"
+ f"{datetime.tm_min:02}:"
+ f"{datetime.tm_sec:02}"
)


while True:
# Connect to Wi-Fi
print("\nConnecting to WiFi...")
while not wifi.radio.ipv4_address:
try:
wifi.radio.connect(ssid, password)
except ConnectionError as e:
print("❌ Connection Error:", e)
print("Retrying in 10 seconds")
print("✅ Wifi!")

try:
print("\nAttempting to GET STEAM Stats!") # --------------------------------
# Print Request to Serial
debug_request = False # Set true to see full request
if debug_request:
print("Full API GET URL: ", Steam_OwnedGames_URL)
print("===============================")
print(" | Attempting to GET Steam API JSON!")
try:
steam_response = requests.get(url=Steam_OwnedGames_URL).json()
steam_response = requests.get(url=STEAM_SOURCE)
steam_json = steam_response.json()
except ConnectionError as e:
print("Connection Error:", e)
print("Retrying in 10 seconds")

# Print Response to Serial
debug_response = False # Set true to see full response
if debug_response:
dump_object = json.dumps(steam_response)
print("JSON Dump: ", dump_object)

# Print Keys to Serial
steam_debug_keys = True # Set True to print Serial data
if steam_debug_keys:
game_count = steam_response["response"]["game_count"]
print("Total Games: ", game_count)
total_minutes = 0

for game in steam_response["response"]["games"]:
total_minutes += game["playtime_forever"]
total_hours = total_minutes / 60
total_days = total_minutes / 60 / 24
print(f"Total Hours: {total_hours}")
print(f"Total Days: {total_days}")

print("Monotonic: ", time.monotonic())
print(" | ✅ Steam JSON!")

if DEBUG:
print("Full API GET URL: ", STEAM_SOURCE)
print(steam_json)

game_count = steam_json["response"]["game_count"]
print(f" | | Total Games: {game_count}")
TOTAL_MINUTES = 0

for game in steam_json["response"]["games"]:
TOTAL_MINUTES += game["playtime_forever"]
total_hours = TOTAL_MINUTES / 60
total_days = TOTAL_MINUTES / 60 / 24
total_years = TOTAL_MINUTES / 60 / 24 / 365
print(f" | | Total Hours: {total_hours}")
print(f" | | Total Days: {total_days}")
print(f" | | Total Years: {total_years:.2f}")

steam_response.close()
print("✂️ Disconnected from Steam API")

print("\nFinished!")
print("Next Update in %s %s" % (int(sleep_int), sleep_time_conversion))
print(f"Board Uptime: {time_calc(time.monotonic())}")
print(f"Next Update: {time_calc(SLEEP_TIME)}")
print("===============================")
gc.collect()

except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
print(f"Failed to get data, retrying\n {e}")
time.sleep(60)
continue
time.sleep(sleep_time)
break
time.sleep(SLEEP_TIME)

0 comments on commit eda9969

Please sign in to comment.