#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#
import gradio as gr # Import Gradio library for building the web UI
import asyncio # Import asyncio for asynchronous programming
from pathlib import Path # Import Path for filesystem path manipulations
from config import * # Import all configuration constants and variables
from src.cores.session import create_session, ensure_stop_event, get_model_key # Import session management utilities
from src.main.file_extractors import extract_file_content # Import function to extract content from uploaded files
from src.cores.client import chat_with_model_async # Import async chat function with AI model
async def respond_async(multi, history, model_display, sess, custom_prompt, deep_search):
"""
Asynchronous handler for processing user input submissions.
Supports multi-modal input including text and file uploads.
Extracts content from uploaded files and appends it to user text input.
Streams AI-generated responses back to the UI, updating chat history live.
Allows graceful stopping of response generation upon user request.
Parameters:
- multi: dict containing user text input and uploaded files
- history: list of previous chat messages (user and AI)
- model_display: selected AI model identifier
- sess: current session object managing state and cancellation
- custom_prompt: user-defined system instructions
- deep_search: boolean flag to enable extended search capabilities
Yields:
- Updated chat history and UI state for real-time interaction
"""
ensure_stop_event(sess) # Ensure the session has a stop event initialized
sess.stop_event.clear() # Clear any previous stop signals
sess.cancel_token["cancelled"] = False # Reset cancellation flag
# Extract text and files from multimodal input dictionary
msg_input = {"text": multi.get("text", "").strip(), "files": multi.get("files", [])}
# If no input text or files, reset UI input and return early
if not msg_input["text"] and not msg_input["files"]:
yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
return
# Initialize combined input string with extracted file contents
inp = ""
for f in msg_input["files"]:
        # Support both dict format and plain file path string
fp = f.get("data", f.get("name", "")) if isinstance(f, dict) else f
# Append extracted file content with spacing
inp += f"```\n{extract_file_content(fp)}\n``` \n\n\n"
# Append user text input if present
if msg_input["text"]:
inp += msg_input["text"]
# Append user input to chat history with placeholder AI response
history.append([inp, RESPONSES["RESPONSE_8"]]) # RESPONSE_8 is a placeholder text
# Yield updated history and disable input while AI is responding
yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
queue = asyncio.Queue() # Queue to hold streamed AI response chunks
async def background():
"""
Background async task to fetch streamed AI responses from the model.
Handles reasoning and content chunks separately.
Supports cancellation via session stop event.
"""
reasoning = "" # Accumulate reasoning text
responses = "" # Accumulate content text
content_started = False # Flag to indicate content streaming started
ignore_reasoning = False # Flag to ignore reasoning after content starts
        # Async iterate over streaming response chunks from the AI model.
        # The completion sentinel is emitted from a finally block so the
        # consumer loop below always terminates, even if the stream raises midway.
        try:
            async for typ, chunk in chat_with_model_async(history, inp, model_display, sess, custom_prompt, deep_search):
                # Break if user requested stop or cancellation flagged
                if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                    break
                if typ == "reasoning":
                    # Accumulate reasoning chunks unless content has already started
                    if ignore_reasoning:
                        continue
                    reasoning += chunk
                    # Put accumulated reasoning text into queue for UI update
                    await queue.put(("reasoning", reasoning))
                elif typ == "content":
                    if not content_started:
                        # On first content chunk, clear reasoning and start content accumulation
                        content_started = True
                        ignore_reasoning = True
                        responses = chunk
                        await queue.put(("reasoning", ""))  # Clear reasoning display
                        await queue.put(("replace", responses))  # Replace placeholder with content start
                    else:
                        # Append subsequent content chunks and update UI
                        responses += chunk
                        await queue.put(("append", responses))
        finally:
            await queue.put(None)  # Signal completion of streaming, even on error
        return responses  # Return final complete response text
bg_task = asyncio.create_task(background()) # Start background streaming task
stop_task = asyncio.create_task(sess.stop_event.wait()) # Task to wait for stop event
pending_tasks = {bg_task, stop_task} # Track pending async tasks
try:
while True:
queue_task = asyncio.create_task(queue.get()) # Task to get next queued update
pending_tasks.add(queue_task)
# Wait for either stop event or new queue item
done, _ = await asyncio.wait({stop_task, queue_task}, return_when=asyncio.FIRST_COMPLETED)
for task in done:
pending_tasks.discard(task)
if task is stop_task:
# User requested stop, cancel background task and update UI accordingly
sess.cancel_token["cancelled"] = True
bg_task.cancel()
try:
await bg_task
except asyncio.CancelledError:
pass
# Update last message with cancellation notice
history[-1][1] = RESPONSES["RESPONSE_1"]
yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
return
result = task.result()
if result is None:
# Streaming finished, stop iteration
raise StopAsyncIteration
                action, text = result
                # Each queue item carries the full accumulated text, so the
                # action tag is informational here and the text is assigned directly
                history[-1][1] = text
# Yield updated history and UI state to refresh chat display
yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
except StopAsyncIteration:
# Normal completion of streaming
pass
finally:
# Cancel any remaining pending tasks to clean up
for task in pending_tasks:
task.cancel()
await asyncio.gather(*pending_tasks, return_exceptions=True)
# After completion, reset UI input to ready state
yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
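# A minimal, self-contained sketch of the producer/consumer streaming pattern
# that respond_async uses above: a producer task pushes chunks into an
# asyncio.Queue and the consumer drains it until a None sentinel arrives. The
# fixed chunk list is a hypothetical stand-in for chat_with_model_async; this
# helper is never called by the application and exists only for illustration.
async def _stream_pattern_demo():
    queue = asyncio.Queue()
    async def producer():
        try:
            for chunk in ("Hel", "lo", "!"):  # Stand-in for a model stream
                await queue.put(("append", chunk))
        finally:
            await queue.put(None)  # Sentinel: always signal completion
    task = asyncio.create_task(producer())
    while True:
        item = await queue.get()
        if item is None:  # Sentinel received, streaming is done
            break
        action, text = item
        print(action, text)
    await task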
def toggle_deep_search(deep_search_value, history, sess, prompt, model):
"""
Toggle the deep search checkbox state.
    Passes the chat history, session, prompt, and model through unchanged.
Parameters:
- deep_search_value: new checkbox boolean value
- history: current chat history
- sess: current session object
- prompt: current system instructions
- model: currently selected model
Returns:
- Unchanged history, session, prompt, model
- Updated deep search checkbox UI state
"""
return history, sess, prompt, model, gr.update(value=deep_search_value)
def change_model(new):
"""
Handler to change the selected AI model.
Resets chat history and creates a new session.
Updates system instructions and deep search checkbox visibility.
Deep search is only enabled for the default model.
Parameters:
- new: newly selected model identifier
Returns:
- Empty chat history list
- New session object
- New model identifier
- Corresponding system instructions string
    - UI update resetting the deep search checkbox to False and adjusting its visibility
"""
visible = new == MODEL_CHOICES[0] # Deep search visible only for default model
# Get system instructions for new model or fallback to default instructions
default_prompt = SYSTEM_PROMPT_MAPPING.get(get_model_key(new, MODEL_MAPPING, DEFAULT_MODEL_KEY), SYSTEM_PROMPT_DEFAULT)
    # Clear chat, create new session, and reset the deep search checkbox value and visibility in a single update
    return [], create_session(), new, default_prompt, gr.update(value=False, visible=visible)
def stop_response(history, sess):
"""
Handler to stop ongoing AI response generation.
Sets cancellation flags and updates the last message to a cancellation notice.
Parameters:
- history: current chat history list
- sess: current session object
Returns:
- Updated chat history with cancellation message
- None for input box reset
- New session object for fresh state
"""
ensure_stop_event(sess) # Ensure stop event exists in session
sess.stop_event.set() # Signal stop event to cancel ongoing tasks
sess.cancel_token["cancelled"] = True # Mark cancellation flag
if history:
# Replace last AI response with cancellation message
history[-1][1] = RESPONSES["RESPONSE_1"]
return history, None, create_session()
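# Cancellation in this module is cooperative rather than forceful: stop_response
# sets flags, and the streaming worker checks them at safe points. A minimal
# sketch of that idea with a hypothetical worker, never invoked by the application:
async def _cooperative_stop_demo():
    stop_event = asyncio.Event()
    async def worker():
        while not stop_event.is_set():  # Poll the flag at each safe point
            await asyncio.sleep(0.1)  # Stand-in for one unit of streaming work
        return "stopped cleanly"
    task = asyncio.create_task(worker())
    stop_event.set()  # What stop_response does to the session's stop event
    return await task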
def launch_ui():
"""
Launch the Gradio UI for the chatbot application.
Sets up the UI components, event handlers, and starts the server.
Installs required OCR dependencies for file content extraction.
"""
# ============================
# System Setup
# ============================
# Install Tesseract OCR and dependencies for extracting text from images
import os
    os.system(
        "apt-get update -q -y && "
        "apt-get install -q -y tesseract-ocr "
        "tesseract-ocr-eng tesseract-ocr-ind "
        "libleptonica-dev libtesseract-dev"
    )
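    # Note: the install step above assumes a Debian-based host with apt-get
    # available (for example a Hugging Face Space container); on other systems
    # the command fails silently and OCR-based file extraction may be degraded.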
# Create Gradio Blocks container for full UI layout
with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis:
# State variables to hold chat history, session, selected model, and instructions
user_history = gr.State([])
user_session = gr.State(create_session())
selected_model = gr.State(MODEL_CHOICES[0] if MODEL_CHOICES else "")
J_A_R_V_I_S = gr.State("")
# Chatbot UI
with gr.Column():
chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], show_copy_button=True, scale=1, elem_id=AI_TYPES["AI_TYPE_2"], examples=JARVIS_INIT, allow_tags=["think", "thinking"])
# User input
msg = gr.MultimodalTextbox(show_label=False, placeholder=RESPONSES["RESPONSE_5"], interactive=True, file_count=None, file_types=None, sources=[])
# Sidebar on left for model selection and deep search toggle
with gr.Sidebar(open=False):
deep_search = gr.Checkbox(label=AI_TYPES["AI_TYPE_8"], value=False, info=AI_TYPES["AI_TYPE_9"], visible=True)
# When deep search checkbox changes, call toggle_deep_search handler
deep_search.change(fn=toggle_deep_search, inputs=[deep_search, user_history, user_session, J_A_R_V_I_S, selected_model], outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, deep_search])
gr.Markdown() # Add spacing line
model_radio = gr.Radio(show_label=False, choices=MODEL_CHOICES, value=MODEL_CHOICES[0])
# Sidebar on right for notices and additional information
with gr.Sidebar(open=False, position="right"):
gr.Markdown(NOTICES)
# When model selection changes, call change_model handler
        model_radio.change(fn=change_model, inputs=[model_radio], outputs=[user_history, user_session, selected_model, J_A_R_V_I_S, deep_search])
# Event handler for selecting example messages in chatbot UI
def on_example_select(evt: gr.SelectData):
return evt.value
chatbot.example_select(fn=on_example_select, inputs=[], outputs=[msg]).then(
fn=respond_async,
inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search],
outputs=[chatbot, msg, user_session]
)
# Clear chat button handler resets chat, session, instructions, model, and history
def clear_chat(history, sess, prompt, model):
return [], create_session(), prompt, model, []
chatbot.clear(fn=clear_chat, inputs=[user_history, user_session, J_A_R_V_I_S, selected_model], outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history])
# Submit user message triggers respond_async to generate AI response
msg.submit(fn=respond_async, inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search], outputs=[chatbot, msg, user_session], api_name=INTERNAL_AI_GET_SERVER)
# Stop button triggers stop_response handler to cancel ongoing AI generation
msg.stop(fn=stop_response, inputs=[user_history, user_session], outputs=[chatbot, msg, user_session])
# Launch
jarvis.queue(default_concurrency_limit=2).launch(max_file_size="1mb", mcp_server=True)
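# Entry-point guard, assuming this module may be executed directly; the actual
# Space entry point may live elsewhere, in which case this simply never runs.
if __name__ == "__main__":
    launch_ui()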