Spaces:
Running
Running
| import os | |
| import time | |
| import gc | |
| import threading | |
| from itertools import islice | |
| from datetime import datetime | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from duckduckgo_search import DDGS | |
| import spaces # Import spaces early to enable ZeroGPU support | |
| # Disable GPU visibility if you wish to force CPU usage outside of GPU functions | |
| # (Not strictly needed for ZeroGPU as the decorator handles allocation) | |
| # os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
| # ------------------------------ | |
| # Global Cancellation Event | |
| # ------------------------------ | |
| cancel_event = threading.Event() | |
| # ------------------------------ | |
| # Torch-Compatible Model Definitions with Adjusted Descriptions | |
| # ------------------------------ | |
| MODELS = { | |
| "Taiwan-tinyllama-v1.0-chat (Q8_0)": { | |
| "repo_id": "DavidLanz/Taiwan-tinyllama-v1.0-chat", | |
| "description": "Taiwan-tinyllama-v1.0-chat (Q8_0) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Llama-3.2-Taiwan-3B-Instruct (Q4_K_M)": { | |
| "repo_id": "https://huggingface.co/lianghsun/Llama-3.2-Taiwan-3B-Instruct", | |
| "description": "Llama-3.2-Taiwan-3B-Instruct (Q4_K_M) – Torch-compatible version converted from GGUF." | |
| }, | |
| "MiniCPM3-4B (Q4_K_M)": { | |
| "repo_id": "openbmb/MiniCPM3-4B", | |
| "description": "MiniCPM3-4B (Q4_K_M) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Qwen2.5-3B-Instruct (Q4_K_M)": { | |
| "repo_id": "Qwen/Qwen2.5-3B-Instruct", | |
| "description": "Qwen2.5-3B-Instruct (Q4_K_M) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Qwen2.5-7B-Instruct (Q2_K)": { | |
| "repo_id": "Qwen/Qwen2.5-7B-Instruct", | |
| "description": "Qwen2.5-7B-Instruct (Q2_K) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Gemma-3-4B-IT (Q4_K_M)": { | |
| "repo_id": "unsloth/gemma-3-4b-it", | |
| "description": "Gemma-3-4B-IT (Q4_K_M) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Phi-4-mini-Instruct (Q4_K_M)": { | |
| "repo_id": "unsloth/Phi-4-mini-instruct", | |
| "description": "Phi-4-mini-Instruct (Q4_K_M) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Meta-Llama-3.1-8B-Instruct (Q2_K)": { | |
| "repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct", | |
| "description": "Meta-Llama-3.1-8B-Instruct (Q2_K) – Torch-compatible version converted from GGUF." | |
| }, | |
| "DeepSeek-R1-Distill-Llama-8B (Q2_K)": { | |
| "repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B", | |
| "description": "DeepSeek-R1-Distill-Llama-8B (Q2_K) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Mistral-7B-Instruct-v0.3 (IQ3_XS)": { | |
| "repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3", | |
| "description": "Mistral-7B-Instruct-v0.3 (IQ3_XS) – Torch-compatible version converted from GGUF." | |
| }, | |
| "Qwen2.5-Coder-7B-Instruct (Q2_K)": { | |
| "repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct", | |
| "description": "Qwen2.5-Coder-7B-Instruct (Q2_K) – Torch-compatible version converted from GGUF." | |
| }, | |
| } | |
| LOADED_MODELS = {} | |
| CURRENT_MODEL_NAME = None | |
| # ------------------------------ | |
| # Model Loading Helper Function (PyTorch/Transformers) | |
| # ------------------------------ | |
| def load_model(model_name): | |
| global LOADED_MODELS, CURRENT_MODEL_NAME | |
| if model_name in LOADED_MODELS: | |
| return LOADED_MODELS[model_name] | |
| selected_model = MODELS[model_name] | |
| # Load the model and tokenizer using Transformers. | |
| model = AutoModelForCausalLM.from_pretrained(selected_model["repo_id"], trust_remote_code=True) | |
| tokenizer = AutoTokenizer.from_pretrained(selected_model["repo_id"], trust_remote_code=True) | |
| LOADED_MODELS[model_name] = (model, tokenizer) | |
| CURRENT_MODEL_NAME = model_name | |
| return model, tokenizer | |
| # ------------------------------ | |
| # Web Search Context Retrieval Function | |
| # ------------------------------ | |
| def retrieve_context(query, max_results=6, max_chars_per_result=600): | |
| try: | |
| with DDGS() as ddgs: | |
| results = list(islice(ddgs.text(query, region="wt-wt", safesearch="off", timelimit="y"), max_results)) | |
| context = "" | |
| for i, result in enumerate(results, start=1): | |
| title = result.get("title", "No Title") | |
| snippet = result.get("body", "")[:max_chars_per_result] | |
| context += f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\n\n" | |
| return context.strip() | |
| except Exception: | |
| return "" | |
| # ------------------------------ | |
| # Chat Response Generation with ZeroGPU | |
| # ------------------------------ | |
| # This decorator triggers GPU allocation for up to 60 seconds. | |
| def chat_response(user_message, chat_history, system_prompt, enable_search, | |
| max_results, max_chars, model_name, max_tokens, temperature, top_k, top_p, repeat_penalty): | |
| # Reset the cancellation event. | |
| cancel_event.clear() | |
| # Prepare internal chat history. | |
| internal_history = list(chat_history) if chat_history else [] | |
| internal_history.append({"role": "user", "content": user_message}) | |
| # Retrieve web search context (with debug feedback). | |
| debug_message = "" | |
| if enable_search: | |
| debug_message = "Initiating web search..." | |
| yield internal_history, debug_message | |
| search_result = [""] | |
| def do_search(): | |
| search_result[0] = retrieve_context(user_message, max_results, max_chars) | |
| search_thread = threading.Thread(target=do_search) | |
| search_thread.start() | |
| search_thread.join(timeout=2) | |
| retrieved_context = search_result[0] | |
| if retrieved_context: | |
| debug_message = f"Web search results:\n\n{retrieved_context}" | |
| else: | |
| debug_message = "Web search returned no results or timed out." | |
| else: | |
| retrieved_context = "" | |
| debug_message = "Web search disabled." | |
| # Augment the prompt with search context if available. | |
| if enable_search and retrieved_context: | |
| augmented_user_input = ( | |
| f"{system_prompt.strip()}\n\n" | |
| "Use the following recent web search context to help answer the query:\n\n" | |
| f"{retrieved_context}\n\n" | |
| f"User Query: {user_message}" | |
| ) | |
| else: | |
| augmented_user_input = f"{system_prompt.strip()}\n\nUser Query: {user_message}" | |
| # Append a placeholder for the assistant's response. | |
| internal_history.append({"role": "assistant", "content": ""}) | |
| try: | |
| # Load the model and tokenizer. | |
| model, tokenizer = load_model(model_name) | |
| # Move the model to GPU (using .to('cuda')) inside the GPU-decorated function. | |
| model = model.to('cuda') | |
| # Tokenize the augmented prompt and move input tensors to GPU. | |
| input_ids = tokenizer(augmented_user_input, return_tensors="pt").input_ids.to('cuda') | |
| with torch.no_grad(): | |
| output_ids = model.generate( | |
| input_ids, | |
| max_new_tokens=max_tokens, | |
| temperature=temperature, | |
| top_k=top_k, | |
| top_p=top_p, | |
| repetition_penalty=repeat_penalty, | |
| do_sample=True | |
| ) | |
| # Decode the generated tokens. | |
| generated_text = tokenizer.decode(output_ids[0], skip_special_tokens=True) | |
| # Remove the original prompt to isolate the assistant's reply. | |
| assistant_text = generated_text[len(augmented_user_input):].strip() | |
| # Simulate streaming output by yielding word-by-word. | |
| words = assistant_text.split() | |
| assistant_message = "" | |
| for word in words: | |
| if cancel_event.is_set(): | |
| assistant_message += "\n\n[Response generation cancelled by user]" | |
| internal_history[-1]["content"] = assistant_message | |
| yield internal_history, debug_message | |
| return | |
| assistant_message += word + " " | |
| internal_history[-1]["content"] = assistant_message | |
| yield internal_history, debug_message | |
| time.sleep(0.05) # Short delay to simulate streaming | |
| except Exception as e: | |
| internal_history[-1]["content"] = f"Error: {e}" | |
| yield internal_history, debug_message | |
| gc.collect() | |
| # ------------------------------ | |
| # Cancel Function | |
| # ------------------------------ | |
| def cancel_generation(): | |
| cancel_event.set() | |
| return "Cancellation requested." | |
| # ------------------------------ | |
| # Gradio UI Definition | |
| # ------------------------------ | |
| with gr.Blocks(title="LLM Inference with ZeroGPU") as demo: | |
| gr.Markdown("## 🧠 ZeroGPU LLM Inference with Web Search") | |
| gr.Markdown("Interact with the model. Select your model, set your system prompt, and adjust parameters on the left.") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| default_model = list(MODELS.keys())[0] if MODELS else "No models available" | |
| model_dropdown = gr.Dropdown( | |
| label="Select Model", | |
| choices=list(MODELS.keys()) if MODELS else [], | |
| value=default_model, | |
| info="Choose from available models." | |
| ) | |
| today = datetime.now().strftime('%Y-%m-%d') | |
| default_prompt = f"You are a helpful assistant. Today is {today}. Please leverage the latest web data when responding to queries." | |
| system_prompt_text = gr.Textbox(label="System Prompt", | |
| value=default_prompt, | |
| lines=3, | |
| info="Define the base context for the AI's responses.") | |
| gr.Markdown("### Generation Parameters") | |
| max_tokens_slider = gr.Slider(label="Max Tokens", minimum=64, maximum=1024, value=1024, step=32, | |
| info="Maximum tokens for the response.") | |
| temperature_slider = gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, value=0.7, step=0.1, | |
| info="Controls the randomness of the output.") | |
| top_k_slider = gr.Slider(label="Top-K", minimum=1, maximum=100, value=40, step=1, | |
| info="Limits token candidates to the top-k tokens.") | |
| top_p_slider = gr.Slider(label="Top-P (Nucleus Sampling)", minimum=0.1, maximum=1.0, value=0.95, step=0.05, | |
| info="Limits token candidates to a cumulative probability threshold.") | |
| repeat_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.1, step=0.1, | |
| info="Penalizes token repetition to improve diversity.") | |
| gr.Markdown("### Web Search Settings") | |
| enable_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False, | |
| info="Include recent search context to improve answers.") | |
| max_results_number = gr.Number(label="Max Search Results", value=6, precision=0, | |
| info="Maximum number of search results to retrieve.") | |
| max_chars_number = gr.Number(label="Max Chars per Result", value=600, precision=0, | |
| info="Maximum characters to retrieve per search result.") | |
| clear_button = gr.Button("Clear Chat") | |
| cancel_button = gr.Button("Cancel Generation") | |
| with gr.Column(scale=7): | |
| chatbot = gr.Chatbot(label="Chat", type="messages") | |
| msg_input = gr.Textbox(label="Your Message", placeholder="Enter your message and press Enter") | |
| search_debug = gr.Markdown(label="Web Search Debug") | |
| def clear_chat(): | |
| return [], "", "" | |
| clear_button.click(fn=clear_chat, outputs=[chatbot, msg_input, search_debug]) | |
| cancel_button.click(fn=cancel_generation, outputs=search_debug) | |
| # Submission: the chat_response function is now decorated with @spaces.GPU. | |
| msg_input.submit( | |
| fn=chat_response, | |
| inputs=[msg_input, chatbot, system_prompt_text, enable_search_checkbox, | |
| max_results_number, max_chars_number, model_dropdown, | |
| max_tokens_slider, temperature_slider, top_k_slider, top_p_slider, repeat_penalty_slider], | |
| outputs=[chatbot, search_debug], | |
| ) | |
| demo.launch() | |