Skip to content

Commit

Permalink
feat: 更改geetest验证方式 (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
Night-stars-1 authored Jan 24, 2025
1 parent a4d09ba commit cbef91d
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 92 deletions.
11 changes: 5 additions & 6 deletions utils/api/login.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""
Date: 2023-11-12 14:05:06
LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2025-01-19 16:35:56
LastEditTime: 2025-01-24 22:41:00
"""

import json
import time
from os import getenv
from typing import Dict, Optional, Tuple, Union

import orjson

from ..config import Account, ConfigManager
from ..data_model import LoginResultHandler
from ..logger import log
Expand Down Expand Up @@ -80,7 +79,7 @@ def login(
)
log.debug(response.text)
result = response.text.lstrip("&").lstrip("START").lstrip("&")
data = orjson.loads(result) # pylint: disable=no-member
data = json.loads(result) # pylint: disable=no-member
api_data = LoginResultHandler(data)
if api_data.success:
log.success("小米账号登录成功")
Expand Down Expand Up @@ -188,7 +187,7 @@ def qr_login(self) -> Tuple[str, bytes]:
headers=headers,
)
result = response.text.replace("&&&START&&&", "")
data = orjson.loads(result) # pylint: disable=no-member
data = json.loads(result) # pylint: disable=no-member
log.info(f"浏览器访问: {data['qr']}\n获取扫描下方二维码登录")
login_url = data["loginUrl"]
check_url = data["lp"]
Expand Down Expand Up @@ -216,7 +215,7 @@ def check_login(self, url: str) -> Tuple[Optional[int], Optional[dict]]:
}
response = get(url, headers=headers)
result = response.text.replace("&&&START&&&", "")
data = orjson.loads(result) # pylint: disable=no-member
data = json.loads(result) # pylint: disable=no-member
pass_token = data["passToken"]
user_id = str(data["userId"])
cookies = self.get_cookies_by_passtk(user_id=user_id, pass_token=pass_token)
Expand Down
101 changes: 85 additions & 16 deletions utils/captcha.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
'''
"""
Date: 2023-11-13 19:55:22
LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2023-12-18 20:46:51
'''
LastEditTime: 2025-01-24 22:02:00
"""

import json
import time

from jsonpath_ng import parse

from .request import post
from .logger import log
from .config import ConfigManager
from .data_model import ApiResultHandler, GeetestResult
from .data_model import GeetestResult
from .logger import log
from .request import request

_conf = ConfigManager.data_obj


def find_key(data: dict, key: str):
"""递归查找字典中的key"""
for dkey, dvalue in data.items():
Expand All @@ -21,26 +26,90 @@ def find_key(data: dict, key: str):
find_key(dvalue, key)
return None

def get_validate(gt: str, challenge: str) -> GeetestResult: # pylint: disable=invalid-name

def get_validate_other(
gt: str, challenge: str
) -> GeetestResult: # pylint: disable=invalid-name
"""获取人机验证结果"""
try:
validate = None
validate = ""
if _conf.preference.get_geetest_url:
params = _conf.preference.get_geetest_params.copy()
params = json.loads(
json.dumps(params).replace("{gt}", gt).replace("{challenge}", challenge)
)
data = _conf.preference.get_geetest_data.copy()
data = json.loads(
json.dumps(data).replace("{gt}", gt).replace("{challenge}", challenge)
)
for i in range(10):
log.info(f"第{i}次获取结果")
response = request(
_conf.preference.get_geetest_method,
_conf.preference.get_geetest_url,
params=params,
json=data,
)
log.debug(response.text)
result = response.json()
geetest_validate_expr = parse(
_conf.preference.get_geetest_validate_path
)
geetest_validate_match = geetest_validate_expr.find(result)
if len(geetest_validate_match) > 0:
validate = geetest_validate_match[0].value
geetest_challenge_expr = parse(
_conf.preference.get_geetest_challenge_path
)
geetest_challenge_match = geetest_challenge_expr.find(result)
if len(geetest_challenge_match) > 0:
challenge = geetest_challenge_match[0].value
if validate and challenge:
return GeetestResult(challenge=challenge, validate=validate)
time.sleep(1)
return GeetestResult(challenge="", validate="")
else:
return GeetestResult(challenge="", validate="")
except Exception: # pylint: disable=broad-exception-caught
log.exception("获取人机验证结果异常")
return GeetestResult(challenge="", validate="")


def get_validate(
gt: str, challenge: str
) -> GeetestResult: # pylint: disable=invalid-name
"""创建人机验证并结果"""
try:
validate = ""
if _conf.preference.geetest_url:
params = _conf.preference.geetest_params.copy()
params = json.loads(json.dumps(params).replace("{gt}", gt).replace("{challenge}", challenge))
params = json.loads(
json.dumps(params).replace("{gt}", gt).replace("{challenge}", challenge)
)
data = _conf.preference.geetest_data.copy()
data = json.loads(json.dumps(data).replace("{gt}", gt).replace("{challenge}", challenge))
response = post(
data = json.loads(
json.dumps(data).replace("{gt}", gt).replace("{challenge}", challenge)
)
response = request(
_conf.preference.geetest_method,
_conf.preference.geetest_url,
params=params,
json=data,
)
log.debug(response.text)
geetest_data = response.json()
geetest = ApiResultHandler(geetest_data)
challenge = find_key(geetest.data, "challenge")
validate = find_key(geetest.data, "validate")
return GeetestResult(challenge=challenge, validate=validate)
result = response.json()
geetest_validate_expr = parse(_conf.preference.geetest_validate_path)
geetest_validate_match = geetest_validate_expr.find(result)
if len(geetest_validate_match) > 0:
validate = geetest_validate_match[0].value
geetest_challenge_expr = parse(_conf.preference.geetest_challenge_path)
geetest_challenge_match = geetest_challenge_expr.find(result)
if len(geetest_challenge_match) > 0:
challenge = geetest_challenge_match[0].value
if validate and challenge:
return GeetestResult(challenge=challenge, validate=validate)
else:
return get_validate_other(gt=gt, challenge=challenge)
else:
return GeetestResult(challenge="", validate="")
except Exception: # pylint: disable=broad-exception-caught
Expand Down
41 changes: 39 additions & 2 deletions utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import platform
from hashlib import md5
from pathlib import Path
from typing import Literal, Optional

import yaml # pylint: disable=wrong-import-order

Expand All @@ -21,7 +22,7 @@
CONFIG_PATH = (
DATA_PATH / f"config.{CONFIG_TYPE}"
if os.getenv("MIUITASK_CONFIG_PATH") is None
else Path(os.getenv("MIUITASK_CONFIG_PATH"))
else Path(str(os.getenv("MIUITASK_CONFIG_PATH")))
)
"""数据文件默认路径"""

Expand Down Expand Up @@ -138,10 +139,46 @@ def __init__(self, notifier="", params=None):
class Preference:
"""偏好设置"""

def __init__(self, geetest_url="", geetest_params=None, geetest_data=None):
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(
self,
geetest_url="",
geetest_method: Literal["post", "get"] = "post",
geetest_params: Optional[dict] = None,
geetest_data: Optional[dict] = None,
geetest_validate_path="$.data.validate",
geetest_challenge_path="$.data.challenge",
get_geetest_url="",
get_geetest_method: Literal["post", "get"] = "post",
get_geetest_params: Optional[dict] = None,
get_geetest_data: Optional[dict] = None,
get_geetest_validate_path="",
get_geetest_challenge_path="",
):
self.geetest_url = geetest_url
"""极验验证URL"""
self.geetest_method = geetest_method
"""极验请求方法"""
self.geetest_params = geetest_params or {}
"""极验自定义params参数"""
self.geetest_data = geetest_data or {}
"""极验自定义data参数"""
self.geetest_validate_path = geetest_validate_path
"""极验验证validate的路径"""
self.geetest_challenge_path = geetest_challenge_path
"""极验验证challenge的路径"""
self.get_geetest_url = get_geetest_url
"""获取极验验证结果的URL"""
self.get_geetest_method = get_geetest_method
"""获取极验验证结果的请求方法"""
self.get_geetest_params = get_geetest_params or {}
"""获取极验验证结果的自定义params参数"""
self.get_geetest_data = get_geetest_data or {}
"""获取极验验证结果的自定义data参数"""
self.get_geetest_validate_path = get_geetest_validate_path
"""获取极验验证validate的路径"""
self.get_geetest_challenge_path = get_geetest_challenge_path
"""获取极验验证challenge的路径"""


class Config:
Expand Down
6 changes: 3 additions & 3 deletions utils/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def __init__(self, content: Dict[str, Any]):
:param content: API返回的原始JSON对象
"""
self.content = content
self.data = self.content.get("data", {})
self.message = self.content.get("message", "")
self.data: Dict[str, str] = self.content.get("data", {})
self.message: str = self.content.get("message", "")
self.status = self.content.get("status")

# 尝试从其他键获取数据
Expand All @@ -36,7 +36,7 @@ def __init__(self, content: Dict[str, Any]):
if self.message == "":
self.message = self.content.get(key, "")
if self.message is None and isinstance(self.data, dict):
self.message = self.data.get(key)
self.message = self.data.get(key, "")

@property
def success(self):
Expand Down
22 changes: 22 additions & 0 deletions utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ def post(
"""
return requests.post(url, headers=headers, params=params, timeout=timeout, **kwargs)

def request(
method: str | bytes,
url: str,
*,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
timeout: Optional[int] = 20,
**kwargs,
):
"""
说明:
request请求封装
参数:
:param method: 请求方法
:param url: url
:param headers: 请求头
:param params: params
:param data: data
:param json: json
:param timeout: 超时时间
"""
return requests.request(method, url, headers=headers, params=params, timeout=timeout, **kwargs)

def notify_me(content=""):
"""
Expand Down
Loading

0 comments on commit cbef91d

Please sign in to comment.