forked from bmork/defogger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdch-wifi.py
297 lines (243 loc) · 10.8 KB
/
dch-wifi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# This exists to set the Wi-Fi (client) settings for a DLink DCH-S150 motion detector
# This became particularly relevant after DLink abandoned this line (look, I don't blame them)
# in December 2022, meaning that you could no longer reset one of the motion detectors. The
# detectors aren't the greatest, but they're decent, and you can run them w/o any other hubs
# or cloud (using code such as that from Postlund, linked below), and, well, I have a bunch
# sitting around, and they occasionally need to be reset!
# This contains code derived directly from: https://github.com/postlund/dlink_hnap/blob/master/custom_components/dlink_hnap/dlink.py
# Also particular thanks to: https://wiki.elvis.science/index.php?title=Examination_of_mydlink%E2%84%A2_home_devices
# To figure some of this out, I did as in the above link: connect to the motion detector
# as an AP, and then start digging through the javascript (particularly to sort out getting
# the password AES-128 encryption correct).
import argparse
import xml
import hmac
import logging
import asyncio
import aiohttp
import xml.etree.ElementTree as ET
from io import BytesIO
from datetime import datetime
import xmltodict
from Crypto.Cipher import AES
import binascii
_LOGGER = logging.getLogger(__name__)
ACTION_BASE_URL = "http://purenetworks.com/HNAP1/"
def str2hexstr(origin):
return ''.join(['{:x}'.format(ord(i)) for i in origin])
def _hmac(key, message):
encodedKey = key.encode("utf-8")
encodedMsg = message.encode("utf-8")
hmacVal = hmac.new(encodedKey, encodedMsg, digestmod="MD5" )
toHex = hmacVal.hexdigest()
toUpper = toHex.upper()
return toUpper
class AuthenticationError(Exception):
"""Thrown when login fails."""
pass
class HNAPClient:
"""Client for the HNAP protocol."""
def __init__(self, soap, username, password, loop=None):
"""Initialize a new HNAPClient instance."""
self.username = username
self.password = password
self.logged_in = False
self.loop = loop or asyncio.get_event_loop()
self.actions = None
self._client = soap
self._private_key = None
self._cookie = None
self._auth_token = None
self._timestamp = None
async def login(self):
"""Authenticate with device and obtain cookie."""
_LOGGER.info("Logging into device")
self.logged_in = False
resp = await self.call(
"Login",
Action="request",
Username=self.username,
LoginPassword="",
Captcha="",
)
challenge = resp["Challenge"]
public_key = resp["PublicKey"]
self._cookie = resp["Cookie"]
_LOGGER.debug(
"Challenge: %s, Public key: %s, Cookie: %s",
challenge,
public_key,
self._cookie,
)
self._private_key = _hmac(public_key + str(self.password), challenge)
_LOGGER.debug("Private key: %s", self._private_key)
try:
password = _hmac(self._private_key, challenge)
resp = await self.call(
"Login",
Action="login",
Username=self.username,
LoginPassword=password,
Captcha="",
)
if resp["LoginResult"].lower() != "success":
raise AuthenticationError("Incorrect username or password")
if not self.actions:
self.actions = await self.device_actions()
except xml.parsers.expat.ExpatError:
raise AuthenticationError("Bad response from device")
self.logged_in = True
def codeWifiPassword ( self, wifiPassword : str ) -> str:
hexPassword = str2hexstr(wifiPassword)
privateKey = self._private_key
if len(privateKey) > 32:
privateKey = privateKey[0:32]
privateKeyBytes = binascii.unhexlify(privateKey).ljust(32, b'\0')
passwordBytes = binascii.unhexlify(hexPassword).ljust(64, b'\0')
cipher = AES.new( privateKeyBytes, AES.MODE_ECB)
encoded = cipher.encrypt(passwordBytes)
return bytes.hex(encoded) # Convert bytes to a hex string
async def device_actions(self):
actions = await self.call("GetDeviceSettings")
return list(
map(lambda x: x[x.rfind("/") + 1 :], actions["SOAPActions"]["string"])
)
async def soap_actions(self, module_id):
return await self.call("GetModuleSOAPActions", ModuleID=module_id)
async def call(self, method, *args, **kwargs):
"""Call an HNAP method (async)."""
# Do login if no login has been done before
result = None
if not self._private_key and method != "Login":
await self.login()
self._update_nauth_token(method)
try:
result = await self.soap().call(method, **kwargs)
if "ERROR" in result:
self._bad_response(None)
except Exception as e:
self._bad_response(e)
return result
def _bad_response(self, e):
_LOGGER.error("Got an error, resetting private key")
self._private_key = None
raise Exception(f"got error response from device: {e}")
def _update_nauth_token(self, action):
"""Update HNAP auth token for an action."""
if not self._private_key:
return
self._timestamp = int(datetime.now().timestamp())
self._auth_token = _hmac(
self._private_key,
'{0}"{1}{2}"'.format(self._timestamp, ACTION_BASE_URL, action),
)
_LOGGER.debug(
"Generated new token for %s: %s (time: %d)",
action,
self._auth_token,
self._timestamp,
)
def soap(self):
"""Get SOAP client with updated headers."""
if self._cookie:
self._client.headers["Cookie"] = "uid={0}".format(self._cookie)
if self._auth_token:
self._client.headers["HNAP_AUTH"] = "{0} {1}".format(
self._auth_token, self._timestamp
)
return self._client
class NanoSOAPClient:
BASE_NS = {
"xmlns:soap": "http://schemas.xmlsoap.org/soap/envelope/",
"xmlns:xsd": "http://www.w3.org/2001/XMLSchema",
"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
}
ACTION_NS = {"xmlns": "http://purenetworks.com/HNAP1/"}
def __init__(self, address, action, loop=None, session=None):
self.address = "http://{0}/HNAP1".format(address)
self.action = action
self.loop = loop or asyncio.get_event_loop()
self.session = session or aiohttp.ClientSession(loop=loop)
self.headers = {}
def _generate_request_xml(self, method, **kwargs):
body = ET.Element("soap:Body")
action = ET.Element(method, self.ACTION_NS)
body.append(action)
for param, value in kwargs.items():
element = ET.Element( param )
if isinstance(value, str) and len(value) > 0 and value[0] == '<':
# Assume it's raw XML
sub = ET.fromstring(value)
element.append(sub)
else:
element.text = str(value)
action.append(element)
envelope = ET.Element("soap:Envelope", self.BASE_NS)
envelope.append(body)
f = BytesIO()
tree = ET.ElementTree(envelope)
tree.write(f, encoding="utf-8", xml_declaration=True)
return f.getvalue().decode("utf-8")
async def call(self, method, **kwargs):
xml = self._generate_request_xml(method, **kwargs)
headers = self.headers.copy()
headers["SOAPAction"] = '"{0}{1}"'.format(self.action, method)
resp = await self.session.post(
self.address, data=xml, headers=headers, timeout=10
)
text = await resp.text()
parsed = xmltodict.parse(text)
if "soap:Envelope" not in parsed:
_LOGGER.error("parsed: " + str(parsed))
raise Exception("probably a bad response")
return parsed["soap:Envelope"]["soap:Body"][method + "Response"]
async def doOurStuff ( ip : str,
pin : str, # get this from the label on the back
macAddress : str, # get this from the label on the back
accessPointSSID : str,
wifiPassword : str = None ) : # use None to indicate no security
# Connect to the motion detector (as an AP) and login
session = aiohttp.ClientSession()
soap = NanoSOAPClient(ip, ACTION_BASE_URL, loop=loop, session=session)
client = HNAPClient(soap, "Admin", pin, loop=loop)
await client.login()
# If you're curious . . .
# print(f"Supported actions:")
# print("\n".join(client.actions))
# resp = await client.call( "GetInternetSettings" )
# print( resp )
# resp = await client.call("GetWLanRadios")
# print( resp )
# resp = await client.call("GetAPClientSettings", RadioID="RADIO_2.4GHz")
# print( resp )
# Format here was largely gotten at through guessing, and by looking at some of the XML
# used by DLink HNAP-based access points. For sure it works for open access; it /may/
# work for WPA2-PSK (never got that to work, but that's also related to my networking
# environment).
if not wifiPassword:
supportedSecurity = "<SecurityInfo><SecurityType>NONE</SecurityType><Encryptions><string>NONE</string></Encryptions></SecurityInfo>"
wifiPassword = "x"
else:
supportedSecurity = "<SecurityInfo><SecurityType>WPA2-PSK</SecurityType><Encryptions><string>AES</string></Encryptions></SecurityInfo>"
encodedKey = client.codeWifiPassword( wifiPassword )
resp = await client.call("SetAPClientSettings",
RadioID="RADIO_2.4GHz",
Enabled="true",
SSID=accessPointSSID,
ChannelWidth=1,
MacAddress=macAddress,
Key=encodedKey,
SupportedSecurity=supportedSecurity )
print( resp )
await session.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Wifi connector")
parser.add_argument("--ip", help="IP of our DCH-S150 device after connecting to wifi", nargs='?', default="192.168.0.60")
parser.add_argument("--pin", help="PIN for the DCH-S150 device - get from label on back" )
parser.add_argument("--mac", help="MAC address for the DCH-S150 device - get from label on back" )
parser.add_argument("--ssid", help="SSID of the AP you're connecting to")
parser.add_argument("--password", help="Password to connect to the AP - don't provide if no security", default=None )
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.run_until_complete(doOurStuff(args.ip, args.pin, args.mac, args.ssid, args.password ))