|
import gradio as gr |
|
import os |
|
import sys |
|
from pathlib import Path |
|
from modules import script_callbacks, extra_networks, prompt_parser |
|
from fastapi import FastAPI, Body, Request, Response |
|
from fastapi.responses import FileResponse |
|
from scripts.physton_prompt.storage import Storage |
|
from scripts.physton_prompt.get_extensions import get_extensions |
|
from scripts.physton_prompt.get_token_counter import get_token_counter |
|
from scripts.physton_prompt.get_i18n import get_i18n |
|
from scripts.physton_prompt.get_translate_apis import get_translate_apis, privacy_translate_api_config, unprotected_translate_api_config |
|
from scripts.physton_prompt.translate import translate |
|
from scripts.physton_prompt.history import History |
|
from scripts.physton_prompt.csv import get_csvs, get_csv |
|
from scripts.physton_prompt.styles import get_style_full_path, get_extension_css_list |
|
from scripts.physton_prompt.get_extra_networks import get_extra_networks |
|
from scripts.physton_prompt.packages import get_packages_state, install_package |
|
from scripts.physton_prompt.gen_openai import gen_openai |
|
from scripts.physton_prompt.get_lang import get_lang |
|
from scripts.physton_prompt.get_version import get_git_commit_version, get_git_remote_versions, get_latest_version |
|
from scripts.physton_prompt.mbart50 import initialize as mbart50_initialize, translate as mbart50_translate |
|
from scripts.physton_prompt.get_group_tags import get_group_tags |
|
|
|
# Startup sanity check: when the web UI runs with --data-dir and this
# extension lives outside the web UI working directory, the extension
# directory has to be covered by --gradio-allowed-path. If it is not, print a
# bilingual warning that tells the user which startup parameter to add.
try:
    from modules.shared import cmd_opts

    if cmd_opts.data_dir:
        # Directory one level above this script, normalised and terminated
        # with a separator so prefix comparisons only match whole components.
        extension_dir = os.path.normpath(
            os.path.dirname(os.path.abspath(__file__)) + '/../'
        ) + os.path.sep
        webui_dir = os.path.normpath(Path().absolute()) + os.path.sep

        if not extension_dir.startswith(webui_dir):
            # An allowed path covers the extension when it is the extension
            # directory itself or one of its ancestors; a prefix test handles
            # both cases because every path ends with a separator.
            allowed_paths = cmd_opts.gradio_allowed_path or ()
            is_allowed = any(
                extension_dir.startswith(os.path.normpath(path) + os.path.sep)
                for path in allowed_paths
            )
            if not is_allowed:
                message = f'''
\033[1;31m[sd-webui-prompt-all-in-one]
As you have set the --data-dir parameter and have not added the extension path to the --gradio-allowed-path parameter, the extension may not function properly. Please add the following startup parameter:
由于你设置了 --data-dir 参数,并且没有将本扩展路径加入到 --gradio-allowed-path 参数中,所以本扩展可能无法正常运行。请添加启动参数:
\033[1;32m--gradio-allowed-path="{extension_dir}"
\033[0m
'''
                print(message)
except Exception:
    # Deliberately best-effort: this check is advisory only, so any failure
    # (missing module, missing option, unexpected value) must not prevent the
    # extension from loading.
    pass
|
|
|
|
|
def _first_missing_field(data, fields):
    """Return the first name in *fields* that *data* lacks, or None.

    A JSON body that is not an object (for example a number, null or a
    string) cannot contain any field, so the first required field is
    reported as missing instead of letting the membership test raise or
    perform a substring match.
    """
    if not isinstance(data, dict):
        return fields[0] if fields else None
    return next((name for name in fields if name not in data), None)


def _required_field_error(data, *fields):
    """Return the failure payload for the first missing field, or None.

    The payload has the shape ``{"success": False, "message": ...}`` where the
    message is the localised 'is_required' text for the missing field. Fields
    are checked in the order given.
    """
    missing = _first_missing_field(data, fields)
    if missing is None:
        return None
    return {"success": False, "message": get_lang('is_required', {'0': missing})}


