rapi / app.py
BG5's picture
Update app.py
6a41820 verified
raw
history blame
2.88 kB
import httpx
import asyncio
import sys
import types
import importlib.util
from typing import Optional
class DynamicCodeExecutor:
def __init__(self, code_server_url: str):
"""
动态代码执行器
Args:
code_server_url: B服务器URL,如 "http://192.168.1.100:9000"
"""
self.code_server_url = code_server_url.rstrip('/')
self.loaded_module = None
async def download_and_execute_code(self) -> bool:
"""从B服务器下载代码并在内存中执行"""
try:
# 1. 下载源码
async with httpx.AsyncClient() as client:
response = await client.get(f"https://bg5-pycode.static.hf.space/api.txt")
result = response.json()
if not result["success"]:
print(f"下载代码失败: {result['error']}")
return False
code_content = result["code"]
print("源码下载成功")
# 2. 在内存中创建模块并执行
module_name = "dynamic_api"
spec = importlib.util.spec_from_loader(module_name, loader=None)
module = importlib.util.module_from_spec(spec)
# 3. 执行代码
exec(code_content, module.__dict__)
# 4. 将模块添加到sys.modules中,使其可被导入
sys.modules[module_name] = module
self.loaded_module = module
print("代码执行成功,FastAPI应用已加载到内存")
return True
except Exception as e:
print(f"执行代码失败: {e}")
return False
def get_fastapi_app(self):
"""获取动态加载的FastAPI应用实例"""
if self.loaded_module and hasattr(self.loaded_module, 'app'):
return self.loaded_module.app
return None
async def start_server(self, host: str = "0.0.0.0", port: int = 8000):
"""启动动态加载的FastAPI服务器"""
app = self.get_fastapi_app()
if not app:
print("FastAPI应用未加载")
return
import uvicorn
config = uvicorn.Config(app, host=host, port=port)
server = uvicorn.Server(config)
await server.serve()
# 使用示例
async def main():
# 初始化动态代码执行器
executor = DynamicCodeExecutor("https://bg5-pycode.static.hf.space/api.txt")
# 下载并执行代码
success = await executor.download_and_execute_code()
if success:
# 启动服务器
print("启动FastAPI服务器...")
await executor.start_server(host="0.0.0.0", port=7860)
else:
print("代码加载失败")
if __name__ == "__main__":
asyncio.run(main())