# Web file-viewer header (not part of the program): Spaces: BG5 / Running
# File size: 10,566 Bytes; revision f9cbb48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
# (Line-number gutter 1-258 left over from the web file viewer; not part of the program.)
import json
import os
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple

import httpx
from DrissionPage import Chromium
from DrissionPage._configs.chromium_options import ChromiumOptions
from DrissionPage._pages.mix_tab import MixTab

@dataclass
class CaptchaConfig:
    """Settings for the captcha-solving flow.

    ``speech2text_api`` and ``auth_key`` default to the ``SPEECH2TEXT_API`` and
    ``AUTH_KEY`` environment variables. The variables are read each time an
    instance is created (not once when the module is imported), so values
    exported after import are still picked up. Both fall back to an empty
    string when the variable is unset.
    """

    # URL of the speech-to-text endpoint.
    speech2text_api: str = field(
        default_factory=lambda: os.environ.get("SPEECH2TEXT_API", "")
    )
    # API key for the speech-to-text endpoint.
    auth_key: str = field(default_factory=lambda: os.environ.get("AUTH_KEY", ""))
    # Maximum number of retry attempts.
    max_retries: int = 3
    # Seconds to sleep between retries.
    retry_delay: float = 1.0
    # Page-load timeout for the browser tab, in seconds.
    page_load_timeout: float = 20.0
    # Default wait when looking up a page element, in seconds.
    element_timeout: float = 5.0


