Luigi's picture
switch to gradio version for stability reason
a703203
raw
history blame
13.2 kB
import os
import time
import re
import gc
import threading
from itertools import islice
from datetime import datetime
import gradio as gr
from llama_cpp import Llama
from llama_cpp.llama_speculative import LlamaPromptLookupDecoding
from huggingface_hub import hf_hub_download
from duckduckgo_search import DDGS
# ------------------------------
# Global Cancellation Event
# ------------------------------
cancel_event = threading.Event()
# ------------------------------
# Model Definitions and Global Variables
# ------------------------------
REQUIRED_SPACE_BYTES = 5 * 1024 ** 3 # 5 GB
MODELS = {
"Taiwan-tinyllama-v1.0-chat (Q8_0)": {
"repo_id": "NapYang/DavidLanz-Taiwan-tinyllama-v1.0-chat.GGUF",
"filename": "Taiwan-tinyllama-v1.0-chat-Q8_0.gguf",
"description": "Taiwan-tinyllama-v1.0-chat (Q8_0)"
},
"Llama-3.2-Taiwan-3B-Instruct (Q4_K_M)": {
"repo_id": "itlwas/Llama-3.2-Taiwan-3B-Instruct-Q4_K_M-GGUF",
"filename": "llama-3.2-taiwan-3b-instruct-q4_k_m.gguf",
"description": "Llama-3.2-Taiwan-3B-Instruct (Q4_K_M)"
},
"MiniCPM3-4B (Q4_K_M)": {
"repo_id": "openbmb/MiniCPM3-4B-GGUF",
"filename": "minicpm3-4b-q4_k_m.gguf",
"description": "MiniCPM3-4B (Q4_K_M)"
},
"Qwen2.5-3B-Instruct (Q4_K_M)": {
"repo_id": "Qwen/Qwen2.5-3B-Instruct-GGUF",
"filename": "qwen2.5-3b-instruct-q4_k_m.gguf",
"description": "Qwen2.5-3B-Instruct (Q4_K_M)"
},
"Qwen2.5-7B-Instruct (Q2_K)": {
"repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF",
"filename": "qwen2.5-7b-instruct-q2_k.gguf",
"description": "Qwen2.5-7B Instruct (Q2_K)"
},
"Gemma-3-4B-IT (Q4_K_M)": {
"repo_id": "unsloth/gemma-3-4b-it-GGUF",
"filename": "gemma-3-4b-it-Q4_K_M.gguf",
"description": "Gemma 3 4B IT (Q4_K_M)"
},
"Phi-4-mini-Instruct (Q4_K_M)": {
"repo_id": "unsloth/Phi-4-mini-instruct-GGUF",
"filename": "Phi-4-mini-instruct-Q4_K_M.gguf",
"description": "Phi-4 Mini Instruct (Q4_K_M)"
},
"Meta-Llama-3.1-8B-Instruct (Q2_K)": {
"repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF",
"filename": "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf",
"description": "Meta-Llama-3.1-8B-Instruct (Q2_K)"
},
"DeepSeek-R1-Distill-Llama-8B (Q2_K)": {
"repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF",
"filename": "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf",
"description": "DeepSeek-R1-Distill-Llama-8B (Q2_K)"
},
"Mistral-7B-Instruct-v0.3 (IQ3_XS)": {
"repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF",
"filename": "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf",
"description": "Mistral-7B-Instruct-v0.3 (IQ3_XS)"
},
"Qwen2.5-Coder-7B-Instruct (Q2_K)": {
"repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
"filename": "qwen2.5-coder-7b-instruct-q2_k.gguf",
"description": "Qwen2.5-Coder-7B-Instruct (Q2_K)"
},
}
LOADED_MODELS = {}
CURRENT_MODEL_NAME = None
# ------------------------------
# Model Loading Helper Functions
# ------------------------------
def try_load_model(model_path):
try:
return Llama(
model_path=model_path,
n_ctx=4096,
n_threads=2,
n_threads_batch=1,
n_batch=256,
n_gpu_layers=0,
use_mlock=True,
use_mmap=True,
verbose=False,
logits_all=True,
draft_model=LlamaPromptLookupDecoding(num_pred_tokens=2),
)
except Exception as e:
return str(e)
def download_model(selected_model):
hf_hub_download(
repo_id=selected_model["repo_id"],
filename=selected_model["filename"],
local_dir="./models",
local_dir_use_symlinks=False,
)
def validate_or_download_model(selected_model):
model_path = os.path.join("models", selected_model["filename"])
os.makedirs("models", exist_ok=True)
if not os.path.exists(model_path):
download_model(selected_model)
result = try_load_model(model_path)
if isinstance(result, str):
try:
os.remove(model_path)
except Exception:
pass
download_model(selected_model)
result = try_load_model(model_path)
if isinstance(result, str):
raise Exception(f"Model load failed: {result}")
return result
def load_model(model_name):
global LOADED_MODELS, CURRENT_MODEL_NAME
if model_name in LOADED_MODELS:
return LOADED_MODELS[model_name]
selected_model = MODELS[model_name]
model = validate_or_download_model(selected_model)
LOADED_MODELS[model_name] = model
CURRENT_MODEL_NAME = model_name
return model
# ------------------------------
# Web Search Context Retrieval Function
# ------------------------------
def retrieve_context(query, max_results=6, max_chars_per_result=600):
try:
with DDGS() as ddgs:
results = list(islice(ddgs.text(query, region="wt-wt", safesearch="off", timelimit="y"), max_results))
context = ""
for i, result in enumerate(results, start=1):
title = result.get("title", "No Title")
snippet = result.get("body", "")[:max_chars_per_result]
context += f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\n\n"
return context.strip()
except Exception:
return ""
# ------------------------------
# Chat Response Generation (Streaming) with Cancellation
# ------------------------------
def chat_response(user_message, chat_history, system_prompt, enable_search,
max_results, max_chars, model_name, max_tokens, temperature, top_k, top_p, repeat_penalty):
"""
Generator function that:
- Uses the chat history (list of dicts) from the Chatbot.
- Appends the new user message.
- Optionally retrieves web search context.
- Streams the assistant response token-by-token.
- Checks for cancellation.
"""
# Reset the cancellation event.
cancel_event.clear()
# Prepare internal history.
internal_history = list(chat_history) if chat_history else []
internal_history.append({"role": "user", "content": user_message})
# Retrieve web search context (with debug feedback).
debug_message = ""
if enable_search:
debug_message = "Initiating web search..."
yield internal_history, debug_message
search_result = [""]
def do_search():
search_result[0] = retrieve_context(user_message, max_results, max_chars)
search_thread = threading.Thread(target=do_search)
search_thread.start()
search_thread.join(timeout=2)
retrieved_context = search_result[0]
if retrieved_context:
debug_message = f"Web search results:\n\n{retrieved_context}"
else:
debug_message = "Web search returned no results or timed out."
else:
retrieved_context = ""
debug_message = "Web search disabled."
# Augment prompt.
if enable_search and retrieved_context:
augmented_user_input = (
f"{system_prompt.strip()}\n\n"
"Use the following recent web search context to help answer the query:\n\n"
f"{retrieved_context}\n\n"
f"User Query: {user_message}"
)
else:
augmented_user_input = f"{system_prompt.strip()}\n\nUser Query: {user_message}"
# Build final prompt messages.
messages = internal_history[:-1] + [{"role": "user", "content": augmented_user_input}]
# Load the model.
model = load_model(model_name)
# Add an empty assistant message.
internal_history.append({"role": "assistant", "content": ""})
assistant_message = ""
try:
stream = model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_k=top_k,
top_p=top_p,
repeat_penalty=repeat_penalty,
stream=True,
)
for chunk in stream:
# Check if a cancellation has been requested.
if cancel_event.is_set():
assistant_message += "\n\n[Response generation cancelled by user]"
internal_history[-1]["content"] = assistant_message
yield internal_history, debug_message
break
if "choices" in chunk:
delta = chunk["choices"][0]["delta"].get("content", "")
assistant_message += delta
internal_history[-1]["content"] = assistant_message
yield internal_history, debug_message
if chunk["choices"][0].get("finish_reason", ""):
break
except Exception as e:
internal_history[-1]["content"] = f"Error: {e}"
yield internal_history, debug_message
gc.collect()
# ------------------------------
# Cancel Function
# ------------------------------
def cancel_generation():
cancel_event.set()
return "Cancellation requested."
# ------------------------------
# Gradio UI Definition
# ------------------------------
with gr.Blocks(title="Multi-GGUF LLM Inference") as demo:
gr.Markdown("## 🧠 Multi-GGUF LLM Inference with Web Search")
gr.Markdown("Interact with the model. Select your model, set your system prompt, and adjust parameters on the left.")
with gr.Row():
with gr.Column(scale=3):
default_model = list(MODELS.keys())[0] if MODELS else "No models available"
model_dropdown = gr.Dropdown(
label="Select Model",
choices=list(MODELS.keys()) if MODELS else [],
value=default_model,
info="Choose from available models."
)
today = datetime.now().strftime('%Y-%m-%d')
default_prompt = f"You are a helpful assistant. Today is {today}. Please leverage the latest web data when responding to queries."
system_prompt_text = gr.Textbox(label="System Prompt",
value=default_prompt,
lines=3,
info="Define the base context for the AI's responses.")
gr.Markdown("### Generation Parameters")
max_tokens_slider = gr.Slider(label="Max Tokens", minimum=64, maximum=1024, value=1024, step=32,
info="Maximum tokens for the response.")
temperature_slider = gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, value=0.7, step=0.1,
info="Controls the randomness of the output.")
top_k_slider = gr.Slider(label="Top-K", minimum=1, maximum=100, value=40, step=1,
info="Limits token candidates to the top-k tokens.")
top_p_slider = gr.Slider(label="Top-P (Nucleus Sampling)", minimum=0.1, maximum=1.0, value=0.95, step=0.05,
info="Limits token candidates to a cumulative probability threshold.")
repeat_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.1, step=0.1,
info="Penalizes token repetition to improve diversity.")
gr.Markdown("### Web Search Settings")
enable_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False,
info="Include recent search context to improve answers.")
max_results_number = gr.Number(label="Max Search Results", value=6, precision=0,
info="Maximum number of search results to retrieve.")
max_chars_number = gr.Number(label="Max Chars per Result", value=600, precision=0,
info="Maximum characters to retrieve per search result.")
clear_button = gr.Button("Clear Chat")
cancel_button = gr.Button("Cancel Generation")
with gr.Column(scale=7):
chatbot = gr.Chatbot(label="Chat", type="messages")
msg_input = gr.Textbox(label="Your Message", placeholder="Enter your message and press Enter")
search_debug = gr.Markdown(label="Web Search Debug")
def clear_chat():
return [], "", ""
clear_button.click(fn=clear_chat, outputs=[chatbot, msg_input, search_debug])
cancel_button.click(fn=cancel_generation, outputs=search_debug)
# Submission that returns conversation and debug info.
msg_input.submit(
fn=chat_response,
inputs=[msg_input, chatbot, system_prompt_text, enable_search_checkbox,
max_results_number, max_chars_number, model_dropdown,
max_tokens_slider, temperature_slider, top_k_slider, top_p_slider, repeat_penalty_slider],
outputs=[chatbot, search_debug],
# Uncomment streaming=True if supported.
# streaming=True,
)
demo.launch()