import os
import shutil
import subprocess
import gradio as gr
from tqdm import tqdm
import chardet
import logging
import tempfile
import concurrent.futures

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configurable supported file types and size limit
SUPPORTED_FILE_TYPES = ["txt", "python", "markdown", "yaml", "json", "csv", "tsv", "xml", "html"]
MAX_FILE_SIZE = 32 * 1024  # 32 KB

def validate_url(url):
    # Only https:// URLs are accepted; credentials are injected into this URL in clone_repo()
    return url.startswith('https://')

def clone_repo(url, repo_dir, hf_token, hf_user):
    """Clone the repository into repo_dir, authenticating via the URL and skipping LFS downloads."""
    env = os.environ.copy()
    env['GIT_LFS_SKIP_SMUDGE'] = '1'  # skip large LFS blobs; only text content is extracted
    token_url = url.replace('https://', f'https://{hf_user}:{hf_token}@')
    result = subprocess.run(["git", "clone", token_url, repo_dir], env=env, capture_output=True, text=True)
    if result.returncode != 0:
        return False, result.stderr
    return True, None

def get_file_summary(file_path, file_type, repo_dir):
    """Return a metadata header: path relative to the repo root, detected type, and size in bytes."""
    size = os.path.getsize(file_path)
    return {
        "name": os.path.relpath(file_path, repo_dir),
        "type": file_type,
        "size": size,
    }

def read_file_content(file_path):
    """Detect the encoding with chardet and decode; return None for binary or undecodable files."""
    with open(file_path, "rb") as file:
        file_bytes = file.read()
        encoding = chardet.detect(file_bytes)["encoding"]
        try:
            content = file_bytes.decode(encoding)
            return content
        except (UnicodeDecodeError, TypeError):
            return None

def validate_file_types(directory, supported_file_types):
    """Classify every file in the tree by content using Magika, skipping .git and __pycache__."""
    from magika import Magika  # imported lazily so the model is only loaded when extraction runs
    m = Magika()
    file_types = {}
    for root, _, files in os.walk(directory):
        if any(dir_name in root for dir_name in ['.git', '__pycache__']):
            continue
        for file_name in files:
            file_path = os.path.join(root, file_name)
            try:
                with open(file_path, 'rb') as file:
                    file_bytes = file.read()
                result = m.identify_bytes(file_bytes)
                file_type = result.output.ct_label
                if file_type not in supported_file_types:
                    file_type = "Unsupported"
                file_types[file_path] = file_type
            except Exception as e:
                file_types[file_path] = f"Error: {str(e)}"
    return file_types

def process_file(file_path, file_type, max_file_size, repo_dir):
    """Build one record per file: a metadata header plus inline content for small, supported files."""
    file_summary = get_file_summary(file_path, file_type, repo_dir)
    content = {"header": file_summary}
    
    if file_type != "Unsupported" and file_summary["size"] <= max_file_size:
        try:
            file_content = read_file_content(file_path)
            if file_content is not None:
                content["content"] = file_content
            else:
                content["content"] = "Failed to read file content: Unsupported encoding or binary file."
        except Exception as e:
            content["content"] = f"Failed to read file content: {str(e)}"
    else:
        content["content"] = f"Skipped: {'File size exceeds limit.' if file_summary['size'] > max_file_size else 'Unsupported file type.'}"
    
    return content

def extract_repo_content(url, hf_token, hf_user, supported_file_types, max_file_size):
    """Clone the repository into a temporary directory and return a list of per-file records."""
    if not validate_url(url):
        return [{"header": {"name": "Error", "type": "error", "size": 0}, "content": "Invalid URL"}]
    
    repo_dir = tempfile.mkdtemp(prefix="temp_repo_")
    success, error = clone_repo(url, repo_dir, hf_token, hf_user)
    if not success:
        return [{"header": {"name": "Error", "type": "error", "size": 0}, "content": f"Failed to clone repository: {error}"}]
    
    file_types = validate_file_types(repo_dir, supported_file_types)
    
    # File reads are I/O-bound, so process them concurrently in a thread pool
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for file_path, file_type in file_types.items():
            future = executor.submit(process_file, file_path, file_type, max_file_size, repo_dir)
            futures.append(future)
        
        extracted_content = []
        with tqdm(total=len(futures), desc="Processing files") as progress_bar:
            for future in concurrent.futures.as_completed(futures):
                content = future.result()
                extracted_content.append(content)
                progress_bar.update(1)
    
    # Clean up the temporary clone (shutil.rmtree is portable, unlike shelling out to rm -rf)
    shutil.rmtree(repo_dir, ignore_errors=True)
    
    return extracted_content

def format_output(extracted_content, repo_url):
    """Render the extracted file records as a single Markdown document."""
    formatted_output = f"# Repository URL: {repo_url}\n\n"
    for file_data in extracted_content:
        if isinstance(file_data, dict) and 'header' in file_data:
            formatted_output += f"### File: {file_data['header']['name']}\n"
            formatted_output += f"**Type:** {file_data['header']['type']}\n"
            formatted_output += f"**Size:** {file_data['header']['size']} bytes\n"
            formatted_output += "#### Content:\n"
            formatted_output += f"```\n{file_data['content']}\n```\n\n"
        else:
            formatted_output += "Error in file data format.\n"
    return formatted_output

def extract_and_display(url, supported_file_types, max_file_size):
    hf_token = os.getenv("HF_TOKEN")
    hf_user = os.getenv("SPACE_AUTHOR_NAME")
    
    if not hf_token:
        raise ValueError("HF_TOKEN environment variable is not set")
    if not hf_user:
        raise ValueError("SPACE_AUTHOR_NAME environment variable is not set")
    
    # The slider value is in KB; file sizes from os.path.getsize are in bytes, so convert before comparing
    extracted_content = extract_repo_content(url, hf_token, hf_user, supported_file_types, max_file_size * 1024)
    formatted_output = format_output(extracted_content, url)
    return formatted_output

# Build the Gradio UI
app = gr.Blocks()

with app:
    gr.Markdown("# Hugging Face Space / Model Repository Content Extractor")
    url_input = gr.Textbox(label="https:// URL of Repository", placeholder="Enter the repository URL here OR select an example below...")
    url_examples = gr.Examples(
        examples=[
            ["https://huggingface.co/spaces/big-vision/paligemma-hf"],
            ["https://huggingface.co/google/paligemma-3b-mix-224"],
            ["https://huggingface.co/microsoft/Phi-3-vision-128k-instruct"],
            ["https://huggingface.co/llava-hf/llava-v1.6-mistral-7b-hf"]
        ],
        inputs=url_input
    )
    supported_file_types = gr.CheckboxGroup(SUPPORTED_FILE_TYPES, value=SUPPORTED_FILE_TYPES, label="Supported File Types", info="Select the file types to include in the extraction.")
    max_file_size = gr.Slider(1, 1024, value=MAX_FILE_SIZE // 1024, step=1, label="Max File Size (KB)", info="Files larger than this size will be skipped.")
    output_display = gr.Textbox(label="Extracted Repository Content", show_copy_button=True, lines=20, placeholder="Repository content will be extracted here...\n\nMetadata is captured for all files, but text content is included only for files under the specified size limit.\n\nReview and search through the content here, or copy it for offline analysis. 🤖")
    extract_button = gr.Button("Extract Content")
    
    extract_button.click(fn=extract_and_display, inputs=[url_input, supported_file_types, max_file_size], outputs=output_display)

app.launch()
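
# Usage note (a sketch, not part of the app): the callback above reads HF_TOKEN and
# SPACE_AUTHOR_NAME from the environment, so to try the app outside a Space both must be
# set first. Assuming this file is saved as app.py, something like:
#   HF_TOKEN=<your token> SPACE_AUTHOR_NAME=<your username> python app.py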