File size: 6,020 Bytes
1e4a2ab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
import os
import re
import sys
import shutil
import codecs
import zipfile
import requests
import xml.etree.ElementTree
sys.path.append(os.getcwd())
from main.app.variables import logger, translations, configs
from main.app.core.ui import gr_info, gr_warning, gr_error, process_output
def read_docx_text(path):
    """Return the plain text of a .docx file, one line per non-empty paragraph.

    The archive at ``path`` is opened as a zip file and ``word/document.xml``
    is parsed. For every WordprocessingML paragraph element (``w:p``) the
    non-empty text nodes (``w:t``) beneath it are concatenated; paragraphs
    that contribute no text are skipped.

    Args:
        path: Location of the .docx archive.

    Returns:
        The collected paragraph texts joined with ``"\\n"``.
    """
    ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'

    with zipfile.ZipFile(path) as archive:
        raw_xml = archive.read("word/document.xml")

    root = xml.etree.ElementTree.fromstring(raw_xml)
    lines = []

    for para in root.iter(ns + 'p'):
        # Text nodes that are empty or None contribute nothing.
        joined = ''.join(run.text for run in para.iter(ns + 't') if run.text)
        if joined:
            lines.append(joined)

    return '\n'.join(lines)
def process_input(file_path):
    """Load the text content of an uploaded file.

    Handling depends on the file name suffix:

    * ``.srt``  - nothing is read here; an empty string is returned.
    * ``.docx`` - converted to plain text with ``read_docx_text``.
    * anything else - read as UTF-8 text.

    A file that cannot be read or parsed triggers the ``read_error`` warning,
    is logged at debug level, and yields an empty string instead of raising.

    Args:
        file_path: Path of the uploaded file.

    Returns:
        The file's text, or ``""`` for ``.srt`` files and unreadable files.
    """
    file_contents = ""

    if not file_path.endswith(".srt"):
        try:
            if file_path.endswith(".docx"):
                file_contents = read_docx_text(file_path)
            else:
                with open(file_path, "r", encoding="utf-8") as file:
                    file_contents = file.read()
        except Exception as e:
            # A corrupt .docx (bad zip, missing document.xml, invalid XML) is
            # treated exactly like an unreadable text file.
            gr_warning(translations["read_error"])
            logger.debug(e)
            file_contents = ""

    # NOTE: this notification is emitted on every path, including after a
    # read failure, which matches the previous behaviour.
    gr_info(translations["upload_success"].format(name=translations["text"]))
    return file_contents
def move_files_from_directory(src_dir, dest_weights, dest_logs, model_name):
    """Walk ``src_dir`` and move model files to their destinations.

    * ``*.index`` files go to ``<dest_logs>/<model_name>/`` (created on
      demand). Spaces in the file name become underscores and the characters
      ``( ) [ ] , " ' | { }`` are removed.
    * ``*.pth`` / ``*.onnx`` files whose names do not start with ``D_`` or
      ``G_`` go to ``<dest_weights>/<model_name>`` plus the same extension.

    Every destination path is passed through ``process_output`` before the
    move. All other files are left where they are.

    Args:
        src_dir: Directory tree to scan.
        dest_weights: Folder receiving weight files.
        dest_logs: Folder under which the per-model index folder lives.
        model_name: Name used for the index folder and the weight file.
    """
    # One pass instead of a chain of replace() calls: space -> underscore,
    # every character in the third argument is deleted.
    cleanup = str.maketrans(" ", "_", "()[],\"'|{}")

    for folder, _, names in os.walk(src_dir):
        for name in names:
            source = os.path.join(folder, name)

            if name.endswith(".index"):
                index_dir = os.path.join(dest_logs, model_name)
                os.makedirs(index_dir, exist_ok=True)
                target = process_output(os.path.join(index_dir, name.translate(cleanup).strip()))
                shutil.move(source, target)
                continue

            # Weight files prefixed with D_ or G_ are never moved.
            if name.startswith(("D_", "G_")):
                continue

            for suffix in (".pth", ".onnx"):
                if name.endswith(suffix):
                    target = process_output(os.path.join(dest_weights, model_name + suffix))
                    shutil.move(source, target)
                    break
def extract_name_model(filename):
    """Pull a model name out of a file name.

    The characters ``- ( ) [ ] , " ' | { }`` are removed and surrounding
    whitespace is stripped. The result is then searched for the first
    ``_<alphanumerics>`` segment that is immediately followed by ``_v`` and
    optional digits.

    Args:
        filename: File name to inspect.

    Returns:
        The alphanumeric segment, or ``None`` when no such segment exists.
    """
    stripped = filename.translate(str.maketrans("", "", "-()[],\"'|{}")).strip()
    found = re.search(r"_([A-Za-z0-9]+)(?=_v\d*)", stripped)

    if found is None:
        return None
    return found.group(1)
