import re import logging from flask import Flask, request, Response import requests from urllib.parse import urlparse # --- 设置日志,使其能输出到Hugging Face的控制台 --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') app = Flask(__name__) # --- 白名单过滤规则 (保持不变) --- ALLOWED_PATTERNS = [ re.compile(r'^https://github\.com/[^/]+/[^/]+/(?:releases|archive)/.*$', re.IGNORECASE), re.compile(r'^https://github\.com/[^/]+/[^/]+/(?:blob|raw)/.*$', re.IGNORECASE), re.compile(r'^https://github\.com/[^/]+/[^/]+/(?:info|git-).*/.*$', re.IGNORECASE), re.compile(r'^https://raw\.(?:githubusercontent|github)\.com/[^/]+/[^/]+/.*/.*$', re.IGNORECASE), re.compile(r'^https://gist\.(?:githubusercontent|github)\.com/[^/]+/[^/]+/.*/.*$', re.IGNORECASE), re.compile(r'^https://github\.com/[^/]+/[^/]+/tags.*$', re.IGNORECASE), re.compile(r'^https://avatars\.githubusercontent\.com/.*$', re.IGNORECASE), re.compile(r'^https://github\.githubassets\.com/.*$', re.IGNORECASE), re.compile(r'^https://github\.com/[^/]+/?$', re.IGNORECASE), re.compile(r'^https://github\.com/[^/]+/[^/]+/?$', re.IGNORECASE), ] def is_url_allowed(url): logging.info(f"Checking URL against whitelist: {url}") for i, pattern in enumerate(ALLOWED_PATTERNS): if pattern.match(url): logging.info(f"URL Matched! Pattern index: {i}") return True logging.warning(f"URL Denied! No pattern matched: {url}") return False # --- 核心代理逻辑 --- @app.route('/', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE']) @app.route('/', methods=['GET', 'POST', 'PUT', 'DELETE']) def proxy(path): logging.info("="*50) logging.info(f"--- New request received ---") # 打印最原始的请求信息 logging.info(f"Request Method: {request.method}") logging.info(f"Request Path (from Flask): / = /{path}") logging.info(f"Request Full Path (raw): {request.full_path}") logging.info(f"Request Headers:\n{request.headers}") # --- 1. 从请求路径中构建目标URL --- target_path = request.full_path if target_path.startswith('/'): target_path = target_path[1:] if not target_path: logging.info("Root path request. Returning info page.") return ("

GitHub reverse proxy is active. Use proxy_url/target_url format.

"), 200 if not target_path.startswith(('http://', 'https://')): target_url = 'https://' + target_path else: target_url = target_path logging.info(f"Constructed target URL: {target_url}") # --- 2. 执行安全过滤检查 --- if not is_url_allowed(target_url): error_message = "

403 Forbidden

Request blocked by proxy security policy.

" return error_message, 403 # --- 3. 转发请求 --- try: target_host = urlparse(target_url).hostname if not target_host: raise ValueError("Could not parse hostname") except Exception as e: logging.error(f"Failed to parse hostname from URL '{target_url}': {e}") return f"Invalid target URL in path: {e}", 400 headers = {key: value for (key, value) in request.headers if key.lower() != 'host'} headers['Host'] = target_host logging.info(f"Forwarding request to {target_url} with headers:\n{headers}") try: resp = requests.request( method=request.method, url=target_url, headers=headers, data=request.get_data(), cookies=request.cookies, allow_redirects=False, stream=True, timeout=30 ) logging.info(f"Received response with status code: {resp.status_code}") excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection'] response_headers = [(name, value) for (name, value) in resp.raw.headers.items() if name.lower() not in excluded_headers] logging.info("Streaming response back to client.") logging.info("="*50 + "\n") return Response(resp.iter_content(chunk_size=8192), status=resp.status_code, headers=response_headers) except requests.exceptions.RequestException as e: logging.error(f"Error while proxying request to {target_url}: {e}") logging.info("="*50 + "\n") return f"An error occurred while proxying: {e}", 502 if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)