# # SPDX-FileCopyrightText: Hadad # SPDX-License-Identifier: Apache-2.0 # import asyncio import docx import gradio as gr import httpx import json import os import pandas as pd import pdfplumber import pytesseract import random import requests import threading import uuid from PIL import Image from pathlib import Path from pptx import Presentation os.system("apt-get update -q -y && apt-get install -q -y tesseract-ocr tesseract-ocr-eng tesseract-ocr-ind libleptonica-dev libtesseract-dev") INTERNAL_AI_GET_SERVER = os.getenv("INTERNAL_AI_GET_SERVER") INTERNAL_TRAINING_DATA = os.getenv("INTERNAL_TRAINING_DATA") SYSTEM_PROMPT_MAPPING = json.loads(os.getenv("SYSTEM_PROMPT_MAPPING", "{}")) SYSTEM_PROMPT_DEFAULT = os.getenv("DEFAULT_SYSTEM") LINUX_SERVER_HOSTS = [h for h in json.loads(os.getenv("LINUX_SERVER_HOST", "[]")) if h] LINUX_SERVER_HOSTS_MARKED = set() LINUX_SERVER_HOSTS_ATTEMPTS = {} LINUX_SERVER_PROVIDER_KEYS = [k for k in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if k] LINUX_SERVER_PROVIDER_KEYS_MARKED = set() LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS = {} LINUX_SERVER_ERRORS = set(map(int, os.getenv("LINUX_SERVER_ERROR", "").split(","))) AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 8)} RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 10)} MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}")) MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}")) MODEL_CHOICES = list(MODEL_MAPPING.values()) DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}")) DEFAULT_MODEL_KEY = list(MODEL_MAPPING.keys())[0] if MODEL_MAPPING else None META_TAGS = os.getenv("META_TAGS") ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS", "[]")) ACTIVE_CANDIDATE = None class SessionWithID(requests.Session): def __init__(self): super().__init__() self.session_id = str(uuid.uuid4()) def create_session(): return SessionWithID() def get_available_items(items, marked): a = [i for i in items if i not in marked] random.shuffle(a) return a def marked_item(item, marked, attempts): marked.add(item) attempts[item] = attempts.get(item, 0) + 1 if attempts[item] >= 3: def remove(): marked.discard(item) attempts.pop(item, None) threading.Timer(300, remove).start() def get_model_key(display): return next((k for k, v in MODEL_MAPPING.items() if v == display), DEFAULT_MODEL_KEY) def extract_file_content(fp): ext = Path(fp).suffix.lower() c = "" try: if ext == ".pdf": with pdfplumber.open(fp) as pdf: for p in pdf.pages: t = p.extract_text() or "" c += t + "\n" elif ext in [".doc", ".docx"]: d = docx.Document(fp) for para in d.paragraphs: c += para.text + "\n" elif ext in [".xlsx", ".xls"]: df = pd.read_excel(fp) c += df.to_csv(index=False) elif ext in [".ppt", ".pptx"]: prs = Presentation(fp) for s in prs.slides: for sh in s.shapes: if hasattr(sh, "text") and sh.text: c += sh.text + "\n" else: c = Path(fp).read_text(encoding="utf-8") except Exception as e: c = f"{fp}: {e}" return c.strip() async def fetch_response_async(host, key, model, msgs, cfg, sid): for t in [60, 80, 120, 240]: try: async with httpx.AsyncClient(timeout=t) as client: r = await client.post(host, json={"model": model, "messages": msgs, **cfg, "session_id": sid}, headers={"Authorization": f"Bearer {key}"}) if r.status_code in LINUX_SERVER_ERRORS: marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS) return None r.raise_for_status() j = r.json() if isinstance(j, dict) and j.get("choices"): ch = j["choices"][0] if ch.get("message") and isinstance(ch["message"].get("content"), str): return ch["message"]["content"] return None except: continue marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS) return None async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt): if not get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED) or not get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_ATTEMPTS): return RESPONSES["RESPONSE_3"] if not hasattr(sess, "session_id"): sess.session_id = str(uuid.uuid4()) model_key = get_model_key(model_display) cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG) msgs = [{"role": "user", "content": u} for u, _ in history] + [{"role": "assistant", "content": a} for _, a in history if a] if model_key == DEFAULT_MODEL_KEY and INTERNAL_TRAINING_DATA: prompt = INTERNAL_TRAINING_DATA else: prompt = custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT) msgs.insert(0, {"role": "system", "content": prompt}) msgs.append({"role": "user", "content": user_input}) global ACTIVE_CANDIDATE if ACTIVE_CANDIDATE: res = await fetch_response_async(ACTIVE_CANDIDATE[0], ACTIVE_CANDIDATE[1], model_key, msgs, cfg, sess.session_id) if res: return res ACTIVE_CANDIDATE = None keys = get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED) hosts = get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_ATTEMPTS) cands = [(h, k) for h in hosts for k in keys] random.shuffle(cands) for h, k in cands: res = await fetch_response_async(h, k, model_key, msgs, cfg, sess.session_id) if res: ACTIVE_CANDIDATE = (h, k) return res return RESPONSES["RESPONSE_2"] async def respond_async(multi, history, model_display, sess, custom_prompt): msg = {"text": multi.get("text", "").strip(), "files": multi.get("files", [])} if not msg["text"] and not msg["files"]: yield history, gr.MultimodalTextbox(value=None, interactive=True), sess return inp = "" for f in msg["files"]: p = f["name"] if isinstance(f, dict) and "name" in f else f inp += f"{Path(p).name}\n\n{extract_file_content(p)}\n\n" if msg["text"]: inp += msg["text"] history.append([inp, ""]) ai = await chat_with_model_async(history, inp, model_display, sess, custom_prompt) history[-1][1] = "" def to_str(d): if isinstance(d, (str, int, float)): return str(d) if isinstance(d, bytes): return d.decode("utf-8", errors="ignore") if isinstance(d, (list, tuple)): return "".join(map(to_str, d)) if isinstance(d, dict): return json.dumps(d, ensure_ascii=False) return repr(d) for c in ai: history[-1][1] += to_str(c) await asyncio.sleep(0.0001) yield history, gr.MultimodalTextbox(value=None, interactive=True), sess def change_model(new): visible = new != MODEL_CHOICES[0] default = SYSTEM_PROMPT_MAPPING.get(get_model_key(new), SYSTEM_PROMPT_DEFAULT) return [], create_session(), new, default, gr.update(value=default, visible=visible) with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis: user_history = gr.State([]) user_session = gr.State(create_session()) selected_model = gr.State(MODEL_CHOICES[0] if MODEL_CHOICES else "") custom_prompt_state = gr.State("") chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], show_copy_button=True, scale=1, elem_id=AI_TYPES["AI_TYPE_2"]) with gr.Row(): msg = gr.MultimodalTextbox(show_label=False, placeholder=RESPONSES["RESPONSE_5"], interactive=True, file_count="single", file_types=ALLOWED_EXTENSIONS) with gr.Accordion(AI_TYPES["AI_TYPE_6"], open=False): model_dropdown = gr.Dropdown(show_label=False, choices=MODEL_CHOICES, value=MODEL_CHOICES[0]) system_prompt = gr.Textbox(label=AI_TYPES["AI_TYPE_7"], lines=2, interactive=True, visible=False) model_dropdown.change(fn=change_model, inputs=[model_dropdown], outputs=[user_history, user_session, selected_model, custom_prompt_state, system_prompt]) system_prompt.change(fn=lambda x: x, inputs=[system_prompt], outputs=[custom_prompt_state]) msg.submit(fn=respond_async, inputs=[msg, user_history, selected_model, user_session, custom_prompt_state], outputs=[chatbot, msg, user_session], api_name=INTERNAL_AI_GET_SERVER) jarvis.launch(max_file_size="1mb")