def save_drop_model(dropbox):
    """Install a model file that was dropped into the UI.

    The file at ``dropbox`` is moved into a scratch folder
    (``save_model_temp``) and then handled according to its extension:

    * ``.zip``  - unpacked in the scratch folder, then its contents are
      distributed by ``move_files_from_directory`` using the archive name
      without the ``.zip`` suffix as the model name.
    * ``.pth`` / ``.onnx`` - moved into the weights folder (destination path
      passed through ``process_output``).
    * ``.index`` - moved into ``<logs>/<model name>``; the model name comes
      from ``extract_name_model`` and falls back to the file name without
      its extension.
    * anything else - a warning is shown and nothing is installed.

    Any exception raised while handling the file is reported through
    ``gr_error``. The scratch folder is always removed afterwards.

    Args:
        dropbox: Path of the uploaded file.

    Returns:
        Always ``None``.
    """
    weight_folder = configs["weights_path"]
    logs_folder = configs["logs_path"]
    save_model_temp = "save_model_temp"

    # exist_ok=True already covers the "folder is there" case.
    for folder in (weight_folder, logs_folder, save_model_temp):
        os.makedirs(folder, exist_ok=True)

    try:
        # Kept inside the try block so that a missing or invalid upload is
        # reported to the user and the scratch folder is still cleaned up.
        shutil.move(dropbox, save_model_temp)

        file_name = os.path.basename(dropbox)
        temp_file = os.path.join(save_model_temp, file_name)

        if file_name.endswith(".zip"):
            shutil.unpack_archive(temp_file, save_model_temp)
            # Strip only the trailing extension; str.replace would also remove
            # a ".zip" that appears in the middle of the name.
            model_name = file_name[:-len(".zip")]
            move_files_from_directory(save_model_temp, weight_folder, logs_folder, model_name)
        elif file_name.endswith((".pth", ".onnx")):
            output_file = process_output(os.path.join(weight_folder, file_name))
            shutil.move(temp_file, output_file)
        elif file_name.endswith(".index"):
            modelname = extract_name_model(file_name)
            if modelname is None:
                modelname = os.path.splitext(file_name)[0]

            model_logs = os.path.join(logs_folder, modelname)
            os.makedirs(model_logs, exist_ok=True)
            shutil.move(temp_file, model_logs)
        else:
            gr_warning(translations["unable_analyze_model"])
            return None

        gr_info(translations["upload_success"].format(name=translations["model"]))
        return None
    except Exception as e:
        gr_error(message=translations["error_occurred"].format(e=e))
        return None
    finally:
        shutil.rmtree(save_model_temp, ignore_errors=True)
def zip_file(name, pth, index):
    """Bundle a model weight file, and optionally its index file, into a zip.

    The archive is written to ``<logs_path>/<name>/<name>.zip``.

    Args:
        name: Model name; used for both the folder and the archive name.
        pth: File name of the weight file inside the configured weights
            folder. Must end in ``.pth`` or ``.onnx``.
        index: Path to an index file to include, or a falsy value to skip it.

    Returns:
        A ``{"visible", "value", "__type__"}`` update dict pointing at the
        created archive, or the result of ``gr_warning`` when ``pth`` is
        missing, has the wrong extension, or does not exist on disk.
    """
    # Build the path only once pth is known to be non-empty: os.path.join
    # raises TypeError on None, which previously bypassed the warning below.
    pth_path = os.path.join(configs["weights_path"], pth) if pth else None

    if pth_path is None or not pth.endswith((".pth", ".onnx")) or not os.path.exists(pth_path):
        return gr_warning(translations["provide_file"].format(filename=translations["model"]))

    zip_dir = os.path.join(configs["logs_path"], name)
    # The per-model folder may not exist yet; ZipFile does not create it.
    os.makedirs(zip_dir, exist_ok=True)
    zip_file_path = os.path.join(zip_dir, name + ".zip")

    gr_info(translations["start"].format(start=translations["zip"]))

    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
        # Entries are stored flat, under their base names only.
        zipf.write(pth_path, os.path.basename(pth_path))
        if index:
            zipf.write(index, os.path.basename(index))

    gr_info(translations["success"])
    return {"visible": True, "value": zip_file_path, "__type__": "update"}
def fetch_pretrained_data():
    """Download and parse the custom pretrained-model JSON listing.

    The URL is stored ROT13-encoded and decoded at call time.

    Returns:
        The parsed JSON payload, or an empty dict when the request fails,
        times out, returns an error status, or the body is not valid JSON.
    """
    url = codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/wfba/phfgbz_cergenvarq.wfba", "rot13")

    try:
        # Without a timeout requests waits indefinitely on a stalled server.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Best effort: callers receive an empty mapping, but the cause is
        # logged, and KeyboardInterrupt / SystemExit are no longer swallowed.
        logger.debug(e)
        return {}
def update_sample_rate_dropdown(model):
    """Build a dropdown update listing the entries available for ``model``.

    Args:
        model: Key to look up in the data returned by
            ``fetch_pretrained_data``. When it equals
            ``translations["success"]`` nothing is looked up.

    Returns:
        ``{"choices": [...], "value": <first choice>, "__type__": "update"}``
        built from the keys of the model's entry, or ``None`` when ``model``
        equals ``translations["success"]`` or no entry is available for it.
    """
    if model == translations["success"]:
        # Same no-op result as before, without a pointless network request.
        return None

    data = fetch_pretrained_data()

    # fetch_pretrained_data returns {} on failure, so the key may be absent;
    # indexing directly used to raise KeyError (or IndexError when empty).
    # NOTE(review): assumes each entry is a mapping, as the original
    # .keys() call did - confirm against the JSON schema.
    entry = data.get(model) if isinstance(data, dict) else None
    if not entry:
        logger.debug(f"No pretrained data available for {model!r}")
        return None

    choices = list(entry.keys())
    return {"choices": choices, "value": choices[0], "__type__": "update"}