BG5 commited on
Commit
309ccba
·
verified ·
1 Parent(s): 85dd955

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +27 -80
app.py CHANGED
@@ -1,87 +1,34 @@
1
- import httpx
2
- import asyncio
3
- import sys
4
- import types
5
  import importlib.util
6
- from typing import Optional
7
 
8
- class DynamicCodeExecutor:
9
- def __init__(self, code_server_url: str):
10
- """
11
- 动态代码执行器
 
 
12
 
13
- Args:
14
- code_server_url: B服务器URL,如 "http://192.168.1.100:9000"
15
- """
16
- self.code_server_url = code_server_url.rstrip('/')
17
- self.loaded_module = None
18
 
19
- async def download_and_execute_code(self) -> bool:
20
- """从B服务器下载代码并在内存中执行"""
21
- try:
22
- # 1. 下载源码
23
- async with httpx.AsyncClient() as client:
24
- response = await client.get(f"https://bg5-pycode.static.hf.space/api.txt")
25
- result = response.text
26
-
27
- if not result["success"]:
28
- print(f"下载代码失败: {result['error']}")
29
- return False
30
-
31
- code_content = result["code"]
32
- print("源码下载成功")
33
-
34
- # 2. 在内存中创建模块并执行
35
- module_name = "dynamic_api"
36
- spec = importlib.util.spec_from_loader(module_name, loader=None)
37
- module = importlib.util.module_from_spec(spec)
38
-
39
- # 3. 执行代码
40
- exec(code_content, module.__dict__)
41
-
42
- # 4. 将模块添加到sys.modules中,使其可被导入
43
- sys.modules[module_name] = module
44
- self.loaded_module = module
45
-
46
- print("代码执行成功,FastAPI应用已加载到内存")
47
- return True
48
-
49
- except Exception as e:
50
- print(f"执行代码失败: {e}")
51
- return False
52
-
53
- def get_fastapi_app(self):
54
- """获取动态加载的FastAPI应用实例"""
55
- if self.loaded_module and hasattr(self.loaded_module, 'app'):
56
- return self.loaded_module.app
57
- return None
58
-
59
- async def start_server(self, host: str = "0.0.0.0", port: int = 8000):
60
- """启动动态加载的FastAPI服务器"""
61
- app = self.get_fastapi_app()
62
- if not app:
63
- print("FastAPI应用未加载")
64
- return
65
 
66
- import uvicorn
67
- config = uvicorn.Config(app, host=host, port=port)
68
- server = uvicorn.Server(config)
69
- await server.serve()
70
-
71
- # 使用示例
72
- async def main():
73
- # 初始化动态代码执行器
74
- executor = DynamicCodeExecutor("https://bg5-pycode.static.hf.space/api.txt")
75
-
76
- # 下载并执行代码
77
- success = await executor.download_and_execute_code()
78
-
79
- if success:
80
- # 启动服务器
81
- print("启动FastAPI服务器...")
82
- await executor.start_server(host="0.0.0.0", port=7860)
83
- else:
84
- print("代码加载失败")
85
 
86
  if __name__ == "__main__":
87
- asyncio.run(main())
 
 
1
+ import requests
 
 
 
2
  import importlib.util
3
+ import sys
4
 
5
+ def load_and_run_from_txt(txt_url: str):
6
+ """同步方式下载txt并执行"""
7
+ try:
8
+ # 下载txt文件
9
+ response = requests.get(txt_url)
10
+ response.raise_for_status()
11
 
12
+ code_content = response.text
13
+ print(f"下载成功,代码长度: {len(code_content)}")
 
 
 
14
 
15
+ # 创建并执行模块
16
+ spec = importlib.util.spec_from_loader("dynamic_api", loader=None)
17
+ module = importlib.util.module_from_spec(spec)
18
+
19
+ exec(code_content, module.__dict__)
20
+ sys.modules["dynamic_api"] = module
21
+
22
+ # 启动FastAPI
23
+ if hasattr(module, 'app'):
24
+ import uvicorn
25
+ uvicorn.run(module.app, host="0.0.0.0", port=8000)
26
+ else:
27
+ print("未找到FastAPI应用")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
 
29
+ except Exception as e:
30
+ print(f"执行失败: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
 
32
  if __name__ == "__main__":
33
+ # 直接执行
34
+ load_and_run_from_txt("https://bg5-pycode.static.hf.space/api.txt")