|
import os
|
|
import re
|
|
import sys
|
|
import shutil
|
|
import codecs
|
|
import zipfile
|
|
import requests
|
|
import xml.etree.ElementTree
|
|
|
|
sys.path.append(os.getcwd())
|
|
|
|
from main.app.variables import logger, translations, configs
|
|
from main.app.core.ui import gr_info, gr_warning, gr_error, process_output
|
|
|
|
def read_docx_text(path):
|
|
with zipfile.ZipFile(path) as docx:
|
|
with docx.open("word/document.xml") as document_xml:
|
|
xml_content = document_xml.read()
|
|
|
|
WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
|
|
|
|
paragraphs = []
|
|
for paragraph in xml.etree.ElementTree.XML(xml_content).iter(WORD_NAMESPACE + 'p'):
|
|
texts = [node.text for node in paragraph.iter(WORD_NAMESPACE + 't') if node.text]
|
|
if texts: paragraphs.append(''.join(texts))
|
|
|
|
return '\n'.join(paragraphs)
|
|
|
|
def process_input(file_path):
|
|
if file_path.endswith(".srt"): file_contents = ""
|
|
elif file_path.endswith(".docx"): file_contents = read_docx_text(file_path)
|
|
else:
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as file:
|
|
file_contents = file.read()
|
|
except Exception as e:
|
|
gr_warning(translations["read_error"])
|
|
logger.debug(e)
|
|
file_contents = ""
|
|
|
|
gr_info(translations["upload_success"].format(name=translations["text"]))
|
|
return file_contents
|
|
|
|
def move_files_from_directory(src_dir, dest_weights, dest_logs, model_name):
|
|
for root, _, files in os.walk(src_dir):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
if file.endswith(".index"):
|
|
model_log_dir = os.path.join(dest_logs, model_name)
|
|
os.makedirs(model_log_dir, exist_ok=True)
|
|
|
|
filepath = process_output(os.path.join(model_log_dir, file.replace(' ', '_').replace('(', '').replace(')', '').replace('[', '').replace(']', '').replace(",", "").replace('"', "").replace("'", "").replace("|", "").replace("{", "").replace("}", "").strip()))
|
|
|
|
shutil.move(file_path, filepath)
|
|
elif file.endswith(".pth") and not file.startswith("D_") and not file.startswith("G_"):
|
|
pth_path = process_output(os.path.join(dest_weights, model_name + ".pth"))
|
|
|
|
shutil.move(file_path, pth_path)
|
|
elif file.endswith(".onnx") and not file.startswith("D_") and not file.startswith("G_"):
|
|
pth_path = process_output(os.path.join(dest_weights, model_name + ".onnx"))
|
|
|
|
shutil.move(file_path, pth_path)
|
|
|
|
def extract_name_model(filename):
|
|
match = re.search(r"_([A-Za-z0-9]+)(?=_v\d*)", filename.replace('-', '').replace('(', '').replace(')', '').replace('[', '').replace(']', '').replace(",", "").replace('"', "").replace("'", "").replace("|", "").replace("{", "").replace("}", "").strip())
|
|
return match.group(1) if match else None
|
|
|
|
def save_drop_model(dropbox):
|
|
weight_folder = configs["weights_path"]
|
|
logs_folder = configs["logs_path"]
|
|
save_model_temp = "save_model_temp"
|
|
|
|
if not os.path.exists(weight_folder): os.makedirs(weight_folder, exist_ok=True)
|
|
if not os.path.exists(logs_folder): os.makedirs(logs_folder, exist_ok=True)
|
|
if not os.path.exists(save_model_temp): os.makedirs(save_model_temp, exist_ok=True)
|
|
|
|
shutil.move(dropbox, save_model_temp)
|
|
|
|
try:
|
|
file_name = os.path.basename(dropbox)
|
|
|
|
if file_name.endswith(".zip"):
|
|
shutil.unpack_archive(os.path.join(save_model_temp, file_name), save_model_temp)
|
|
move_files_from_directory(save_model_temp, weight_folder, logs_folder, file_name.replace(".zip", ""))
|
|
elif file_name.endswith((".pth", ".onnx")):
|
|
output_file = process_output(os.path.join(weight_folder, file_name))
|
|
|
|
shutil.move(os.path.join(save_model_temp, file_name), output_file)
|
|
elif file_name.endswith(".index"):
|
|
modelname = extract_name_model(file_name)
|
|
if modelname is None: modelname = os.path.splitext(os.path.basename(file_name))[0]
|
|
|
|
model_logs = os.path.join(logs_folder, modelname)
|
|
if not os.path.exists(model_logs): os.makedirs(model_logs, exist_ok=True)
|
|
|
|
shutil.move(os.path.join(save_model_temp, file_name), model_logs)
|
|
else:
|
|
gr_warning(translations["unable_analyze_model"])
|
|
return None
|
|
|
|
gr_info(translations["upload_success"].format(name=translations["model"]))
|
|
return None
|
|
except Exception as e:
|
|
gr_error(message=translations["error_occurred"].format(e=e))
|
|
return None
|
|
finally:
|
|
shutil.rmtree(save_model_temp, ignore_errors=True)
|
|
|
|
def zip_file(name, pth, index):
|
|
pth_path = os.path.join(configs["weights_path"], pth)
|
|
if not pth or not os.path.exists(pth_path) or not pth.endswith((".pth", ".onnx")): return gr_warning(translations["provide_file"].format(filename=translations["model"]))
|
|
|
|
zip_file_path = os.path.join(configs["logs_path"], name, name + ".zip")
|
|
gr_info(translations["start"].format(start=translations["zip"]))
|
|
|
|
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
|
|
zipf.write(pth_path, os.path.basename(pth_path))
|
|
if index: zipf.write(index, os.path.basename(index))
|
|
|
|
gr_info(translations["success"])
|
|
return {"visible": True, "value": zip_file_path, "__type__": "update"}
|
|
|
|
def fetch_pretrained_data():
|
|
try:
|
|
response = requests.get(codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/wfba/phfgbz_cergenvarq.wfba", "rot13"))
|
|
response.raise_for_status()
|
|
|
|
return response.json()
|
|
except:
|
|
return {}
|
|
|
|
def update_sample_rate_dropdown(model):
|
|
data = fetch_pretrained_data()
|
|
if model != translations["success"]: return {"choices": list(data[model].keys()), "value": list(data[model].keys())[0], "__type__": "update"} |