diff --git a/utils/api/login.py b/utils/api/login.py index 757a61e..c318b66 100644 --- a/utils/api/login.py +++ b/utils/api/login.py @@ -1,15 +1,14 @@ """ Date: 2023-11-12 14:05:06 LastEditors: Night-stars-1 nujj1042633805@gmail.com -LastEditTime: 2025-01-19 16:35:56 +LastEditTime: 2025-01-24 22:41:00 """ +import json import time from os import getenv from typing import Dict, Optional, Tuple, Union -import orjson - from ..config import Account, ConfigManager from ..data_model import LoginResultHandler from ..logger import log @@ -80,7 +79,7 @@ def login( ) log.debug(response.text) result = response.text.lstrip("&").lstrip("START").lstrip("&") - data = orjson.loads(result) # pylint: disable=no-member + data = json.loads(result) # pylint: disable=no-member api_data = LoginResultHandler(data) if api_data.success: log.success("小米账号登录成功") @@ -188,7 +187,7 @@ def qr_login(self) -> Tuple[str, bytes]: headers=headers, ) result = response.text.replace("&&&START&&&", "") - data = orjson.loads(result) # pylint: disable=no-member + data = json.loads(result) # pylint: disable=no-member log.info(f"浏览器访问: {data['qr']}\n获取扫描下方二维码登录") login_url = data["loginUrl"] check_url = data["lp"] @@ -216,7 +215,7 @@ def check_login(self, url: str) -> Tuple[Optional[int], Optional[dict]]: } response = get(url, headers=headers) result = response.text.replace("&&&START&&&", "") - data = orjson.loads(result) # pylint: disable=no-member + data = json.loads(result) # pylint: disable=no-member pass_token = data["passToken"] user_id = str(data["userId"]) cookies = self.get_cookies_by_passtk(user_id=user_id, pass_token=pass_token) diff --git a/utils/captcha.py b/utils/captcha.py index 4fe3f4f..5f33e3e 100644 --- a/utils/captcha.py +++ b/utils/captcha.py @@ -1,17 +1,22 @@ -''' +""" Date: 2023-11-13 19:55:22 LastEditors: Night-stars-1 nujj1042633805@gmail.com -LastEditTime: 2023-12-18 20:46:51 -''' +LastEditTime: 2025-01-24 22:02:00 +""" + import json +import time + +from jsonpath_ng import parse -from .request import post -from .logger import log from .config import ConfigManager -from .data_model import ApiResultHandler, GeetestResult +from .data_model import GeetestResult +from .logger import log +from .request import request _conf = ConfigManager.data_obj + def find_key(data: dict, key: str): """递归查找字典中的key""" for dkey, dvalue in data.items(): @@ -21,26 +26,90 @@ def find_key(data: dict, key: str): find_key(dvalue, key) return None -def get_validate(gt: str, challenge: str) -> GeetestResult: # pylint: disable=invalid-name + +def get_validate_other( + gt: str, challenge: str +) -> GeetestResult: # pylint: disable=invalid-name """获取人机验证结果""" try: - validate = None + validate = "" + if _conf.preference.get_geetest_url: + params = _conf.preference.get_geetest_params.copy() + params = json.loads( + json.dumps(params).replace("{gt}", gt).replace("{challenge}", challenge) + ) + data = _conf.preference.get_geetest_data.copy() + data = json.loads( + json.dumps(data).replace("{gt}", gt).replace("{challenge}", challenge) + ) + for i in range(10): + log.info(f"第{i}次获取结果") + response = request( + _conf.preference.get_geetest_method, + _conf.preference.get_geetest_url, + params=params, + json=data, + ) + log.debug(response.text) + result = response.json() + geetest_validate_expr = parse( + _conf.preference.get_geetest_validate_path + ) + geetest_validate_match = geetest_validate_expr.find(result) + if len(geetest_validate_match) > 0: + validate = geetest_validate_match[0].value + geetest_challenge_expr = parse( + _conf.preference.get_geetest_challenge_path + ) + geetest_challenge_match = geetest_challenge_expr.find(result) + if len(geetest_challenge_match) > 0: + challenge = geetest_challenge_match[0].value + if validate and challenge: + return GeetestResult(challenge=challenge, validate=validate) + time.sleep(1) + return GeetestResult(challenge="", validate="") + else: + return GeetestResult(challenge="", validate="") + except Exception: # pylint: disable=broad-exception-caught + log.exception("获取人机验证结果异常") + return GeetestResult(challenge="", validate="") + + +def get_validate( + gt: str, challenge: str +) -> GeetestResult: # pylint: disable=invalid-name + """创建人机验证并结果""" + try: + validate = "" if _conf.preference.geetest_url: params = _conf.preference.geetest_params.copy() - params = json.loads(json.dumps(params).replace("{gt}", gt).replace("{challenge}", challenge)) + params = json.loads( + json.dumps(params).replace("{gt}", gt).replace("{challenge}", challenge) + ) data = _conf.preference.geetest_data.copy() - data = json.loads(json.dumps(data).replace("{gt}", gt).replace("{challenge}", challenge)) - response = post( + data = json.loads( + json.dumps(data).replace("{gt}", gt).replace("{challenge}", challenge) + ) + response = request( + _conf.preference.geetest_method, _conf.preference.geetest_url, params=params, json=data, ) log.debug(response.text) - geetest_data = response.json() - geetest = ApiResultHandler(geetest_data) - challenge = find_key(geetest.data, "challenge") - validate = find_key(geetest.data, "validate") - return GeetestResult(challenge=challenge, validate=validate) + result = response.json() + geetest_validate_expr = parse(_conf.preference.geetest_validate_path) + geetest_validate_match = geetest_validate_expr.find(result) + if len(geetest_validate_match) > 0: + validate = geetest_validate_match[0].value + geetest_challenge_expr = parse(_conf.preference.geetest_challenge_path) + geetest_challenge_match = geetest_challenge_expr.find(result) + if len(geetest_challenge_match) > 0: + challenge = geetest_challenge_match[0].value + if validate and challenge: + return GeetestResult(challenge=challenge, validate=validate) + else: + return get_validate_other(gt=gt, challenge=challenge) else: return GeetestResult(challenge="", validate="") except Exception: # pylint: disable=broad-exception-caught diff --git a/utils/config.py b/utils/config.py index baed42f..2299c17 100644 --- a/utils/config.py +++ b/utils/config.py @@ -5,6 +5,7 @@ import platform from hashlib import md5 from pathlib import Path +from typing import Literal, Optional import yaml # pylint: disable=wrong-import-order @@ -21,7 +22,7 @@ CONFIG_PATH = ( DATA_PATH / f"config.{CONFIG_TYPE}" if os.getenv("MIUITASK_CONFIG_PATH") is None - else Path(os.getenv("MIUITASK_CONFIG_PATH")) + else Path(str(os.getenv("MIUITASK_CONFIG_PATH"))) ) """数据文件默认路径""" @@ -138,10 +139,46 @@ def __init__(self, notifier="", params=None): class Preference: """偏好设置""" - def __init__(self, geetest_url="", geetest_params=None, geetest_data=None): + # pylint: disable=too-many-arguments,too-many-positional-arguments + def __init__( + self, + geetest_url="", + geetest_method: Literal["post", "get"] = "post", + geetest_params: Optional[dict] = None, + geetest_data: Optional[dict] = None, + geetest_validate_path="$.data.validate", + geetest_challenge_path="$.data.challenge", + get_geetest_url="", + get_geetest_method: Literal["post", "get"] = "post", + get_geetest_params: Optional[dict] = None, + get_geetest_data: Optional[dict] = None, + get_geetest_validate_path="", + get_geetest_challenge_path="", + ): self.geetest_url = geetest_url + """极验验证URL""" + self.geetest_method = geetest_method + """极验请求方法""" self.geetest_params = geetest_params or {} + """极验自定义params参数""" self.geetest_data = geetest_data or {} + """极验自定义data参数""" + self.geetest_validate_path = geetest_validate_path + """极验验证validate的路径""" + self.geetest_challenge_path = geetest_challenge_path + """极验验证challenge的路径""" + self.get_geetest_url = get_geetest_url + """获取极验验证结果的URL""" + self.get_geetest_method = get_geetest_method + """获取极验验证结果的请求方法""" + self.get_geetest_params = get_geetest_params or {} + """获取极验验证结果的自定义params参数""" + self.get_geetest_data = get_geetest_data or {} + """获取极验验证结果的自定义data参数""" + self.get_geetest_validate_path = get_geetest_validate_path + """获取极验验证validate的路径""" + self.get_geetest_challenge_path = get_geetest_challenge_path + """获取极验验证challenge的路径""" class Config: diff --git a/utils/data_model.py b/utils/data_model.py index b880ac0..086d4fc 100644 --- a/utils/data_model.py +++ b/utils/data_model.py @@ -15,8 +15,8 @@ def __init__(self, content: Dict[str, Any]): :param content: API返回的原始JSON对象 """ self.content = content - self.data = self.content.get("data", {}) - self.message = self.content.get("message", "") + self.data: Dict[str, str] = self.content.get("data", {}) + self.message: str = self.content.get("message", "") self.status = self.content.get("status") # 尝试从其他键获取数据 @@ -36,7 +36,7 @@ def __init__(self, content: Dict[str, Any]): if self.message == "": self.message = self.content.get(key, "") if self.message is None and isinstance(self.data, dict): - self.message = self.data.get(key) + self.message = self.data.get(key, "") @property def success(self): diff --git a/utils/request.py b/utils/request.py index 3f02e5f..cdd1b92 100644 --- a/utils/request.py +++ b/utils/request.py @@ -58,6 +58,28 @@ def post( """ return requests.post(url, headers=headers, params=params, timeout=timeout, **kwargs) +def request( + method: str | bytes, + url: str, + *, + headers: Optional[Dict[str, str]] = None, + params: Optional[Dict[str, Any]] = None, + timeout: Optional[int] = 20, + **kwargs, +): + """ + 说明: + request请求封装 + 参数: + :param method: 请求方法 + :param url: url + :param headers: 请求头 + :param params: params + :param data: data + :param json: json + :param timeout: 超时时间 + """ + return requests.request(method, url, headers=headers, params=params, timeout=timeout, **kwargs) def notify_me(content=""): """ diff --git a/utils/utils.py b/utils/utils.py index 5d594a4..2b55c29 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -1,4 +1,5 @@ """工具类""" + import base64 import random import string @@ -7,11 +8,11 @@ from typing import Type from urllib.parse import parse_qsl, urlparse -import qrcode from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding, serialization from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from qrcode import QRCode, constants from tenacity import RetryError, Retrying, stop_after_attempt from .captcha import get_validate @@ -19,7 +20,7 @@ from .logger import log from .request import post -PUBLIC_KEY_PEM = '''-----BEGIN PUBLIC KEY----- +PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArxfNLkuAQ/BYHzkzVwtu g+0abmYRBVCEScSzGxJIOsfxVzcuqaKO87H2o2wBcacD3bRHhMjTkhSEqxPjQ/FE XuJ1cdbmr3+b3EQR6wf/cYcMx2468/QyVoQ7BADLSPecQhtgGOllkC+cLYN6Md34 @@ -27,68 +28,74 @@ +QfMD0q2EM9wo20aLnos24yDzRjh9HJc6xfr37jRlv1/boG/EABMG9FnTm35xWrV R0nw3cpYF7GZg13QicS/ZwEsSd4HyboAruMxJBPvK3Jdr4ZS23bpN0cavWOJsBqZ VwIDAQAB ------END PUBLIC KEY-----''' +-----END PUBLIC KEY-----""" headers = { - 'Accept': '*/*', - 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'Content-type': 'application/x-www-form-urlencoded', - 'Origin': 'https://web.vip.miui.com', - 'Pragma': 'no-cache', - 'Referer': 'https://web.vip.miui.com/', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'cross-site', - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0', - 'sec-ch-ua': '"Microsoft Edge";v="119", "Chromium";v="119", "Not?A_Brand";v="24"', - 'sec-ch-ua-mobile': '?0', - 'sec-ch-ua-platform': '"Windows"', + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Content-type": "application/x-www-form-urlencoded", + "Origin": "https://web.vip.miui.com", + "Pragma": "no-cache", + "Referer": "https://web.vip.miui.com/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", + "sec-ch-ua": '"Microsoft Edge";v="119", "Chromium";v="119", "Not?A_Brand";v="24"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', } -def get_random_chars_as_string(length, characters: str = string.ascii_letters + string.digits + string.punctuation): + +def get_random_chars_as_string( + length, characters: str = string.ascii_letters + string.digits + string.punctuation +): """获取随机字符串""" - return ''.join(random.choice(characters) for _ in range(length)) + return "".join(random.choice(characters) for _ in range(length)) + def aes_encrypt(key: str, data: str) -> base64: """AES加密""" - iv = b'0102030405060708' # pylint: disable=invalid-name - cipher = Cipher(algorithms.AES(key.encode('utf-8')), modes.CBC(iv), backend=default_backend()) + iv = b"0102030405060708" # pylint: disable=invalid-name + cipher = Cipher( + algorithms.AES(key.encode("utf-8")), modes.CBC(iv), backend=default_backend() + ) encryptor = cipher.encryptor() padder = padding.PKCS7(algorithms.AES.block_size).padder() - padded_data = padder.update(data.encode('utf-8')) + padder.finalize() + padded_data = padder.update(data.encode("utf-8")) + padder.finalize() ciphertext = encryptor.update(padded_data) + encryptor.finalize() - return base64.b64encode(ciphertext).decode('utf-8') + return base64.b64encode(ciphertext).decode("utf-8") def rsa_encrypt(public_key_pem: str, data: str) -> base64: """RSA加密""" public_key = serialization.load_pem_public_key( - public_key_pem.encode('utf-8'), - backend=default_backend() - ) - encoded_data = base64.b64encode(data.encode('utf-8')) - ciphertext = public_key.encrypt( - encoded_data, - PKCS1v15() + public_key_pem.encode("utf-8"), backend=default_backend() ) + encoded_data = base64.b64encode(data.encode("utf-8")) + ciphertext = public_key.encrypt(encoded_data, PKCS1v15()) - return base64.b64encode(ciphertext).decode('utf-8') + return base64.b64encode(ciphertext).decode("utf-8") IncorrectReturn = (KeyError, TypeError, AttributeError, IndexError) """API返回数据无效会触发的异常组合""" -def is_incorrect_return(exception: Exception, *addition_exceptions: Type[Exception]) -> bool: +def is_incorrect_return( + exception: Exception, *addition_exceptions: Type[Exception] +) -> bool: """ 判断是否是API返回数据无效的异常 :param exception: 异常对象 :param addition_exceptions: 额外的异常类型,用于触发判断 """ exceptions = IncorrectReturn + addition_exceptions - return isinstance(exception, exceptions) or isinstance(exception.__cause__, exceptions) + return isinstance(exception, exceptions) or isinstance( + exception.__cause__, exceptions + ) def get_token_by_captcha(url: str) -> str | bool: @@ -96,23 +103,27 @@ def get_token_by_captcha(url: str) -> str | bool: try: parsed_url = urlparse(url) query_params = dict(parse_qsl(parsed_url.query)) # 解析URL参数 - gt = query_params.get("c") # pylint: disable=invalid-name - challenge = query_params.get("l") + gt = query_params.get("c", "") + challenge = query_params.get("l", "") geetest_data = get_validate(gt, challenge) params = { - 'k': '3dc42a135a8d45118034d1ab68213073', - 'locale': 'zh_CN', - '_t': round(time.time() * 1000), + "k": "3dc42a135a8d45118034d1ab68213073", + "locale": "zh_CN", + "_t": round(time.time() * 1000), } data = { - 'e': query_params.get("e"), # 人机验证的e参数,来自URL - 'challenge': geetest_data.challenge, - 'seccode': f'{geetest_data.validate}|jordan', + "e": query_params.get("e"), # 人机验证的e参数,来自URL + "challenge": geetest_data.challenge, + "seccode": f"{geetest_data.validate}|jordan", } - response = post('https://verify.sec.xiaomi.com/captcha/v2/gt/dk/verify', params=params, headers=headers, - data=data) + response = post( + "https://verify.sec.xiaomi.com/captcha/v2/gt/dk/verify", + params=params, + headers=headers, + data=data, + ) log.debug(response.text) result = response.json() api_data = TokenResultHandler(result) @@ -172,7 +183,7 @@ def get_token(uid: str) -> str | bool: "p31": "", "p32": "", "p33": [], - "p34": "" + "p34": "", }, "action": { "a1": [], @@ -188,34 +199,35 @@ def get_token(uid: str) -> str | bool: "a11": [], "a12": [], "a13": [], - "a14": [] + "a14": [], }, "force": False, "talkBack": False, "uid": uid, - "nonce": { - "t": round(time.time()), - "r": round(time.time()) - }, + "nonce": {"t": round(time.time()), "r": round(time.time())}, "version": "2.0", - "scene": "GROW_UP_CHECKIN" + "scene": "GROW_UP_CHECKIN", } key = get_random_chars_as_string(16) params = { - 'k': '3dc42a135a8d45118034d1ab68213073', - 'locale': 'zh_CN', - '_t': round(time.time() * 1000), + "k": "3dc42a135a8d45118034d1ab68213073", + "locale": "zh_CN", + "_t": round(time.time() * 1000), } data = { - 's': rsa_encrypt(PUBLIC_KEY_PEM, key), - 'd': aes_encrypt(key, str(data)), - 'a': 'GROW_UP_CHECKIN', + "s": rsa_encrypt(PUBLIC_KEY_PEM, key), + "d": aes_encrypt(key, str(data)), + "a": "GROW_UP_CHECKIN", } - response = post('https://verify.sec.xiaomi.com/captcha/v2/data', params=params, headers=headers, - data=data) + response = post( + "https://verify.sec.xiaomi.com/captcha/v2/data", + params=params, + headers=headers, + data=data, + ) log.debug(response.text) result = response.json() api_data = TokenResultHandler(result) @@ -223,7 +235,7 @@ def get_token(uid: str) -> str | bool: return api_data.token elif api_data.need_verify: log.error("遇到人机验证码, 尝试调用解决方案") - url = api_data.data.get("url") + url = api_data.data.get("url", "") if token := get_token_by_captcha(url): return token else: @@ -238,15 +250,18 @@ def get_token(uid: str) -> str | bool: log.exception("获取TOKEN异常") return False + def generate_qrcode(url): """生成二维码""" - qr = qrcode.QRCode(version=1, # pylint: disable=invalid-name - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=10, - border=4) + qr = QRCode( + version=1, + error_correction=constants.ERROR_CORRECT_L, + box_size=10, + border=4, + ) qr.add_data(url) qr.make(fit=True) - img = qr.make_image(fill_color='black', back_color='white') + img = qr.make_image(fill_color="black", back_color="white") bio = BytesIO() img.save(bio) # 获取二维码的模块 (module) 列表