|
import json
import os
import string
import time
from dataclasses import dataclass
from dataclasses import field
from typing import Optional, Dict, Any, Tuple

import httpx
from DrissionPage import Chromium
from DrissionPage._configs.chromium_options import ChromiumOptions
from DrissionPage._pages.mix_tab import MixTab
|
|
|
|
@dataclass
class CaptchaConfig:
    """Configuration for the captcha solver.

    The two environment-backed fields are resolved when an instance is
    created (not when the module is imported), so variables exported after
    import are still honoured. Explicit constructor arguments always win.
    """

    # URL the speech-to-text request is POSTed to; falls back to the
    # SPEECH2TEXT_API environment variable, or "" when it is unset.
    speech2text_api: str = field(
        default_factory=lambda: os.environ.get("SPEECH2TEXT_API", "")
    )
    # Optional token sent as "Authorization: Bearer <auth_key>"; falls back
    # to the AUTH_KEY environment variable, or "" when it is unset.
    auth_key: str = field(
        default_factory=lambda: os.environ.get("AUTH_KEY", "")
    )

    # Number of attempts made while waiting for the audio-challenge packet.
    max_retries: int = 3
    # Pause between those attempts, in seconds (passed to time.sleep).
    retry_delay: float = 1.0
    # Page-load timeout applied to the browser tab.
    page_load_timeout: float = 20.0
    # Default timeout used when looking up a page element.
    element_timeout: float = 5.0