class CaptchaSolver:
    def __init__(self, email: str, firstname, lastname, is_teacher: bool = False, config: Optional[CaptchaConfig] = None, proxy = None):
        self.email = email
        self.firstname = firstname
        self.lastname = lastname
        self.is_teacher = is_teacher
        self.config = config or CaptchaConfig()
        self.tab: Optional[MixTab] = None
        self.proxy = proxy

    def __enter__(self):
        self.tab = self._init_browser()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # return False  # 允许异常传播
        if self.tab:
            self.tab.clear_cache()
            self.tab.close()
            self.tab.browser.quit()

    def _init_browser(self) -> MixTab:
        """Start a headless, incognito Chromium and return a new tab.

        Returns:
            A freshly opened tab whose page-load timeout is set to
            ``config.page_load_timeout``.

        Raises:
            Exception: whatever DrissionPage raises when the browser cannot be
                started or the tab cannot be prepared. If the failure happens
                after the browser is up, the browser is quit before re-raising
                so no process is left behind.
        """
        co = ChromiumOptions().set_paths(browser_path=r".\Chrome\chrome.exe")
        # Incognito mode: site logins/passwords are not remembered between runs.
        co.incognito(on_off=True)
        # Let DrissionPage choose the debugging port instead of a fixed one.
        co.auto_port(on_off=True)
        co.headless(True)
        # Same well-formed UA string that call_whisper_api sends; the previous
        # value was misspelled ("KHTM", lowercase "chrome"/"safari").
        co.set_user_agent(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.8 Safari/537.36')
        # Suppress the "save password?" bubble.
        co.set_pref('credentials_enable_service', False)
        # Suppress the "Restore pages? Chrome didn't shut down correctly" bubble.
        co.set_argument('--hide-crash-restore-bubble')
        # Only configure a proxy when one was supplied; the default is None.
        if self.proxy:
            co.set_proxy(proxy=self.proxy)

        browser = Chromium(co)
        try:
            browser.clear_cache()
            tab = browser.new_tab()
            tab.set.timeouts(page_load=self.config.page_load_timeout)
        except Exception:
            # Do not leak the browser process when tab setup fails.
            browser.quit()
            raise
        return tab
    def _colese_browser(self):
        """关闭浏览器并清理资源"""
        if self.tab:
            self.tab.clear_cache()
            self.tab.close()
            self.tab.browser.quit()
    def _wait_for_element(self, selector: str, timeout: Optional[float] = None) -> Any:
        """等待元素出现"""
        timeout = timeout or self.config.element_timeout
        return self.tab.ele(selector, timeout=timeout)

    def get_audio_data(self) -> Tuple[Optional[str], bool]:
        """

        获取音频验证码数据

        返回: (audio_data, needs_captcha)

        """
        try:
            self.tab.get('https://www.jetbrains.com/shop/eform/students')

            # 检查是否需要验证码
            if "Human Verification" not in self.tab.title:
                print("页面不需要人机验证")
                return None, False

            try:
                # time.sleep(2)  # 等待页面加载
                self.tab.wait.eles_loaded('#amzn-captcha-verify-button')
                verify_btn = self._wait_for_element('#amzn-captcha-verify-button')
                verify_btn.click(by_js=True)
            except Exception:
                # 如果找不到验证按钮,可能已经通过验证
                if self._wait_for_element('@name=email', timeout=2):
                    print("已经不需要人机验证")
                    return None, False
                raise

            # 设置监听并点击音频按钮
            self.tab.listen.start('problem?kind=audio')
            audio_btn = self._wait_for_element('#amzn-btn-audio-internal')
            audio_btn.click(by_js=True)

            # 等待并获取音频数据
            for _ in range(self.config.max_retries):
                res = self.tab.listen.wait()
                try:
                    audio_data = res.response.body
                    return audio_data['assets']['audio'], True
                except (json.JSONDecodeError, KeyError):
                    print("数据包解析失败,继续等待...")
                    time.sleep(self.config.retry_delay)

            raise TimeoutError("获取音频数据超时")

        except Exception as e:
            print(f"获取音频数据失败: {str(e)}")
            return None, False

    def call_whisper_api(self, audio_base64: str) -> Optional[Dict[str, Any]]:
        """Send the captcha audio to the speech-to-text endpoint.

        Args:
            audio_base64: Audio payload, forwarded as the ``audio`` field.

        Returns:
            The decoded JSON response, or ``None`` when the request fails or
            the response body is not valid JSON (the reason is printed).
        """
        print("开始识别人机验证码")

        request_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.8 Safari/537.36"
        }
        # The bearer token is optional; it is only sent when configured.
        token = self.config.auth_key
        if token:
            request_headers["Authorization"] = f"Bearer {token}"
        body = {"audio": audio_base64, "task": "translate"}

        try:
            with httpx.Client(timeout=20) as session:
                reply = session.post(
                    self.config.speech2text_api,
                    headers=request_headers,
                    json=body,
                )
                reply.raise_for_status()
                decoded = reply.json()
        except httpx.HTTPError as err:
            print(f"API请求失败: {str(err)}")
            return None
        except json.JSONDecodeError:
            print("API响应解析失败")
            return None
        return decoded

    @staticmethod
    def parse_result(result: Dict[str, Any]) -> str:
        """解析语音识别结果"""
        # if not result or 'segments' not in result or not result['segments']:
        #     raise ValueError("无效的API响应: 缺少segments字段")

        # last_segment = result['segments'][-1].get('text', '').strip()
        if 'text' not in result:
            raise ValueError("无效的API响应: 缺少text字段")
        last_segment = result.get('text', '').strip()
        if not last_segment:
            raise ValueError("无法从结果中提取文本")

        # 清理结果: 取最后一个词,移除空格和标点
        last_word = last_segment.split()[-1]
        return last_word.replace(' ', '').replace('.', '').replace(',', '')

    def fill_form(self) -> bool:
        """填写并提交表单"""
        try:
            # 等待表单元素出现
            self.tab.wait.eles_loaded('@name=email')
            email_input = self._wait_for_element('@name=email')
            email_input.input(self.email, clear=True)

            role_value = "TEACHER" if self.is_teacher else "STUDENT"
            self._wait_for_element(f'@@name=studentType@@value={role_value}').click(by_js=True)
            self._wait_for_element('@name=name.firstName').input(self.firstname, clear=True)
            self._wait_for_element('@name=name.lastName').input(self.lastname, clear=True)
            self._wait_for_element('@name=privacyPolicy').click(by_js=True)

            submit_btn = self._wait_for_element('xpath://button[@type="submit"]')
            submit_btn.click(by_js=True)

            return True
        except Exception as e:
            print(f"表单填写失败: {str(e)}")
            return False
    def check_success(self) -> bool:
        """检查表单提交是否成功"""
        try:
            if "primaryConfirmation?email=" in self.tab.url:
                print(f"{self.email} 提交成功")
                return True
            else:
                print(f"{self.email} 提交失败")
                return False
        except Exception as e:
            print(f"检查提交状态失败:{self.email} {str(e)}")
            return False

    def solve_audio_captcha(self) -> bool | None:
        """主流程: 解决音频验证码并提交表单"""
        try:
            while True:
                # 1. 获取音频数据
                print(f"{self.email} 正在获取音频验证码")
                audio_data, needs_captcha = self.get_audio_data()

                if needs_captcha and audio_data:
                    # 2. 调用API识别
                    result = self.call_whisper_api(audio_data)
                    if not result:
                        raise RuntimeError("语音识别API调用失败")
                    print(result)
                    # 3. 解析结果
                    last_word = self.parse_result(result)
                    print(f"识别结果: {last_word}")

                    # 4. 提交验证码
                    input_field = self._wait_for_element('tag:input')
                    input_field.input(last_word)
                    time.sleep(1)  # 短暂等待确保输入完成

                    verify_btn = self._wait_for_element('#amzn-btn-verify-internal')
                    verify_btn.click(by_js=True)
                    time.sleep(2)  # 等待验证结果
                    if "Human Verification" in self.tab.title:
                        print("验证码验证失败,重试")
                        continue
                # 5. 填写表单
                if not self.fill_form():
                    raise RuntimeError("表单填写失败")
                # 6. 检查提交状态
                time.sleep(1)  # 等待页面加载
                if not self.check_success():
                    raise RuntimeError("表单提交失败")
                print("验证流程完成")
                return True

        except Exception as e:
            print(f"处理过程中发生错误: {str(e)}")
            return False


if __name__ == "__main__":
    # Example usage: submit one application and signal failure through the
    # process exit status.
    # NOTE(review): the email below is a placeholder; the address in the
    # original example was lost to email obfuscation. Replace it with a real one.
    with CaptchaSolver(email="john.doe@example.com",
                       firstname="John",
                       lastname="Doe",
                       is_teacher=True,
                       ) as solver:
        success = solver.solve_audio_captcha()
        if not success:
            # SystemExit works even where the site-provided exit() helper is
            # unavailable; the context manager still closes the browser.
            raise SystemExit(1)