File size: 6,886 Bytes
a657082
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import logging
import os
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
from urllib.request import urlopen, urlretrieve

import gradio as gr
from huggingface_hub import HfApi, whoami

# Configure the root logger at INFO level and create the logger used
# throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Config:
    """Application configuration.

    Holds the Hugging Face credentials used for uploads together with the
    location and version of the transformers.js checkout that provides the
    conversion script.
    """

    # Token used to authenticate against the Hugging Face Hub.
    hf_token: str
    # Account name resolved for ``hf_token`` (or the Space author).
    hf_username: str
    # Tag or branch name of transformers.js to download.
    transformers_version: str = "3.0.0"
    hf_base_url: str = "https://huggingface.co"
    # Prefix of the GitHub archive URLs; "/tags/<ref>.tar.gz" or
    # "/heads/<ref>.tar.gz" is appended by the converter.
    transformers_base_url: str = (
        "https://github.com/xenova/transformers.js/archive/refs"
    )
    # Local directory the transformers.js archive is extracted into.
    repo_path: Path = Path("./transformers.js")

    @classmethod
    def from_env(cls, user_token: Optional[str] = None) -> "Config":
        """Create config from environment variables and an optional user token.

        Args:
            user_token: Token supplied by the current user. When given it takes
                precedence over the ``HF_TOKEN`` environment variable and the
                username is resolved from it. It is passed explicitly rather
                than read from shared state so that one user's token is never
                visible to another request.

        Returns:
            A ``Config`` populated with the effective token and username.

        Raises:
            ValueError: If neither ``user_token`` nor ``HF_TOKEN`` is available.
        """
        system_token = os.getenv("HF_TOKEN")
        hf_token = user_token or system_token

        # Validate before any network call: whoami() without a token cannot
        # produce a meaningful username for this application.
        if not hf_token:
            raise ValueError("HF_TOKEN must be set")

        if user_token:
            hf_username = whoami(token=user_token)["name"]
        else:
            # SPACE_AUTHOR_NAME, when set, avoids a round-trip to the Hub.
            hf_username = (
                os.getenv("SPACE_AUTHOR_NAME") or whoami(token=system_token)["name"]
            )

        return cls(hf_token=hf_token, hf_username=hf_username)


class ModelConverter:
    """Handles model conversion and upload operations.

    The converter downloads a transformers.js source archive, runs its
    ``scripts.convert`` module in a subprocess, and uploads the resulting
    ``.onnx`` files to the Hub via ``HfApi``.
    """

    def __init__(self, config: "Config"):
        """Store the configuration and create an authenticated Hub client."""
        self.config = config
        self.api = HfApi(token=config.hf_token)

    def _get_ref_type(self) -> str:
        """Determine the reference type for the transformers repository.

        Returns:
            ``"tags"`` if the tag archive URL for the configured version
            answers with HTTP 200, otherwise ``"heads"``. Any error while
            probing (including HTTP errors raised by ``urlopen``) also yields
            ``"heads"``.
        """
        url = f"{self.config.transformers_base_url}/tags/{self.config.transformers_version}.tar.gz"
        try:
            # The context manager closes the HTTP response instead of leaking
            # the connection until garbage collection.
            with urlopen(url) as response:
                return "tags" if response.getcode() == 200 else "heads"
        except Exception as e:
            logger.warning("Failed to check tags, defaulting to heads: %s", e)
            return "heads"

    def setup_repository(self) -> None:
        """Download and setup transformers repository if needed.

        Does nothing when ``config.repo_path`` already exists.

        Raises:
            RuntimeError: If the download or extraction fails; the original
                exception is attached as ``__cause__``.
        """
        if self.config.repo_path.exists():
            return

        ref_type = self._get_ref_type()
        archive_url = f"{self.config.transformers_base_url}/{ref_type}/{self.config.transformers_version}.tar.gz"
        archive_path = Path(f"./transformers_{self.config.transformers_version}.tar.gz")

        try:
            urlretrieve(archive_url, archive_path)
            self._extract_archive(archive_path)
            logger.info("Repository downloaded and extracted successfully")
        except Exception as e:
            raise RuntimeError(f"Failed to setup repository: {e}") from e
        finally:
            # The archive is only an intermediate artifact; remove it whether
            # or not extraction succeeded.
            archive_path.unlink(missing_ok=True)

    def _extract_archive(self, archive_path: Path) -> None:
        """Extract the downloaded archive into ``config.repo_path``.

        The archive is unpacked into a temporary directory and its first
        top-level entry is moved to ``config.repo_path``.

        Raises:
            RuntimeError: If the archive contains no entries.
        """
        import shutil
        import tarfile
        import tempfile

        with tempfile.TemporaryDirectory() as tmp_dir:
            with tarfile.open(archive_path, "r:gz") as tar:
                # The archive comes from the network: use the "data" filter
                # (rejects absolute paths, "..", device files, ...) on Python
                # versions that provide extraction filters.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(tmp_dir, filter="data")
                else:
                    tar.extractall(tmp_dir)

            extracted_folder = next(Path(tmp_dir).iterdir(), None)
            if extracted_folder is None:
                raise RuntimeError(f"Archive {archive_path} contained no files")

            # shutil.move falls back to copy+delete when the temporary
            # directory and the destination are on different filesystems,
            # where a plain rename would fail.
            shutil.move(str(extracted_folder), str(self.config.repo_path))

    def convert_model(self, input_model_id: str) -> Tuple[bool, Optional[str]]:
        """Convert the model to ONNX format.

        Runs ``scripts.convert --quantize --model_id <id>`` with the current
        interpreter, using ``config.repo_path`` as working directory.

        Returns:
            ``(True, stderr)`` when the subprocess exits with code 0,
            ``(False, stderr)`` on a non-zero exit code, or
            ``(False, message)`` if the subprocess could not be run.
        """
        try:
            result = subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "scripts.convert",
                    "--quantize",
                    "--model_id",
                    input_model_id,
                ],
                cwd=self.config.repo_path,
                capture_output=True,
                text=True,
                # Empty environment: the child inherits no variables from this
                # process, so values such as HF_TOKEN are not passed to it.
                env={},
            )

            if result.returncode != 0:
                return False, result.stderr

            return True, result.stderr

        except Exception as e:
            return False, str(e)

    def upload_model(self, input_model_id: str) -> Optional[str]:
        """Upload the converted model to Hugging Face under the onnx/ folder.

        Any ``.onnx`` files located directly in the model's output folder are
        moved into its ``onnx`` sub-folder, which is then uploaded to
        ``input_model_id`` at ``onnx/``. The local model folder is removed
        afterwards regardless of the outcome.

        Returns:
            ``None`` on success, otherwise the error message as a string.
        """
        import shutil

        # Computed before the ``try`` so the ``finally`` clause can always
        # refer to the path.
        model_folder_path = self.config.repo_path / "models" / input_model_id
        onnx_folder_path = model_folder_path / "onnx"

        try:
            # Create the onnx folder if it doesn't exist
            onnx_folder_path.mkdir(exist_ok=True)

            # Snapshot the listing first so files are not renamed while the
            # directory is still being iterated.
            onnx_files = [
                entry
                for entry in model_folder_path.iterdir()
                if entry.is_file() and entry.suffix == ".onnx"
            ]
            for onnx_file in onnx_files:
                onnx_file.rename(onnx_folder_path / onnx_file.name)

            # Upload the onnx folder to the same model path
            self.api.upload_folder(
                folder_path=str(onnx_folder_path), repo_id=input_model_id, path_in_repo="onnx"
            )
            return None
        except Exception as e:
            return str(e)
        finally:
            shutil.rmtree(model_folder_path, ignore_errors=True)


