import gradio as gr
import os
import sys
from pathlib import Path

from modules import script_callbacks, extra_networks, prompt_parser
from fastapi import FastAPI, Body, Request, Response
from fastapi.responses import FileResponse

from scripts.physton_prompt.storage import Storage
from scripts.physton_prompt.get_extensions import get_extensions
from scripts.physton_prompt.get_token_counter import get_token_counter
from scripts.physton_prompt.get_i18n import get_i18n
from scripts.physton_prompt.get_translate_apis import get_translate_apis, privacy_translate_api_config, unprotected_translate_api_config
from scripts.physton_prompt.translate import translate
from scripts.physton_prompt.history import History
from scripts.physton_prompt.csv import get_csvs, get_csv
from scripts.physton_prompt.styles import get_style_full_path, get_extension_css_list
from scripts.physton_prompt.get_extra_networks import get_extra_networks
from scripts.physton_prompt.packages import get_packages_state, install_package
from scripts.physton_prompt.gen_openai import gen_openai
from scripts.physton_prompt.get_lang import get_lang
from scripts.physton_prompt.get_version import get_git_commit_version, get_git_remote_versions, get_latest_version
from scripts.physton_prompt.mbart50 import initialize as mbart50_initialize, translate as mbart50_translate
from scripts.physton_prompt.get_group_tags import get_group_tags

# Startup sanity check: when --data-dir is set and this extension lives outside
# the current working directory, warn unless one of the --gradio-allowed-path
# entries equals or contains the extension directory.
try:
    from modules.shared import cmd_opts

    if cmd_opts.data_dir:
        extension_dir = os.path.dirname(os.path.abspath(__file__)) + '/../'
        extension_dir = os.path.normpath(extension_dir) + os.path.sep
        # Computed but not used further in this file; kept as a module-level name.
        data_dir = os.path.normpath(cmd_opts.data_dir) + os.path.sep
        webui_dir = os.path.normpath(Path().absolute()) + os.path.sep
        if not extension_dir.startswith(webui_dir):
            # `find` stays False, or becomes the first allowed path that covers
            # the extension directory.
            find = False
            for path in cmd_opts.gradio_allowed_path or []:
                path = os.path.normpath(path) + os.path.sep
                # An exact match is just a special case of the prefix test.
                if extension_dir.startswith(path):
                    find = path
                    break
            if not find:
                # NOTE(review): the message text is reproduced exactly as found in
                # the file; it looks like it was originally multi-line - confirm
                # before reflowing it.
                message = (
                    ' \033[1;31m[sd-webui-prompt-all-in-one] As you have set the --data-dir parameter and have '
                    'not added the extension path to the --gradio-allowed-path parameter, the extension may not '
                    'function properly. Please add the following startup parameter: '
                    '由于你设置了 --data-dir 参数,并且没有将本扩展路径加入到 --gradio-allowed-path 参数中,所以本扩展可能无法正常运行。请添加启动参数: '
                    f'\033[1;32m--gradio-allowed-path="{extension_dir}" '
                    '\033[0m '
                )
                print(message)
except Exception as e:
    # Best-effort check: never block loading the extension, but do not hide the
    # reason the check could not run either.
    print(f'[sd-webui-prompt-all-in-one] Skipped the --data-dir path check: {e}')


def _missing_message(data, *fields):
    """Return the localized "is required" message for the first missing field.

    Args:
        data: The decoded JSON request body.
        *fields: Field names that must be present, checked in order.

    Returns:
        The text produced by ``get_lang('is_required', {'0': field})`` for the
        first field that is absent, or ``None`` when every field is present.
        A body that is not a JSON object is treated as having no fields, so the
        caller answers with a structured error instead of raising.
    """
    for field in fields:
        if not isinstance(data, dict) or field not in data:
            return get_lang('is_required', {'0': field})
    return None


def _failure(message):
    """Build the ``{"success": False, "message": ...}`` reply used by most routes."""
    return {"success": False, "message": message}


