|
from datetime import datetime |
|
import os |
|
import warnings |
|
import traceback |
|
import gradio as gr |
|
import subprocess |
|
from huggingface_hub import Repository |
|
from git import Repo |
|
import requests |
|
|
|
# Suppress every warning category for the whole process.
# NOTE(review): this presumably also hides the InsecureRequestWarning that
# `verify=False` requests would otherwise emit — confirm this is intended.
warnings.filterwarnings('ignore')

# Indexer scripts; each is executed as a child Python process by
# stream_script_output().
DOC_INDEXER = "indexer_multi.py"
SPEC_INDEXER = "spec_indexer_multi.py"
SPEC_DOC_INDEXER = "spec_doc_indexer_multi.py"
BM25_INDEXER = "bm25_maker.py"

# Index artefacts that are read for the stats widgets and committed/pushed
# after an indexing run. Paths are relative to the current working directory.
DOC_INDEX_FILE = "indexed_docs.json"
SPEC_INDEX_FILE = "indexed_specifications.json"
SPEC_DOC_INDEX_FILE = "indexed_docs_content.zip"
BM25_INDEX_FILE = "bm25s.zip"

# Hugging Face Space that receives a copy of the index files.
HF_SEARCH_REPO = "OrganizedProgrammers/3GPPDocFinder"
# Directory containing this file; used as the local Git working tree and as
# the parent of the "hf_spaces" clone directory.
REPO_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
def get_docs_stats(index_file=None) -> int:
    """Return the number of entries stored under ``"docs"`` in the document index.

    Args:
        index_file: Path of the JSON index to inspect. When omitted, the
            module-level ``DOC_INDEX_FILE`` is used (looked up at call time).

    Returns:
        ``len(data["docs"])``, or ``0`` when the file does not exist, does not
        contain valid JSON, or has no sized ``"docs"`` entry.
    """
    import json

    path = DOC_INDEX_FILE if index_file is None else index_file
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return 0
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. An unreadable index (e.g. one that is only partially
        # written) is reported as empty rather than crashing the UI.
        return 0

    docs = data.get("docs") if isinstance(data, dict) else None
    try:
        return len(docs)
    except TypeError:
        # Key absent (None) or value without a length.
        return 0
|
|
|
def get_specs_stats(index_file=None) -> int:
    """Return the number of entries stored under ``"specs"`` in the specification index.

    Args:
        index_file: Path of the JSON index to inspect. When omitted, the
            module-level ``SPEC_INDEX_FILE`` is used (looked up at call time).

    Returns:
        ``len(data["specs"])``, or ``0`` when the file does not exist, does not
        contain valid JSON, or has no sized ``"specs"`` entry.
    """
    import json

    path = SPEC_INDEX_FILE if index_file is None else index_file
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return 0
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. An unreadable index (e.g. one that is only partially
        # written) is reported as empty rather than crashing the UI.
        return 0

    specs = data.get("specs") if isinstance(data, dict) else None
    try:
        return len(specs)
    except TypeError:
        # Key absent (None) or value without a length.
        return 0
|
|
|
def get_scopes_stats(index_file=None) -> int:
    """Return the number of entries stored under ``'scopes'`` in the specification index.

    The scopes live in the same JSON file as the specifications.

    Args:
        index_file: Path of the JSON index to inspect. When omitted, the
            module-level ``SPEC_INDEX_FILE`` is used (looked up at call time).

    Returns:
        ``len(data['scopes'])``, or ``0`` when the file does not exist, does
        not contain valid JSON, or has no sized ``'scopes'`` entry.
    """
    import json

    path = SPEC_INDEX_FILE if index_file is None else index_file
    try:
        with open(path, 'r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return 0
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. An unreadable index (e.g. one that is only partially
        # written) is reported as empty rather than crashing the UI.
        return 0

    scopes = data.get('scopes') if isinstance(data, dict) else None
    try:
        return len(scopes)
    except TypeError:
        # Key absent (None) or value without a length.
        return 0
|
|
|
def check_permissions(user: str, token: str) -> bool:
    """Check a Hugging Face user/token pair against the ``whoami-v2`` endpoint.

    Access is granted when the token resolves to an account whose name equals
    ``user`` and either:

    * the account belongs to the organisation with the expected id, or
    * the token is fine-grained and carries a scope on that organisation
      granting both ``repo.write`` and ``repo.content.read``.

    Args:
        user: Expected Hugging Face account name.
        token: Access token sent as a bearer credential.

    Returns:
        ``True`` when access is granted, ``False`` otherwise. Any exception
        (network failure, unexpected payload shape, ...) is printed and
        treated as a refusal.
    """
    # NOTE(review): presumably the id of the OrganizedProgrammers org — confirm.
    org_id = "645cfa1b5ebf379fd6d8a339"
    required_perms = ('repo.write', 'repo.content.read')

    try:
        # TLS verification is left at the library default (enabled): the
        # request carries a bearer token, so it must not be sent to an
        # unauthenticated peer. Use REQUESTS_CA_BUNDLE for a custom CA.
        # The timeout keeps an unresponsive endpoint from blocking forever.
        req = requests.get(
            "https://huggingface.co/api/whoami-v2",
            headers={"Accept": "application/json", "Authorization": f"Bearer {token}"},
            timeout=15,
        )
        if req.status_code != 200:
            return False

        payload: dict = req.json()
        if not payload.get("name") or payload['name'] != user:
            return False

        orgs = payload.get("orgs")
        if not orgs:
            return False
        # Organisation membership alone is sufficient.
        if any(org_id == org['id'] for org in orgs):
            return True

        auth = payload.get('auth')
        if not auth:
            return False
        if auth['accessToken']['role'] != "fineGrained":
            return False

        for scope in auth['accessToken']['fineGrained']['scoped']:
            entity = scope['entity']
            if (
                entity['type'] == "org"
                and entity['_id'] == org_id
                and all(perm in scope['permissions'] for perm in required_perms)
            ):
                return True
        return False
    except Exception as e:
        traceback.print_exception(e)
        return False
|
|
|
def update_logged(user: str, token: str):
    """Return four visibility updates reflecting the outcome of a login attempt.

    On success the first component is hidden and the other three are shown;
    on failure the first stays visible and the other three stay hidden.
    """
    granted = bool(check_permissions(user, token))
    login_area = gr.update(visible=not granted)
    protected_areas = [gr.update(visible=granted) for _ in range(3)]
    return (login_area, *protected_areas)
|
|
|
def commit_and_push_3gppindexers(user, token, files, message, current_log=""):
    """Pull, commit ``files`` and push the local repository to the 3GPPIndexers Space.

    This is a generator: it yields the accumulated log text after each step.

    Args:
        user: Hugging Face account name used in the push URL.
        token: Access token used as the password in the push URL.
        files: Paths to stage before committing.
        message: Commit message.
        current_log: Log text accumulated so far; new lines are appended to it.

    Yields:
        The updated log string.

    Raises:
        Whatever GitPython raises for a failing pull, add or commit. A failing
        push is reported in the log instead of being raised.
    """
    log = current_log + "\n"
    repo = Repo(REPO_DIR)
    origin = repo.remotes.origin

    # NOTE(review): the e-mail literal looks like a scraper placeholder —
    # confirm the intended address.
    with repo.config_writer() as cfg:
        cfg.set_value("user", "name", "3GPP Indexer Automatic Git Tool")
        cfg.set_value("user", "email", "[email protected]")

    origin.pull()
    log += "Git pull succeed !\n"
    yield log

    repo.git.add(files)
    repo.index.commit(message)

    try:
        repo.git.push(f"https://{user}:{token}@huggingface.co/spaces/OrganizedProgrammers/3GPPIndexers")
        log += "Git push succeed !\n"
        yield log
        log += "Wait for Huggingface to restart the Space\n"
        yield log
    except Exception as e:
        # The push URL embeds the token, and Git error text may echo the
        # command line; never show the secret in the UI log.
        detail = str(e)
        if token:
            detail = detail.replace(token, "***")
        log += f"Git push failed: {detail}\n"
        yield log
|
|
|
def commit_and_push_3gppdocfinder(token, files, message, current_log=""):
    """Copy ``files`` into a local clone of the search Space, then commit and push.

    This is a generator: it yields the accumulated log text when it has
    something to report.

    Args:
        token: Hugging Face access token. When falsy, nothing is pushed.
        files: Paths (relative to the working directory) to copy into the
            clone under the same relative names.
        message: Commit message.
        current_log: Log text accumulated so far; new lines are appended to it.

    Yields:
        The updated log string.

    Raises:
        Whatever ``Repository`` / ``shutil`` raise on clone, pull, copy or push
        failure. A commit that fails only because there is nothing to commit
        is reported in the log instead.
    """
    import shutil

    log = current_log + "\n"
    if not token:
        log += "No token provided. Skipping HuggingFace push.\n"
        yield log
        return

    hf_repo_dir = os.path.join(REPO_DIR, "hf_spaces")
    repo = Repository(
        local_dir=hf_repo_dir,
        repo_type="space",
        # Clone only when no local copy exists yet; otherwise reuse it.
        clone_from=None if os.path.exists(hf_repo_dir) else HF_SEARCH_REPO,
        git_user="3GPP Indexer Automatic Git Tool",
        git_email="[email protected]",
        token=token,
        skip_lfs_files=True,
    )
    repo.git_pull()

    for f in files:
        shutil.copy2(f, os.path.join(hf_repo_dir, f))

    repo.git_add(auto_lfs_track=True)
    try:
        repo.git_commit(message)
    except OSError as e:
        # An unchanged index makes `git commit` fail; that is not an error
        # for this workflow, so report it and stop here.
        if "nothing to commit" not in str(e):
            raise
        log += "No changes to push to HuggingFace.\n"
        yield log
        return
    repo.git_push()

    log += "Pushed to HuggingFace.\n"
    yield log
|
|
|
def refresh_stats():
    """Return the current document, specification and scope counts as strings."""
    counters = (get_docs_stats, get_specs_stats, get_scopes_stats)
    return tuple(str(counter()) for counter in counters)
|
|
|
def stream_script_output(script_path, current_log=""):
    """Run a Python script as a child process and stream its output.

    This is a generator. The child's stderr is merged into its stdout; after
    every line read, the log accumulated so far is yielded, and the final log
    is yielded once more when the process has finished.

    Args:
        script_path: Path of the script to execute.
        current_log: Text to which the script's output is appended.

    Yields:
        ``current_log`` followed by all output received so far. When the
        script exits with a non-zero status, the last yielded value ends with
        a line reporting that exit code.
    """
    import sys

    accumulated_output = current_log

    process = subprocess.Popen(
        # sys.executable: run the script with the interpreter (and therefore
        # the environment) of this process rather than whatever "python" is
        # on PATH. -u: unbuffered child output, so lines arrive as they are
        # printed instead of in large blocks.
        [sys.executable, "-u", script_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,
        universal_newlines=True,
    )

    for line in process.stdout:
        accumulated_output += line
        yield accumulated_output

    process.stdout.close()
    return_code = process.wait()

    if return_code != 0:
        # Surface failures instead of silently ignoring the exit status.
        if accumulated_output and not accumulated_output.endswith("\n"):
            accumulated_output += "\n"
        accumulated_output += f"Process '{script_path}' exited with code {return_code}\n"

    yield accumulated_output
|
|
|
def index_documents(user, token):
    """Re-index the documents and publish the resulting index.

    This is a generator used as a UI event handler. Every yielded value is a
    4-tuple: three button updates (all enabled or all disabled) followed by
    the log text. The buttons are disabled while the work is running and are
    always re-enabled by the last yield, including when a step fails.

    Steps: check the credentials, run the document indexer script, push the
    index file to the search Space, then commit and push it to this
    repository.

    Args:
        user: Hugging Face account name.
        token: Hugging Face access token.

    Yields:
        ``(update, update, update, log_text)``.
    """
    def buttons(enabled):
        # One identical update per button output.
        return tuple(gr.update(interactive=enabled) for _ in range(3))

    log_output = "⏳ Indexation en cours...\n"
    yield (*buttons(False), log_output)

    if not check_permissions(user, token):
        log_output += "❌ Identifiants invalides\n"
        yield (*buttons(True), log_output)
        return

    try:
        for log in stream_script_output(DOC_INDEXER, log_output):
            yield (*buttons(False), log)
            log_output = log

        d = datetime.today().strftime("%d/%m/%Y-%H:%M:%S")
        commit_message = f"Update documents indexer via Indexer: {d}"

        for log in commit_and_push_3gppdocfinder(token, [DOC_INDEX_FILE], commit_message, log_output):
            yield (*buttons(False), log)
            log_output = log

        for log in commit_and_push_3gppindexers(user, token, [DOC_INDEX_FILE], commit_message, log_output):
            yield (*buttons(False), log)
            log_output = log
    except Exception as e:
        # Without this handler a failing step would leave every button
        # disabled. Report the error, keeping the token out of the UI log.
        traceback.print_exception(e)
        detail = str(e)
        if token:
            detail = detail.replace(token, "***")
        log_output += f"❌ Erreur: {detail}\n"
    else:
        log_output += "✅ Terminé.\n"

    yield (*buttons(True), log_output)
|
|
|
def index_specifications(user, token):
    """Re-index the specifications and publish the resulting index files.

    This is a generator used as a UI event handler. Every yielded value is a
    4-tuple: three button updates (all enabled or all disabled) followed by
    the log text. The buttons are disabled while the work is running and are
    always re-enabled by the last yield, including when a step fails.

    Steps: check the credentials, run the specification indexer, the
    specification-content indexer and the BM25 builder in that order, push
    the index files to the search Space, then commit and push them to this
    repository.

    Args:
        user: Hugging Face account name.
        token: Hugging Face access token.

    Yields:
        ``(update, update, update, log_text)``.
    """
    def buttons(enabled):
        # One identical update per button output.
        return tuple(gr.update(interactive=enabled) for _ in range(3))

    log_output = "⏳ Indexation en cours...\n"
    yield (*buttons(False), log_output)

    if not check_permissions(user, token):
        log_output += "❌ Identifiants invalides\n"
        yield (*buttons(True), log_output)
        return

    try:
        # The scripts run strictly one after another, in this order.
        for script in (SPEC_INDEXER, SPEC_DOC_INDEXER, BM25_INDEXER):
            for log in stream_script_output(script, log_output):
                yield (*buttons(False), log)
                log_output = log

        d = datetime.today().strftime("%d/%m/%Y-%H:%M:%S")
        commit_message = f"Update specifications indexer via Indexer: {d}"
        index_files = [SPEC_DOC_INDEX_FILE, BM25_INDEX_FILE, SPEC_INDEX_FILE]

        for log in commit_and_push_3gppdocfinder(token, index_files, commit_message, log_output):
            yield (*buttons(False), log)
            log_output = log

        for log in commit_and_push_3gppindexers(user, token, index_files, commit_message, log_output):
            yield (*buttons(False), log)
            log_output = log
    except Exception as e:
        # Without this handler a failing step would leave every button
        # disabled. Report the error, keeping the token out of the UI log.
        traceback.print_exception(e)
        detail = str(e)
        if token:
            detail = detail.replace(token, "***")
        log_output += f"❌ Erreur: {detail}\n"
    else:
        log_output += "✅ Terminé.\n"

    yield (*buttons(True), log_output)
|
|
|
# Build the Gradio interface. Component creation order defines the layout.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 📄 3GPP Indexers")

    # Login row: the only part visible before a successful login.
    with gr.Row() as r1:
        with gr.Column():
            git_user = gr.Textbox(label="Git user (for push/pull indexes)")
            git_pass = gr.Textbox(label="Git Token", type="password")
            btn_login = gr.Button("Login", variant="primary")

    # Stats and re-index controls: hidden until update_logged() reveals them.
    # The counters are computed once, when the UI is built.
    with gr.Row(visible=False) as r2:
        with gr.Column():
            doc_count = gr.Textbox(label="Docs Indexed", value=str(get_docs_stats()), interactive=False)
            btn_docs = gr.Button("Re-index Documents", variant="primary")
        with gr.Column():
            spec_count = gr.Textbox(label="Specs Indexed", value=str(get_specs_stats()), interactive=False)
            btn_specs = gr.Button("Re-index Specifications", variant="primary")
        with gr.Column():
            scope_count = gr.Textbox(label="Scopes Indexed", value=str(get_scopes_stats()), interactive=False)

    # Log area and stats refresh button: also hidden until login succeeds.
    out = gr.Textbox(label="Output/Log", lines=13, autoscroll=True, visible=False)
    refresh = gr.Button(value="🔄 Refresh Stats", visible=False)

    # Login toggles the visibility of the four areas above.
    btn_login.click(update_logged, inputs=[git_user, git_pass], outputs=[r1, r2, out, refresh])
    # Both indexing handlers are generators: each yield updates the three
    # buttons' interactivity and the log text.
    btn_docs.click(index_documents, inputs=[git_user, git_pass], outputs=[btn_docs, btn_specs, refresh, out])
    btn_specs.click(index_specifications, inputs=[git_user, git_pass], outputs=[btn_docs, btn_specs, refresh, out])
    # Re-read the index files and update the three counters.
    refresh.click(refresh_stats, outputs=[doc_count, spec_count, scope_count])

# Start the app as a module-level side effect (no __main__ guard).
demo.launch()
|
|