Luigi's picture
pin torch to 2.4.0
4c6b4c5
raw
history blame
12.4 kB
import os
import time
import gc
import threading
from itertools import islice
from datetime import datetime
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from duckduckgo_search import DDGS
import spaces # Import spaces early to enable ZeroGPU support
# Disable GPU visibility if you wish to force CPU usage outside of GPU functions
# (Not strictly needed for ZeroGPU as the decorator handles allocation)
# os.environ["CUDA_VISIBLE_DEVICES"] = ""
# ------------------------------
# Global Cancellation Event
# ------------------------------
cancel_event = threading.Event()
# ------------------------------
# Torch-Compatible Model Definitions with Adjusted Descriptions
# ------------------------------
MODELS = {
"Taiwan-tinyllama-v1.0-chat (Q8_0)": {
"repo_id": "DavidLanz/Taiwan-tinyllama-v1.0-chat",
"description": "Taiwan-tinyllama-v1.0-chat (Q8_0) – Torch-compatible version converted from GGUF."
},
"Llama-3.2-Taiwan-3B-Instruct (Q4_K_M)": {
"repo_id": "https://huggingface.co/lianghsun/Llama-3.2-Taiwan-3B-Instruct",
"description": "Llama-3.2-Taiwan-3B-Instruct (Q4_K_M) – Torch-compatible version converted from GGUF."
},
"MiniCPM3-4B (Q4_K_M)": {
"repo_id": "openbmb/MiniCPM3-4B",
"description": "MiniCPM3-4B (Q4_K_M) – Torch-compatible version converted from GGUF."
},
"Qwen2.5-3B-Instruct (Q4_K_M)": {
"repo_id": "Qwen/Qwen2.5-3B-Instruct",
"description": "Qwen2.5-3B-Instruct (Q4_K_M) – Torch-compatible version converted from GGUF."
},
"Qwen2.5-7B-Instruct (Q2_K)": {
"repo_id": "Qwen/Qwen2.5-7B-Instruct",
"description": "Qwen2.5-7B-Instruct (Q2_K) – Torch-compatible version converted from GGUF."
},
"Gemma-3-4B-IT (Q4_K_M)": {
"repo_id": "unsloth/gemma-3-4b-it",
"description": "Gemma-3-4B-IT (Q4_K_M) – Torch-compatible version converted from GGUF."
},
"Phi-4-mini-Instruct (Q4_K_M)": {
"repo_id": "unsloth/Phi-4-mini-instruct",
"description": "Phi-4-mini-Instruct (Q4_K_M) – Torch-compatible version converted from GGUF."
},
"Meta-Llama-3.1-8B-Instruct (Q2_K)": {
"repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct",
"description": "Meta-Llama-3.1-8B-Instruct (Q2_K) – Torch-compatible version converted from GGUF."
},
"DeepSeek-R1-Distill-Llama-8B (Q2_K)": {
"repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B",
"description": "DeepSeek-R1-Distill-Llama-8B (Q2_K) – Torch-compatible version converted from GGUF."
},
"Mistral-7B-Instruct-v0.3 (IQ3_XS)": {
"repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3",
"description": "Mistral-7B-Instruct-v0.3 (IQ3_XS) – Torch-compatible version converted from GGUF."
},
"Qwen2.5-Coder-7B-Instruct (Q2_K)": {
"repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct",
"description": "Qwen2.5-Coder-7B-Instruct (Q2_K) – Torch-compatible version converted from GGUF."
},
}
LOADED_MODELS = {}
CURRENT_MODEL_NAME = None
# ------------------------------
# Model Loading Helper Function (PyTorch/Transformers)
# ------------------------------
def load_model(model_name):
global LOADED_MODELS, CURRENT_MODEL_NAME
if model_name in LOADED_MODELS:
return LOADED_MODELS[model_name]
selected_model = MODELS[model_name]
# Load the model and tokenizer using Transformers.
model = AutoModelForCausalLM.from_pretrained(selected_model["repo_id"], trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(selected_model["repo_id"], trust_remote_code=True)
LOADED_MODELS[model_name] = (model, tokenizer)
CURRENT_MODEL_NAME = model_name
return model, tokenizer
# ------------------------------
# Web Search Context Retrieval Function
# ------------------------------
def retrieve_context(query, max_results=6, max_chars_per_result=600):
try:
with DDGS() as ddgs:
results = list(islice(ddgs.text(query, region="wt-wt", safesearch="off", timelimit="y"), max_results))
context = ""
for i, result in enumerate(results, start=1):
title = result.get("title", "No Title")
snippet = result.get("body", "")[:max_chars_per_result]
context += f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\n\n"
return context.strip()
except Exception:
return ""
# ------------------------------
# Chat Response Generation with ZeroGPU
# ------------------------------
@spaces.GPU(duration=60) # This decorator triggers GPU allocation for up to 60 seconds.
def chat_response(user_message, chat_history, system_prompt, enable_search,
max_results, max_chars, model_name, max_tokens, temperature, top_k, top_p, repeat_penalty):
# Reset the cancellation event.
cancel_event.clear()
# Prepare internal chat history.
internal_history = list(chat_history) if chat_history else []
internal_history.append({"role": "user", "content": user_message})
# Retrieve web search context (with debug feedback).
debug_message = ""
if enable_search:
debug_message = "Initiating web search..."
yield internal_history, debug_message
search_result = [""]
def do_search():
search_result[0] = retrieve_context(user_message, max_results, max_chars)
search_thread = threading.Thread(target=do_search)
search_thread.start()
search_thread.join(timeout=2)
retrieved_context = search_result[0]
if retrieved_context:
debug_message = f"Web search results:\n\n{retrieved_context}"
else:
debug_message = "Web search returned no results or timed out."
else:
retrieved_context = ""
debug_message = "Web search disabled."
# Augment the prompt with search context if available.
if enable_search and retrieved_context:
augmented_user_input = (
f"{system_prompt.strip()}\n\n"
"Use the following recent web search context to help answer the query:\n\n"
f"{retrieved_context}\n\n"
f"User Query: {user_message}"
)
else:
augmented_user_input = f"{system_prompt.strip()}\n\nUser Query: {user_message}"
# Append a placeholder for the assistant's response.
internal_history.append({"role": "assistant", "content": ""})
try:
# Load the model and tokenizer.
model, tokenizer = load_model(model_name)
# Move the model to GPU (using .to('cuda')) inside the GPU-decorated function.
model = model.to('cuda')
# Tokenize the augmented prompt and move input tensors to GPU.
input_ids = tokenizer(augmented_user_input, return_tensors="pt").input_ids.to('cuda')
with torch.no_grad():
output_ids = model.generate(
input_ids,
max_new_tokens=max_tokens,
temperature=temperature,
top_k=top_k,
top_p=top_p,
repetition_penalty=repeat_penalty,
do_sample=True
)
# Decode the generated tokens.
generated_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
# Remove the original prompt to isolate the assistant's reply.
assistant_text = generated_text[len(augmented_user_input):].strip()
# Simulate streaming output by yielding word-by-word.
words = assistant_text.split()
assistant_message = ""
for word in words:
if cancel_event.is_set():
assistant_message += "\n\n[Response generation cancelled by user]"
internal_history[-1]["content"] = assistant_message
yield internal_history, debug_message
return
assistant_message += word + " "
internal_history[-1]["content"] = assistant_message
yield internal_history, debug_message
time.sleep(0.05) # Short delay to simulate streaming
except Exception as e:
internal_history[-1]["content"] = f"Error: {e}"
yield internal_history, debug_message
gc.collect()
# ------------------------------
# Cancel Function
# ------------------------------
def cancel_generation():
cancel_event.set()
return "Cancellation requested."
# ------------------------------
# Gradio UI Definition
# ------------------------------
with gr.Blocks(title="LLM Inference with ZeroGPU") as demo:
gr.Markdown("## 🧠 ZeroGPU LLM Inference with Web Search")
gr.Markdown("Interact with the model. Select your model, set your system prompt, and adjust parameters on the left.")
with gr.Row():
with gr.Column(scale=3):
default_model = list(MODELS.keys())[0] if MODELS else "No models available"
model_dropdown = gr.Dropdown(
label="Select Model",
choices=list(MODELS.keys()) if MODELS else [],
value=default_model,
info="Choose from available models."
)
today = datetime.now().strftime('%Y-%m-%d')
default_prompt = f"You are a helpful assistant. Today is {today}. Please leverage the latest web data when responding to queries."
system_prompt_text = gr.Textbox(label="System Prompt",
value=default_prompt,
lines=3,
info="Define the base context for the AI's responses.")
gr.Markdown("### Generation Parameters")
max_tokens_slider = gr.Slider(label="Max Tokens", minimum=64, maximum=1024, value=1024, step=32,
info="Maximum tokens for the response.")
temperature_slider = gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, value=0.7, step=0.1,
info="Controls the randomness of the output.")
top_k_slider = gr.Slider(label="Top-K", minimum=1, maximum=100, value=40, step=1,
info="Limits token candidates to the top-k tokens.")
top_p_slider = gr.Slider(label="Top-P (Nucleus Sampling)", minimum=0.1, maximum=1.0, value=0.95, step=0.05,
info="Limits token candidates to a cumulative probability threshold.")
repeat_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.1, step=0.1,
info="Penalizes token repetition to improve diversity.")
gr.Markdown("### Web Search Settings")
enable_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False,
info="Include recent search context to improve answers.")
max_results_number = gr.Number(label="Max Search Results", value=6, precision=0,
info="Maximum number of search results to retrieve.")
max_chars_number = gr.Number(label="Max Chars per Result", value=600, precision=0,
info="Maximum characters to retrieve per search result.")
clear_button = gr.Button("Clear Chat")
cancel_button = gr.Button("Cancel Generation")
with gr.Column(scale=7):
chatbot = gr.Chatbot(label="Chat", type="messages")
msg_input = gr.Textbox(label="Your Message", placeholder="Enter your message and press Enter")
search_debug = gr.Markdown(label="Web Search Debug")
def clear_chat():
return [], "", ""
clear_button.click(fn=clear_chat, outputs=[chatbot, msg_input, search_debug])
cancel_button.click(fn=cancel_generation, outputs=search_debug)
# Submission: the chat_response function is now decorated with @spaces.GPU.
msg_input.submit(
fn=chat_response,
inputs=[msg_input, chatbot, system_prompt_text, enable_search_checkbox,
max_results_number, max_chars_number, model_dropdown,
max_tokens_slider, temperature_slider, top_k_slider, top_p_slider, repeat_penalty_slider],
outputs=[chatbot, search_debug],
)
demo.launch()