def on_app_started(_: gr.Blocks, app: FastAPI):
    """Register every ``/physton_prompt/*`` HTTP route on the given FastAPI app.

    Intended to be passed to ``script_callbacks.on_app_started``. One ``Storage``
    and one ``History`` instance are created here and shared by all handlers.
    After the routes are registered, the mbart50 translator is initialized if it
    is the stored ``translateApi`` value.

    Args:
        _: The gradio Blocks instance supplied by the callback; unused.
        app: The FastAPI application to attach the routes to.
    """
    st = Storage()
    hi = History()

    # ---- version / configuration -------------------------------------------

    @app.get("/physton_prompt/get_version")
    async def _get_version():
        return {
            'version': get_git_commit_version(),
            'latest_version': get_latest_version(),
        }

    @app.get("/physton_prompt/get_remote_versions")
    async def _get_remote_versions(page: int = 1, per_page: int = 100):
        return {
            'versions': get_git_remote_versions(page, per_page),
        }

    @app.get("/physton_prompt/get_config")
    async def _get_config():
        return {
            'i18n': get_i18n(True),
            'translate_apis': get_translate_apis(True),
            'packages_state': get_packages_state(),
            'python': sys.executable,
        }

    # Unlike most routes, this one reports validation errors under "result".
    @app.post("/physton_prompt/install_package")
    async def _install_package(request: Request):
        data = await request.json()
        error = _missing_message(data, 'name', 'package')
        if error is not None:
            return {"result": error}
        return {"result": install_package(data['name'], data['package'])}

    @app.get("/physton_prompt/get_extensions")
    async def _get_extensions():
        return {"extends": get_extensions()}

    # Validation errors are reported under "result"; on success the value from
    # get_token_counter is returned as-is.
    @app.post("/physton_prompt/token_counter")
    async def _token_counter(request: Request):
        data = await request.json()
        error = _missing_message(data, 'text', 'steps')
        if error is not None:
            return {"result": error}
        return get_token_counter(data['text'], data['steps'])

    # ---- key/value storage ---------------------------------------------------
    # Values pass through privacy_translate_api_config on the way out and
    # through unprotected_translate_api_config on the way in.

    @app.get("/physton_prompt/get_data")
    async def _get_data(key: str):
        return {"data": privacy_translate_api_config(key, st.get(key))}

    @app.get("/physton_prompt/get_datas")
    async def _get_datas(keys: str):
        # `keys` is a comma-separated list of storage keys.
        datas = {
            key: privacy_translate_api_config(key, st.get(key))
            for key in keys.split(',')
        }
        return {"datas": datas}

    @app.post("/physton_prompt/set_data")
    async def _set_data(request: Request):
        data = await request.json()
        error = _missing_message(data, 'key', 'data')
        if error is not None:
            return _failure(error)
        st.set(data['key'], unprotected_translate_api_config(data['key'], data['data']))
        return {"success": True}

    @app.post("/physton_prompt/set_datas")
    async def _set_datas(request: Request):
        data = await request.json()
        if not isinstance(data, dict):
            return _failure(get_lang('is_not_dict', {'0': 'data'}))
        for key, value in data.items():
            st.set(key, unprotected_translate_api_config(key, value))
        return {"success": True}

    # ---- list storage --------------------------------------------------------

    @app.get("/physton_prompt/get_data_list_item")
    async def _get_data_list_item(key: str, index: int):
        return {"item": st.list_get(key, index)}

    @app.post("/physton_prompt/push_data_list")
    async def _push_data_list(request: Request):
        data = await request.json()
        error = _missing_message(data, 'key', 'item')
        if error is not None:
            return _failure(error)
        st.list_push(data['key'], data['item'])
        return {"success": True}

    @app.post("/physton_prompt/pop_data_list")
    async def _pop_data_list(request: Request):
        data = await request.json()
        error = _missing_message(data, 'key')
        if error is not None:
            return _failure(error)
        return {"success": True, 'item': st.list_pop(data['key'])}

    @app.post("/physton_prompt/shift_data_list")
    async def _shift_data_list(request: Request):
        data = await request.json()
        error = _missing_message(data, 'key')
        if error is not None:
            return _failure(error)
        return {"success": True, 'item': st.list_shift(data['key'])}

    @app.post("/physton_prompt/remove_data_list")
    async def _remove_data_list(request: Request):
        data = await request.json()
        error = _missing_message(data, 'key', 'index')
        if error is not None:
            return _failure(error)
        st.list_remove(data['key'], data['index'])
        return {"success": True}

    @app.post("/physton_prompt/clear_data_list")
    async def _clear_data_list(request: Request):
        data = await request.json()
        error = _missing_message(data, 'key')
        if error is not None:
            return _failure(error)
        st.list_clear(data['key'])
        return {"success": True}

    # ---- histories and favorites --------------------------------------------
    # The parameter name `type` shadows the builtin, but it is also the public
    # query-string name of these routes, so it must stay as it is.

    @app.get("/physton_prompt/get_histories")
    async def _get_histories(type: str):
        return {"histories": hi.get_histories(type)}

    @app.get("/physton_prompt/get_favorites")
    async def _get_favorites(type: str):
        return {"favorites": hi.get_favorites(type)}

    @app.post("/physton_prompt/push_history")
    async def _push_history(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'tags', 'prompt')
        if error is not None:
            return _failure(error)
        # 'name' is optional and defaults to an empty string.
        hi.push_history(data['type'], data['tags'], data['prompt'], data.get('name', ''))
        return {"success": True}

    @app.post("/physton_prompt/push_favorite")
    async def _push_favorite(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'tags', 'prompt')
        if error is not None:
            return _failure(error)
        # 'name' is optional and defaults to an empty string.
        hi.push_favorite(data['type'], data['tags'], data['prompt'], data.get('name', ''))
        return {"success": True}

    @app.get("/physton_prompt/get_latest_history")
    async def _get_latest_history(type: str):
        return {"history": hi.get_latest_history(type)}

    @app.post("/physton_prompt/set_history")
    async def _set_history(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'id', 'tags', 'prompt', 'name')
        if error is not None:
            return _failure(error)
        return {"success": hi.set_history(data['type'], data['id'], data['tags'], data['prompt'], data['name'])}

    @app.post("/physton_prompt/set_history_name")
    async def _set_history_name(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'id', 'name')
        if error is not None:
            return _failure(error)
        return {"success": hi.set_history_name(data['type'], data['id'], data['name'])}

    @app.post("/physton_prompt/set_favorite_name")
    async def _set_favorite_name(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'id', 'name')
        if error is not None:
            return _failure(error)
        return {"success": hi.set_favorite_name(data['type'], data['id'], data['name'])}

    @app.post("/physton_prompt/dofavorite")
    async def _dofavorite(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'id')
        if error is not None:
            return _failure(error)
        return {"success": hi.dofavorite(data['type'], data['id'])}

    @app.post("/physton_prompt/unfavorite")
    async def _unfavorite(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'id')
        if error is not None:
            return _failure(error)
        return {"success": hi.unfavorite(data['type'], data['id'])}

    @app.post("/physton_prompt/delete_history")
    async def _delete_history(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type', 'id')
        if error is not None:
            return _failure(error)
        return {"success": hi.remove_history(data['type'], data['id'])}

    @app.post("/physton_prompt/delete_histories")
    async def _delete_histories(request: Request):
        data = await request.json()
        error = _missing_message(data, 'type')
        if error is not None:
            return _failure(error)
        return {"success": hi.remove_histories(data['type'])}

    # ---- translation ---------------------------------------------------------
    # Both routes hand their payload to the same `translate` function; the
    # second one passes the 'texts' field instead of 'text'.

    @app.post("/physton_prompt/translate")
    async def _translate(request: Request):
        data = await request.json()
        error = _missing_message(data, 'text', 'from_lang', 'to_lang', 'api', 'api_config')
        if error is not None:
            return _failure(error)
        return translate(data['text'], data['from_lang'], data['to_lang'], data['api'], data['api_config'])

    @app.post("/physton_prompt/translates")
    async def _translates(request: Request):
        data = await request.json()
        error = _missing_message(data, 'texts', 'from_lang', 'to_lang', 'api', 'api_config')
        if error is not None:
            return _failure(error)
        return translate(data['texts'], data['from_lang'], data['to_lang'], data['api'], data['api_config'])

    # ---- static resources ----------------------------------------------------

    @app.get("/physton_prompt/get_csvs")
    async def _get_csvs():
        return {"csvs": get_csvs()}

    @app.get("/physton_prompt/get_csv")
    async def _get_csv(key: str):
        file = get_csv(key)
        if not file:
            return Response(status_code=404)
        return FileResponse(file, media_type='text/csv', filename=os.path.basename(file))

    @app.get("/physton_prompt/styles")
    async def _styles(file: str):
        # NOTE(review): `file` comes straight from the query string; confirm that
        # get_style_full_path confines the result to the styles directory.
        file_path = get_style_full_path(file)
        if not os.path.exists(file_path):
            return Response(status_code=404)
        return FileResponse(file_path, filename=os.path.basename(file_path))

    @app.get("/physton_prompt/get_extension_css_list")
    async def _get_extension_css_list():
        return {"css_list": get_extension_css_list()}

    @app.get("/physton_prompt/get_extra_networks")
    async def _get_extra_networks():
        return {"extra_networks": get_extra_networks()}

    # ---- OpenAI / mbart50 / tags --------------------------------------------

    @app.post("/physton_prompt/gen_openai")
    async def _gen_openai(request: Request):
        data = await request.json()
        error = _missing_message(data, 'messages', 'api_config')
        if error is not None:
            return _failure(error)
        try:
            return {"success": True, 'result': gen_openai(data['messages'], data['api_config'])}
        except Exception as e:
            # Report the failure to the client instead of returning HTTP 500.
            return _failure(str(e))

    @app.post("/physton_prompt/mbart50_initialize")
    async def _mbart50_initialize(request: Request):
        try:
            mbart50_initialize(True)
            return {"success": True}
        except Exception as e:
            return _failure(str(e))

    @app.get("/physton_prompt/get_group_tags")
    async def _get_group_tags(lang: str):
        return {"tags": get_group_tags(lang)}

    # Initialize mbart50 up front when it is the stored translation API. A
    # failure here must not prevent the routes above from being served.
    try:
        translate_api = st.get('translateApi')
        if translate_api == 'mbart50':
            mbart50_initialize()
    except Exception as e:
        print(f'[sd-webui-prompt-all-in-one] mbart50 initialization at startup failed: {e}')


try:
    script_callbacks.on_app_started(on_app_started)
    print('sd-webui-prompt-all-in-one background API service started successfully.')
except Exception as e:
    print(f'sd-webui-prompt-all-in-one background API service failed to start: {e}')