-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsettings.py
142 lines (127 loc) · 5.34 KB
/
settings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# The MIT License (MIT)
# Copyright (c) 2023 Edgaras Janušauskas and Inovatorius MB (www.fildz.com)
################################################################################
# FILDZ CYBEROS SETTINGS
#
# Cyberos settings and subscribed cyberwares management.
import errno
import os
import ujson as json
import ubinascii
import uasyncio as asyncio
from uasyncio import Event
import fildz_cyberos as cyberos
# import os
# os.remove('fildz/cyberos.json')
# os.remove('fildz/cyberwares.json')
# os.rmdir('fildz')
# os.listdir()
# os.remove('lib/FILDZ_CYBEROS/__init__.py')
# os.remove('lib/FILDZ_Button/__init__.py')
class Settings:
_SETTINGS_FILE = '/cyberos.json'
_PAIRED_FILE = '/cyberwares.json'
_CONFIG_DIR = 'fildz'
def __init__(self):
self._on_save_settings = Event()
asyncio.create_task(self._event_save_settings())
self._on_save_cyberwares = Event()
asyncio.create_task(self._event_save_cyberwares())
self._load_settings()
self._load_cyberwares()
################################################################################
# Helpers
#
def _dir_exists(self, dirname):
try:
return (os.stat(dirname)[0] & 0x4000) != 0
except OSError:
return False
def _file_exists(self, filename):
try:
return (os.stat(filename)[0] & 0x4000) == 0
except OSError:
return False
################################################################################
# Events
#
@property
def on_save_settings(self):
return self._on_save_settings
@property
def on_save_cyberwares(self):
return self._on_save_cyberwares
################################################################################
# Tasks
#
def _load_settings(self):
while True:
if self._dir_exists(self._CONFIG_DIR):
if self._file_exists(self._CONFIG_DIR + self._SETTINGS_FILE):
try:
with open(self._CONFIG_DIR + self._SETTINGS_FILE, "r") as config_file:
config_str = config_file.read()
cyberos.preferences = json.loads(config_str)
break
except ValueError:
break
else:
with open(self._CONFIG_DIR + self._SETTINGS_FILE, 'w'):
break
else:
os.mkdir(self._CONFIG_DIR)
def _load_cyberwares(self):
while True:
if self._dir_exists(self._CONFIG_DIR):
if self._file_exists(self._CONFIG_DIR + self._PAIRED_FILE):
try:
with open(self._CONFIG_DIR + self._PAIRED_FILE, "r") as cyberwares_file:
cyberwares_str = cyberwares_file.read()
cyberos.cyberwares['subscribed'] = json.loads(cyberwares_str)
for cyberware in cyberos.cyberwares['subscribed']:
# Convert '00:00:00:00:00:00' ('mac_str') to b'\x00\x00\x00\x00\x00\x00' ('mac').
mac_str = cyberos.cyberwares['subscribed'][cyberware]['mac_str']
mac_str = mac_str.replace(':', '')
mac = ubinascii.unhexlify(mac_str)
cyberos.cyberwares['subscribed'][cyberware].update({'mac': mac, 'events': {}})
break
except ValueError:
# print('CYBEROS > Error loading cyberwares')
cyberos.cyberwares['subscribed'] = {}
break
else:
cyberos.cyberwares['subscribed'] = {}
with open(self._CONFIG_DIR + self._PAIRED_FILE, 'w'):
break
else:
os.mkdir(self._CONFIG_DIR)
async def _event_save_cyberwares(self):
while True:
await self._on_save_cyberwares.wait()
_paired = {}
for cyberware in cyberos.cyberwares['subscribed']:
if 'mac_str' in cyberos.cyberwares['subscribed'][cyberware]:
_paired.update({cyberware: {'mac_str': cyberos.cyberwares['subscribed'][cyberware]['mac_str']}})
while True:
try:
with open(self._CONFIG_DIR + self._PAIRED_FILE, 'w') as config_file:
json.dump(_paired, config_file)
self._on_save_cyberwares.clear()
break
except OSError as exc:
if exc.errno == errno.ENOENT:
# Config dir does not exists.
os.mkdir(self._CONFIG_DIR)
async def _event_save_settings(self):
while True:
await self._on_save_settings.wait()
while True:
try:
with open(self._CONFIG_DIR + self._SETTINGS_FILE, 'w') as config_file:
json.dump(cyberos.preferences, config_file)
self._on_save_settings.clear()
break
except OSError as exc:
if exc.errno == errno.ENOENT:
# Config dir does not exists.
os.mkdir(self._CONFIG_DIR)