|
|
|
|
|
|
class CaptchaSolver:
    """Submit the JetBrains student e-form, solving the audio captcha if shown.

    The solver opens the form in a headless Chromium tab. When the page title
    contains "Human Verification" it requests the audio variant of the
    challenge, sends the audio to a speech-to-text HTTP endpoint, types the
    last recognised word back into the challenge, and then fills in and
    submits the form.

    Use it as a context manager so the browser is always shut down::

        with CaptchaSolver(email=..., firstname=..., lastname=...) as solver:
            ok = solver.solve_audio_captcha()
    """

    def __init__(
        self,
        email: str,
        firstname: str,
        lastname: str,
        is_teacher: bool = False,
        config: Optional["CaptchaConfig"] = None,
        proxy: Optional[str] = None,
    ):
        """Store the form data and settings; the browser is started lazily.

        Args:
            email: Address typed into the form's ``email`` field.
            firstname: Value for the ``name.firstName`` field.
            lastname: Value for the ``name.lastName`` field.
            is_teacher: Select the TEACHER radio option instead of STUDENT.
            config: Solver settings; a default ``CaptchaConfig`` when omitted.
            proxy: Proxy passed to the browser options; no proxy when falsy.
        """
        self.email = email
        self.firstname = firstname
        self.lastname = lastname
        self.is_teacher = is_teacher
        self.config = config or CaptchaConfig()
        self.tab: Optional["MixTab"] = None
        self.proxy = proxy

    def __enter__(self):
        """Start the browser and return the solver itself."""
        self.tab = self._init_browser()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the browser down; exceptions from the body are not suppressed."""
        self._close_browser()

    def _init_browser(self) -> "MixTab":
        """Launch Chromium with the solver's options and return a new tab.

        Raises:
            Exception: Whatever the browser library raises while starting up.
                If the failure happens after the browser process exists, the
                browser is quit before the exception propagates.
        """
        co = ChromiumOptions().set_paths(browser_path=r".\Chrome\chrome.exe")
        co.incognito(on_off=True)
        co.auto_port(on_off=True)
        co.headless(True)
        # NOTE(review): this literal differs from the User-Agent header sent by
        # call_whisper_api ("KHTM" vs "KHTML", lower-case product names).
        # Left untouched; confirm whether the difference is intentional.
        co.set_user_agent(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTM, like Gecko) chrome/124.0.0.8 safari/537.36')
        co.set_pref('credentials_enable_service', False)
        co.set_argument('--hide-crash-restore-bubble')
        # Only configure a proxy when one was supplied: the default is None
        # and the options object expects a proxy string.
        if self.proxy:
            co.set_proxy(proxy=self.proxy)

        browser = Chromium(co)
        try:
            browser.clear_cache()
            tab = browser.new_tab()
            tab.set.timeouts(page_load=self.config.page_load_timeout)
        except Exception:
            # Do not leave an orphaned browser process behind.
            browser.quit()
            raise
        return tab

    def _close_browser(self) -> None:
        """Clear the tab cache, close the tab and quit the browser.

        Safe to call repeatedly: the tab reference is dropped first, so a
        second call is a no-op. ``browser.quit()`` runs even when clearing
        the cache or closing the tab raises.
        """
        tab = self.tab
        if tab is None:
            return
        self.tab = None
        browser = tab.browser
        try:
            tab.clear_cache()
            tab.close()
        finally:
            browser.quit()

    def _colese_browser(self):
        """Backward-compatible alias (original misspelling) of ``_close_browser``."""
        self._close_browser()

    def _wait_for_element(self, selector: str, timeout: Optional[float] = None) -> Any:
        """Look up ``selector`` in the tab and return whatever ``tab.ele`` yields.

        Args:
            selector: Locator string understood by the tab's ``ele`` method.
            timeout: Lookup timeout; ``None`` means use
                ``config.element_timeout``. An explicit ``0`` is honoured.
        """
        if timeout is None:
            timeout = self.config.element_timeout
        return self.tab.ele(selector, timeout=timeout)

    def get_audio_data(self) -> Tuple[Optional[str], bool]:
        """Open the form page and fetch the audio challenge, if there is one.

        Returns:
            ``(audio_data, needs_captcha)``. ``(audio, True)`` when an audio
            challenge was captured. ``(None, False)`` when the page shows no
            "Human Verification" challenge and also when anything goes wrong
            (the error is printed, never raised).
        """
        try:
            self.tab.get('https://www.jetbrains.com/shop/eform/students')

            if "Human Verification" not in self.tab.title:
                print("页面不需要人机验证")
                return None, False

            try:
                self.tab.wait.eles_loaded('#amzn-captcha-verify-button')
                verify_btn = self._wait_for_element('#amzn-captcha-verify-button')
                verify_btn.click(by_js=True)
            except Exception:
                # The challenge may have vanished in the meantime; if the form
                # is already visible there is nothing left to solve.
                if self._wait_for_element('@name=email', timeout=2):
                    print("已经不需要人机验证")
                    return None, False
                raise

            # Capture the response of the audio-challenge request that the
            # click below triggers.
            self.tab.listen.start('problem?kind=audio')
            try:
                audio_btn = self._wait_for_element('#amzn-btn-audio-internal')
                audio_btn.click(by_js=True)

                for _ in range(self.config.max_retries):
                    # Bounded wait so a missing packet cannot block forever.
                    packet = self.tab.listen.wait(
                        timeout=self.config.page_load_timeout
                    )
                    try:
                        # getattr keeps a timed-out wait (no packet object)
                        # on the same "retry" path as a malformed body.
                        response = getattr(packet, 'response', None)
                        body = getattr(response, 'body', None)
                        return body['assets']['audio'], True
                    except (json.JSONDecodeError, KeyError, TypeError):
                        print("数据包解析失败,继续等待...")
                        time.sleep(self.config.retry_delay)

                raise TimeoutError("获取音频数据超时")
            finally:
                self.tab.listen.stop()

        except Exception as e:
            print(f"获取音频数据失败: {str(e)}")
            return None, False

    def call_whisper_api(self, audio_base64: str) -> Optional[Dict[str, Any]]:
        """POST the audio to the configured speech-to-text endpoint.

        Args:
            audio_base64: Audio payload sent under the ``"audio"`` key
                (base64-encoded, judging by the parameter name).

        Returns:
            The decoded JSON response, or ``None`` when the endpoint is not
            configured, the request fails, or the response is not valid JSON.
        """
        print("开始识别人机验证码")

        endpoint = self.config.speech2text_api
        if not endpoint:
            print("API请求失败: 未配置 SPEECH2TEXT_API")
            return None

        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.8 Safari/537.36"
        }
        if self.config.auth_key:
            headers["Authorization"] = f"Bearer {self.config.auth_key}"
        payload = {
            "audio": audio_base64,
            "task": "translate"
        }

        try:
            with httpx.Client(timeout=20) as client:
                response = client.post(endpoint, headers=headers, json=payload)
                response.raise_for_status()
                return response.json()
        except httpx.HTTPError as e:
            print(f"API请求失败: {str(e)}")
            return None
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            print("API响应解析失败")
            return None

    @staticmethod
    def parse_result(result: Dict[str, Any]) -> str:
        """Extract the last spoken word from a speech-to-text response.

        The last whitespace-separated token of ``result["text"]`` that still
        contains characters after punctuation removal is returned. ``.`` and
        ``,`` are removed anywhere in the token; any other ASCII punctuation
        is stripped from its ends.

        Raises:
            ValueError: If ``result`` is not a dict with a ``"text"`` key, or
                the text is not a string / contains no usable word.
        """
        if not isinstance(result, dict) or 'text' not in result:
            raise ValueError("无效的API响应: 缺少text字段")

        text = result['text']
        if not isinstance(text, str) or not text.strip():
            raise ValueError("无法从结果中提取文本")

        # Walk backwards so trailing punctuation-only tokens are skipped.
        for token in reversed(text.split()):
            word = token.replace('.', '').replace(',', '').strip(string.punctuation)
            if word:
                return word
        raise ValueError("无法从结果中提取文本")

    def fill_form(self) -> bool:
        """Fill in and submit the form; return ``False`` (after printing) on error."""
        try:
            self.tab.wait.eles_loaded('@name=email')
            email_input = self._wait_for_element('@name=email')
            email_input.input(self.email, clear=True)

            role_value = "TEACHER" if self.is_teacher else "STUDENT"
            self._wait_for_element(f'@@name=studentType@@value={role_value}').click(by_js=True)
            self._wait_for_element('@name=name.firstName').input(self.firstname, clear=True)
            self._wait_for_element('@name=name.lastName').input(self.lastname, clear=True)
            self._wait_for_element('@name=privacyPolicy').click(by_js=True)

            submit_btn = self._wait_for_element('xpath://button[@type="submit"]')
            submit_btn.click(by_js=True)

            return True
        except Exception as e:
            print(f"表单填写失败: {str(e)}")
            return False

    def check_success(self) -> bool:
        """Return ``True`` when the tab URL contains ``primaryConfirmation?email=``."""
        try:
            if "primaryConfirmation?email=" in self.tab.url:
                print(f"{self.email} 提交成功")
                return True
            print(f"{self.email} 提交失败")
            return False
        except Exception as e:
            print(f"检查提交状态失败:{self.email} {str(e)}")
            return False

    def _pass_captcha(self) -> None:
        """Solve the audio challenge, retrying at most ``config.max_retries`` times.

        Returns normally when no challenge is shown, when no audio could be
        obtained, or once the challenge page is gone.

        Raises:
            RuntimeError: If the speech-to-text call fails or every attempt
                still ends on the "Human Verification" page.
            ValueError: If the speech-to-text response cannot be parsed.
        """
        for _ in range(max(1, self.config.max_retries)):
            print(f"{self.email} 正在获取音频验证码")
            audio_data, needs_captcha = self.get_audio_data()
            if not (needs_captcha and audio_data):
                return

            result = self.call_whisper_api(audio_data)
            if not result:
                raise RuntimeError("语音识别API调用失败")
            print(result)

            last_word = self.parse_result(result)
            print(f"识别结果: {last_word}")

            input_field = self._wait_for_element('tag:input')
            input_field.input(last_word)
            time.sleep(1)

            verify_btn = self._wait_for_element('#amzn-btn-verify-internal')
            verify_btn.click(by_js=True)
            time.sleep(2)

            if "Human Verification" not in self.tab.title:
                return
            print("验证码验证失败,重试")

        raise RuntimeError("验证码验证失败: 已达到最大重试次数")

    def solve_audio_captcha(self) -> bool:
        """Run the whole flow: pass the captcha, fill the form, verify success.

        Returns:
            ``True`` when the confirmation page was reached, ``False`` on any
            failure (the error is printed, never raised).
        """
        try:
            self._pass_captcha()

            if not self.fill_form():
                raise RuntimeError("表单填写失败")

            time.sleep(1)
            if not self.check_success():
                raise RuntimeError("表单提交失败")
            print("验证流程完成")
            return True

        except Exception as e:
            print(f"处理过程中发生错误: {str(e)}")
            return False
|
|
|
|
|
|
if __name__ == "__main__":
    # Example run: submit the form once with sample data. The context
    # manager shuts the browser down before the exit status is reported.
    with CaptchaSolver(
        email="[email protected]",
        firstname="John",
        lastname="Doe",
        is_teacher=True,
    ) as solver:
        success = solver.solve_audio_captcha()

    if not success:
        # `exit()` is an interactive helper injected by the `site` module and
        # is not guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)