def convert_and_upload_model(
    input_model_id: str, user_hf_token: Optional[str] = None
) -> str:
    """Convert a Hub model to ONNX and upload the result; return a status message.

    Args:
        input_model_id: Repository ID of the model to convert.
        user_hf_token: Optional token supplied by the user. When given, it is
            used for this request only; otherwise credentials are read from
            the environment via ``Config.from_env``.

    Returns:
        A human-readable message describing success or the failure reason.
        Exceptions are logged and reported in the message, never raised.
    """
    try:
        input_model_id = (input_model_id or "").strip()
        if not input_model_id:
            return "Please provide a Hugging Face model ID."

        user_hf_token = (user_hf_token or "").strip()
        if user_hf_token:
            # Build a per-request config: storing the token in process-wide
            # state would expose it to other users of the app.
            config = Config(
                hf_token=user_hf_token,
                hf_username=whoami(token=user_hf_token)["name"],
            )
        else:
            config = Config.from_env()

        converter = ModelConverter(config)
        converter.setup_repository()

        if not converter.api.repo_exists(input_model_id):
            return f"Model {input_model_id} does not exist on Hugging Face."

        logger.info("Converting model %s", input_model_id)
        success, stderr = converter.convert_model(input_model_id)
        if not success:
            return f"Conversion failed: {stderr}"

        gr.Info("Conversion successful!")
        # Keep the converter's diagnostics available in the application log.
        logger.info("Conversion output for %s:\n%s", input_model_id, stderr)

        logger.info("Uploading model %s", input_model_id)
        error = converter.upload_model(input_model_id)
        if error:
            return f"Upload failed: {error}"

        gr.Info("Upload successful!")
        return f"Model uploaded to {input_model_id}/onnx"

    except Exception as e:
        logger.exception("Application error")
        return f"An error occurred: {str(e)}"


# Gradio Interface
# Components are created in the same order as before (both inputs, then the
# output); the two inputs map positionally onto the parameters of
# convert_and_upload_model.
model_id_box = gr.Textbox(
    label="Hugging Face Model ID",
    placeholder="Enter the Hugging Face model ID to convert. Example: `EleutherAI/pythia-14m`",
)
write_token_box = gr.Textbox(
    label="Hugging Face Write Token (Optional)",
    type="password",
    placeholder="Fill it if you want to upload the model under your account.",
)
result_box = gr.Textbox(label="Output")

iface = gr.Interface(
    fn=convert_and_upload_model,
    inputs=[model_id_box, write_token_box],
    outputs=result_box,
    title="Convert a Hugging Face model to ONNX",
    description=(
        "This tool converts a Hugging Face model to ONNX format and uploads it "
        "to the same model path under an `onnx/` folder."
    ),
)

# Start the web server only when executed as a script, not on import.
if __name__ == "__main__":
    iface.launch()