import os
import re
import sys
import shutil
import codecs
import zipfile
import requests
import xml.etree.ElementTree

sys.path.append(os.getcwd())

from main.app.variables import logger, translations, configs
from main.app.core.ui import gr_info, gr_warning, gr_error, process_output

def read_docx_text(path):
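    """Extract plain text from a .docx file by reading word/document.xml and joining the text runs of each paragraph."""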
    with zipfile.ZipFile(path) as docx:
        with docx.open("word/document.xml") as document_xml:
            xml_content = document_xml.read()

    WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'

    paragraphs = []
    for paragraph in xml.etree.ElementTree.XML(xml_content).iter(WORD_NAMESPACE + 'p'):
        texts = [node.text for node in paragraph.iter(WORD_NAMESPACE + 't') if node.text]
        if texts: paragraphs.append(''.join(texts))

    return '\n'.join(paragraphs)

def process_input(file_path):
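    """Return the text content of an uploaded file: .docx files are parsed with read_docx_text, .srt files yield an empty string,
    and anything else is read as UTF-8 text (an empty string is returned if reading fails)."""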
    if file_path.endswith(".srt"): file_contents = ""
    elif file_path.endswith(".docx"): file_contents = read_docx_text(file_path)
    else:
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                file_contents = file.read()
        except Exception as e:
            gr_warning(translations["read_error"])
            logger.debug(e)
            file_contents = ""

    gr_info(translations["upload_success"].format(name=translations["text"]))
    return file_contents

def move_files_from_directory(src_dir, dest_weights, dest_logs, model_name):
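    """Walk src_dir and sort extracted model files into place: .index files go to dest_logs/<model_name>/,
    while .pth/.onnx weights (excluding D_/G_ training checkpoints) are renamed to <model_name> under dest_weights."""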
    for root, _, files in os.walk(src_dir):
        for file in files:
            file_path = os.path.join(root, file)
            if file.endswith(".index"):
                model_log_dir = os.path.join(dest_logs, model_name)
                os.makedirs(model_log_dir, exist_ok=True)

                # Sanitize the index filename: spaces become underscores; brackets, braces, quotes, commas and pipes are removed.
                safe_name = re.sub(r"[()\[\]{}|,\"']", "", file.replace(" ", "_")).strip()
                filepath = process_output(os.path.join(model_log_dir, safe_name))

                shutil.move(file_path, filepath)
            elif file.endswith(".pth") and not file.startswith("D_") and not file.startswith("G_"):
                pth_path = process_output(os.path.join(dest_weights, model_name + ".pth"))

                shutil.move(file_path, pth_path)
            elif file.endswith(".onnx") and not file.startswith("D_") and not file.startswith("G_"):
                onnx_path = process_output(os.path.join(dest_weights, model_name + ".onnx"))

                shutil.move(file_path, onnx_path)

def extract_name_model(filename):
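    """Try to recover the model name from an .index filename: strip punctuation, then match the token between an underscore and a trailing _v<number> suffix."""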
    cleaned = re.sub(r"[-()\[\]{}|,\"']", "", filename).strip()
    match = re.search(r"_([A-Za-z0-9]+)(?=_v\d*)", cleaned)
    return match.group(1) if match else None

def save_drop_model(dropbox):
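    """Handle a model file dropped into the UI: stage it in a temporary folder, then unpack .zip archives,
    move .pth/.onnx weights into the weights folder, or place .index files under the matching logs folder."""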
    weight_folder = configs["weights_path"]
    logs_folder = configs["logs_path"]
    save_model_temp = "save_model_temp"

    os.makedirs(weight_folder, exist_ok=True)
    os.makedirs(logs_folder, exist_ok=True)
    os.makedirs(save_model_temp, exist_ok=True)

    shutil.move(dropbox, save_model_temp)

    try:
        file_name = os.path.basename(dropbox)

        if file_name.endswith(".zip"):
            shutil.unpack_archive(os.path.join(save_model_temp, file_name), save_model_temp)
            move_files_from_directory(save_model_temp, weight_folder, logs_folder, file_name.replace(".zip", ""))
        elif file_name.endswith((".pth", ".onnx")): 
            output_file = process_output(os.path.join(weight_folder, file_name))
            
            shutil.move(os.path.join(save_model_temp, file_name), output_file)
        elif file_name.endswith(".index"):
            modelname = extract_name_model(file_name)
            if modelname is None: modelname = os.path.splitext(os.path.basename(file_name))[0]

            model_logs = os.path.join(logs_folder, modelname)
            os.makedirs(model_logs, exist_ok=True)

            shutil.move(os.path.join(save_model_temp, file_name), model_logs)
        else: 
            gr_warning(translations["unable_analyze_model"])
            return None
        
        gr_info(translations["upload_success"].format(name=translations["model"]))
        return None
    except Exception as e:
        gr_error(message=translations["error_occurred"].format(e=e))
        return None
    finally:
        shutil.rmtree(save_model_temp, ignore_errors=True)

def zip_file(name, pth, index):
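    """Package a model's .pth/.onnx weights (and optional .index file) into <logs_path>/<name>/<name>.zip and return a Gradio update pointing at the archive."""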
    pth_path = os.path.join(configs["weights_path"], pth)
    if not pth or not os.path.exists(pth_path) or not pth.endswith((".pth", ".onnx")): return gr_warning(translations["provide_file"].format(filename=translations["model"]))

    zip_file_path = os.path.join(configs["logs_path"], name, name + ".zip")
    # Make sure the model's logs folder exists before creating the archive.
    os.makedirs(os.path.dirname(zip_file_path), exist_ok=True)
    gr_info(translations["start"].format(start=translations["zip"]))

    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
        zipf.write(pth_path, os.path.basename(pth_path))
        if index: zipf.write(index, os.path.basename(index))

    gr_info(translations["success"])
    return {"visible": True, "value": zip_file_path, "__type__": "update"}

def fetch_pretrained_data():
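    """Fetch the custom pretrained-model catalog (JSON, URL stored ROT13-encoded) and return it as a dict; an empty dict is returned if the request fails."""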
    try:
        # A request timeout keeps the UI from hanging if the host is unreachable (30s is an arbitrary choice).
        response = requests.get(codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/wfba/phfgbz_cergenvarq.wfba", "rot13"), timeout=30)
        response.raise_for_status()

        return response.json()
    except Exception:
        return {}

def update_sample_rate_dropdown(model):
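    """Update the sample-rate dropdown choices based on the selected pretrained model in the fetched catalog."""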
    data = fetch_pretrained_data()
    if model != translations["success"]:
        choices = list(data[model].keys())
        return {"choices": choices, "value": choices[0], "__type__": "update"}