def on_app_started(_: gr.Blocks, app: FastAPI):
    """Register the extension's ``/physton_prompt/*`` HTTP routes on *app*.

    Args:
        _: The Gradio Blocks instance passed by the callback; unused here.
        app: The FastAPI application the routes are attached to.

    After registering the routes, the mbart50 translator is initialised when
    the stored ``translateApi`` setting equals ``'mbart50'``.
    """
    st = Storage()
    hi = History()

    # ------------------------------------------------------------------
    # Version / configuration
    # ------------------------------------------------------------------

    @app.get("/physton_prompt/get_version")
    async def _get_version():
        return {
            'version': get_git_commit_version(),
            'latest_version': get_latest_version(),
        }

    @app.get("/physton_prompt/get_remote_versions")
    async def _get_remote_versions(page: int = 1, per_page: int = 100):
        return {
            'versions': get_git_remote_versions(page, per_page),
        }

    @app.get("/physton_prompt/get_config")
    async def _get_config():
        return {
            'i18n': get_i18n(True),
            'translate_apis': get_translate_apis(True),
            'packages_state': get_packages_state(),
            'python': sys.executable,
        }

    @app.post("/physton_prompt/install_package")
    async def _install_package(request: Request):
        data = await request.json()
        # This route reports validation problems under the "result" key.
        missing = _first_missing_field(data, ('name', 'package'))
        if missing is not None:
            return {"result": get_lang('is_required', {'0': missing})}
        return {"result": install_package(data['name'], data['package'])}

    @app.get("/physton_prompt/get_extensions")
    async def _get_extensions():
        return {"extends": get_extensions()}

    @app.post("/physton_prompt/token_counter")
    async def _token_counter(request: Request):
        data = await request.json()
        # This route reports validation problems under the "result" key.
        missing = _first_missing_field(data, ('text', 'steps'))
        if missing is not None:
            return {"result": get_lang('is_required', {'0': missing})}
        return get_token_counter(data['text'], data['steps'])

    # ------------------------------------------------------------------
    # Key/value storage
    # ------------------------------------------------------------------

    @app.get("/physton_prompt/get_data")
    async def _get_data(key: str):
        data = privacy_translate_api_config(key, st.get(key))
        return {"data": data}

    @app.get("/physton_prompt/get_datas")
    async def _get_datas(keys: str):
        # *keys* is a comma-separated list of storage keys.
        datas = {
            key: privacy_translate_api_config(key, st.get(key))
            for key in keys.split(',')
        }
        return {"datas": datas}

    @app.post("/physton_prompt/set_data")
    async def _set_data(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'key', 'data')
        if error:
            return error
        value = unprotected_translate_api_config(data['key'], data['data'])
        st.set(data['key'], value)
        return {"success": True}

    @app.post("/physton_prompt/set_datas")
    async def _set_datas(request: Request):
        data = await request.json()
        if not isinstance(data, dict):
            return {"success": False, "message": get_lang('is_not_dict', {'0': 'data'})}
        for key, value in data.items():
            st.set(key, unprotected_translate_api_config(key, value))
        return {"success": True}

    # ------------------------------------------------------------------
    # List storage
    # ------------------------------------------------------------------

    @app.get("/physton_prompt/get_data_list_item")
    async def _get_data_list_item(key: str, index: int):
        return {"item": st.list_get(key, index)}

    @app.post("/physton_prompt/push_data_list")
    async def _push_data_list(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'key', 'item')
        if error:
            return error
        st.list_push(data['key'], data['item'])
        return {"success": True}

    @app.post("/physton_prompt/pop_data_list")
    async def _pop_data_list(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'key')
        if error:
            return error
        return {"success": True, 'item': st.list_pop(data['key'])}

    @app.post("/physton_prompt/shift_data_list")
    async def _shift_data_list(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'key')
        if error:
            return error
        return {"success": True, 'item': st.list_shift(data['key'])}

    @app.post("/physton_prompt/remove_data_list")
    async def _remove_data_list(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'key', 'index')
        if error:
            return error
        st.list_remove(data['key'], data['index'])
        return {"success": True}

    @app.post("/physton_prompt/clear_data_list")
    async def _clear_data_list(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'key')
        if error:
            return error
        st.list_clear(data['key'])
        return {"success": True}

    # ------------------------------------------------------------------
    # Histories and favorites
    # NOTE: the parameter name ``type`` shadows the builtin on purpose; it
    # is the public query-parameter name of these routes.
    # ------------------------------------------------------------------

    @app.get("/physton_prompt/get_histories")
    async def _get_histories(type: str):
        return {"histories": hi.get_histories(type)}

    @app.get("/physton_prompt/get_favorites")
    async def _get_favorites(type: str):
        return {"favorites": hi.get_favorites(type)}

    @app.post("/physton_prompt/push_history")
    async def _push_history(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'tags', 'prompt')
        if error:
            return error
        # 'name' is optional and defaults to an empty string.
        hi.push_history(data['type'], data['tags'], data['prompt'], data.get('name', ''))
        return {"success": True}

    @app.post("/physton_prompt/push_favorite")
    async def _push_favorite(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'tags', 'prompt')
        if error:
            return error
        # 'name' is optional and defaults to an empty string.
        hi.push_favorite(data['type'], data['tags'], data['prompt'], data.get('name', ''))
        return {"success": True}

    @app.get("/physton_prompt/get_latest_history")
    async def _get_latest_history(type: str):
        return {"history": hi.get_latest_history(type)}

    @app.post("/physton_prompt/set_history")
    async def _set_history(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'id', 'tags', 'prompt', 'name')
        if error:
            return error
        return {"success": hi.set_history(data['type'], data['id'], data['tags'], data['prompt'], data['name'])}

    @app.post("/physton_prompt/set_history_name")
    async def _set_history_name(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'id', 'name')
        if error:
            return error
        return {"success": hi.set_history_name(data['type'], data['id'], data['name'])}

    @app.post("/physton_prompt/set_favorite_name")
    async def _set_favorite_name(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'id', 'name')
        if error:
            return error
        return {"success": hi.set_favorite_name(data['type'], data['id'], data['name'])}

    @app.post("/physton_prompt/dofavorite")
    async def _dofavorite(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'id')
        if error:
            return error
        return {"success": hi.dofavorite(data['type'], data['id'])}

    @app.post("/physton_prompt/unfavorite")
    async def _unfavorite(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'id')
        if error:
            return error
        return {"success": hi.unfavorite(data['type'], data['id'])}

    @app.post("/physton_prompt/delete_history")
    async def _delete_history(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type', 'id')
        if error:
            return error
        return {"success": hi.remove_history(data['type'], data['id'])}

    @app.post("/physton_prompt/delete_histories")
    async def _delete_histories(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'type')
        if error:
            return error
        return {"success": hi.remove_histories(data['type'])}

    # ------------------------------------------------------------------
    # Translation
    # ------------------------------------------------------------------

    @app.post("/physton_prompt/translate")
    async def _translate(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'text', 'from_lang', 'to_lang', 'api', 'api_config')
        if error:
            return error
        return translate(data['text'], data['from_lang'], data['to_lang'], data['api'], data['api_config'])

    @app.post("/physton_prompt/translates")
    async def _translates(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'texts', 'from_lang', 'to_lang', 'api', 'api_config')
        if error:
            return error
        return translate(data['texts'], data['from_lang'], data['to_lang'], data['api'], data['api_config'])

    # ------------------------------------------------------------------
    # Static resources and listings
    # ------------------------------------------------------------------

    @app.get("/physton_prompt/get_csvs")
    async def _get_csvs():
        return {"csvs": get_csvs()}

    @app.get("/physton_prompt/get_csv")
    async def _get_csv(key: str):
        file = get_csv(key)
        if not file:
            return Response(status_code=404)
        return FileResponse(file, media_type='text/csv', filename=os.path.basename(file))

    @app.get("/physton_prompt/styles")
    async def _styles(file: str):
        file_path = get_style_full_path(file)
        # Guard a falsy path first so os.path.exists() is never handed None.
        if not file_path or not os.path.exists(file_path):
            return Response(status_code=404)
        return FileResponse(file_path, filename=os.path.basename(file_path))

    @app.get("/physton_prompt/get_extension_css_list")
    async def _get_extension_css_list():
        return {"css_list": get_extension_css_list()}

    @app.get("/physton_prompt/get_extra_networks")
    async def _get_extra_networks():
        return {"extra_networks": get_extra_networks()}

    # ------------------------------------------------------------------
    # OpenAI generation, mbart50, group tags
    # ------------------------------------------------------------------

    @app.post("/physton_prompt/gen_openai")
    async def _gen_openai(request: Request):
        data = await request.json()
        error = _required_field_error(data, 'messages', 'api_config')
        if error:
            return error
        try:
            return {"success": True, 'result': gen_openai(data['messages'], data['api_config'])}
        except Exception as e:
            # Report the failure to the client instead of returning HTTP 500.
            return {"success": False, 'message': str(e)}

    @app.post("/physton_prompt/mbart50_initialize")
    async def _mbart50_initialize(request: Request):
        try:
            mbart50_initialize(True)
            return {"success": True}
        except Exception as e:
            return {"success": False, 'message': str(e)}

    @app.get("/physton_prompt/get_group_tags")
    async def _get_group_tags(lang: str):
        return {"tags": get_group_tags(lang)}

    # Initialise mbart50 up front when it is the stored translation API.
    # This stays best-effort (a failure must not abort route registration),
    # but the reason is reported rather than silently discarded.
    try:
        translate_api = st.get('translateApi')
        if translate_api == 'mbart50':
            mbart50_initialize()
    except Exception as e:
        print(f'sd-webui-prompt-all-in-one mbart50 initialization failed: {e}')
|
|
|
|
|
# Hook the route-registration callback into the web UI's app-started event
# and report on stdout whether the registration call itself succeeded.
try:
    script_callbacks.on_app_started(on_app_started)
    print('sd-webui-prompt-all-in-one background API service started successfully.')
except Exception as error:
    # Registration is not allowed to break loading of the script; the reason
    # is printed so the user can see why the API is unavailable.
    print(f'sd-webui-prompt-all-in-one background API service failed to start: {error}')
|
|