-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathairthings.py
121 lines (104 loc) · 4.85 KB
/
airthings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import datetime
import logging
import json
import requests
from requests import HTTPError
# API documentation: https://developer.airthings.com/consumer-api-docs/
class Device:
    """One Airthings device, built from a single entry of the API's device list."""

    def __init__(self, conf: dict) -> None:
        """Copy the identifying fields out of the device description.

        Args:
            conf: Device description; must contain ``id``, ``deviceType``,
                ``segment`` (with a ``name``) and ``location`` (with a ``name``).

        Raises:
            KeyError: if one of the keys listed above is missing.
        """
        self.id: str = conf["id"]
        self.type: str = conf["deviceType"]
        # The display name lives in the nested "segment" object.
        segment = conf["segment"]
        self.name: str = segment["name"]
        # Same for the location: only its name is kept.
        location = conf["location"]
        self.location: str = location["name"]

    def __str__(self) -> str:
        """Return the device as ``<name> (<id>) in <location>``."""
        return "{} ({}) in {}".format(self.name, self.id, self.location)
class AirthingsConnector:
    """Client for the Airthings consumer API.

    Authenticates with the client-credentials grant, lists the account's
    devices (optionally restricted to a configured set of ids) and converts
    the latest samples of each device into records made of ``measurement``,
    ``tags``, ``time`` and ``fields``.
    """

    # Seconds to wait for the server before giving up on a request. Without
    # an explicit timeout ``requests`` can block forever on a stalled connection.
    _REQUEST_TIMEOUT: float = 30.0

    def __init__(self, conf: dict) -> None:
        """Read the connector settings.

        Args:
            conf: Settings with the keys ``clientID``, ``clientSecret``,
                ``devices`` (iterable of device ids; empty means "all"),
                ``fields_rename`` (old field name -> new name, a falsy new
                name drops the field), ``measurement``, ``auth_url``,
                ``api_url`` and ``elevation`` (converted with ``float``).

        Raises:
            KeyError: if one of the keys is missing.
            ValueError: if ``elevation`` cannot be converted to ``float``.
        """
        self._clientId: str = conf["clientID"]
        self._clientSecret: str = conf["clientSecret"]
        self._enabled_devices: set[str] = set(conf["devices"])
        # Filled lazily by the first successful device query.
        self._devices: list[Device] | None = None
        self._fields_rename: dict[str, str] = conf["fields_rename"]
        self._measurement: str = conf["measurement"]
        self._auth_url: str = conf["auth_url"]
        self._api_url: str = conf["api_url"]
        # Fetched on demand and dropped again whenever the API reports an error.
        self._auth_token: str | None = None
        self._elevation: float = float(conf["elevation"])

    def __get_auth_token(self) -> str:
        """Request a new access token from the authentication endpoint.

        Returns:
            The ``access_token`` entry of the JSON response.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status;
                the failure is logged before it is re-raised.
        """
        logging.debug("Getting authentication token")
        try:
            response = requests.post(
                self._auth_url,
                data={
                    "grant_type": "client_credentials",
                    "scope": "read:device:current_values",
                },
                allow_redirects=False,
                auth=(self._clientId, self._clientSecret),
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
        except HTTPError:
            logging.exception("Unable to get authentication token")
            raise
        return response.json()["access_token"]

    def __fetch_data(self, path: str, retry: bool = True) -> dict:
        """GET ``<api_url><path>`` and return the decoded JSON body.

        Args:
            path: Path appended verbatim to the configured API base URL.
            retry: When true, a 401 answer discards the cached token and the
                same request is attempted exactly once more with a new token.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: on an error status; the cached token is
                discarded so the next call authenticates again.
        """
        if not self._auth_token:
            self._auth_token = self.__get_auth_token()
        url = f"{self._api_url}{path}"
        logging.debug("Calling %s", url)
        try:
            response = requests.get(
                url=url,
                headers={"Authorization": f"Bearer {self._auth_token}"},
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code == 401 and retry:
                logging.debug("Auth error. Token possibly expired.")
                self._auth_token = None
                # Repeat the *same* request; retry=False prevents an endless loop.
                return self.__fetch_data(path, False)
            response.raise_for_status()
        except HTTPError:
            self._auth_token = None
            raise
        ret = response.json()
        logging.debug("Response: %s", ret)
        return ret

    def __get_enabled_devices(self) -> "list[Device]":
        """Return the devices to poll, querying the API while none are cached.

        When a non-empty set of device ids was configured only those devices
        are kept; otherwise every device returned by the API is used.
        """
        if not self._devices:
            logging.info("Querying list of devices")
            devices = [Device(d) for d in self.__fetch_data("devices")["devices"]]
            if self._enabled_devices:
                devices = [d for d in devices if d.id in self._enabled_devices]
            self._devices = devices
            logging.info("Found device(s): %s.", ", ".join(map(str, devices)))
        return self._devices

    def __adjust_pressure(self, pressure_mbar: float, temperature_celsius: float, altitude_meter: float) -> float:
        """Convert a pressure measured at altitude to its sea-level equivalent.

        Args:
            pressure_mbar: Pressure recorded by the device.
            temperature_celsius: Temperature recorded by the device.
            altitude_meter: Elevation of the device; ``0`` disables the
                adjustment.

        Returns:
            The unchanged input pressure when the altitude is ``0``, otherwise
            the sea-level pressure rounded to one decimal.
        """
        if altitude_meter == 0:
            return pressure_mbar
        # formula taken from https://keisan.casio.com/exec/system/1224575267
        temperature_gradient: float = 0.0065
        temperature_kelvin: float = 273.15 + temperature_celsius
        height_term = temperature_gradient * altitude_meter
        pressure_sea_level = pressure_mbar * pow(1 - height_term / (temperature_kelvin + height_term), -5.257)
        pressure_sea_level = round(pressure_sea_level, 1)
        logging.debug(f"Pressure: recorded={pressure_mbar};sea-level={pressure_sea_level}")
        return pressure_sea_level

    def fetch_data(self) -> list:
        """Fetch the latest samples of every enabled device.

        For each device the sample's pressure is adjusted to sea level (only
        when both ``temp`` and ``pressure`` are present) and the fields are
        renamed or dropped according to the configured ``fields_rename``.

        Returns:
            One dict per device with the keys ``measurement``, ``tags``
            (``host``, ``name``, ``location``, ``type``), ``time`` (timezone
            aware UTC ``datetime``) and ``fields`` (the transformed sample).

        Raises:
            requests.HTTPError: if authentication or an API call fails.
        """
        records = []
        for device in self.__get_enabled_devices():
            data: dict = self.__fetch_data(f"devices/{device.id}/latest-samples")["data"]
            # airthings returns an epoch time
            timestamp = datetime.datetime.fromtimestamp(data["time"], tz=datetime.timezone.utc)
            logging.debug("Processing measurements from %s", timestamp)
            record = {
                "measurement": self._measurement,
                "tags": {"host": device.id, "name": device.name, "location": device.location, "type": device.type},
                "time": timestamp,
            }
            logging.debug("Data before transform: %s", data)
            if "temp" in data and "pressure" in data:
                data["pressure"] = self.__adjust_pressure(float(data["pressure"]), float(data["temp"]), self._elevation)
            for key, new_key in self._fields_rename.items():
                if key in data:
                    value = data.pop(key)
                    # A falsy new name means the field is dropped altogether.
                    if new_key:
                        data[new_key] = value
            logging.debug("Data after transform: %s", data)
            record["fields"] = data
            records.append(record)
        return records