"""Gradio chat UI for local GGUF models with optional web-search context.

Models are downloaded from the Hugging Face Hub on first use, loaded through
llama-cpp-python, and streamed token-by-token into a Gradio Chatbot.
"""

import os
import time
import re
import gc
import threading
from itertools import islice
from datetime import datetime

import gradio as gr
from llama_cpp import Llama
from llama_cpp.llama_speculative import LlamaPromptLookupDecoding
from huggingface_hub import hf_hub_download
from duckduckgo_search import DDGS

# ------------------------------
# Global Cancellation Event
# ------------------------------
# Set by the "Cancel Generation" button and polled by the streaming loop.
# NOTE(review): this is a single process-wide flag, so a cancel request is
# seen by every generation that is currently streaming.
cancel_event = threading.Event()

# ------------------------------
# Model Definitions and Global Variables
# ------------------------------
REQUIRED_SPACE_BYTES = 5 * 1024 ** 3  # 5 GB

# Defaults shared by retrieve_context() and the search widgets in the UI.
DEFAULT_MAX_RESULTS = 6
DEFAULT_MAX_CHARS_PER_RESULT = 600

# Display name -> Hugging Face repo / GGUF file to download.
MODELS = {
    "Taiwan-tinyllama-v1.0-chat (Q8_0)": {
        "repo_id": "NapYang/DavidLanz-Taiwan-tinyllama-v1.0-chat.GGUF",
        "filename": "Taiwan-tinyllama-v1.0-chat-Q8_0.gguf",
        "description": "Taiwan-tinyllama-v1.0-chat (Q8_0)"
    },
    "Llama-3.2-Taiwan-3B-Instruct (Q4_K_M)": {
        "repo_id": "itlwas/Llama-3.2-Taiwan-3B-Instruct-Q4_K_M-GGUF",
        "filename": "llama-3.2-taiwan-3b-instruct-q4_k_m.gguf",
        "description": "Llama-3.2-Taiwan-3B-Instruct (Q4_K_M)"
    },
    "MiniCPM3-4B (Q4_K_M)": {
        "repo_id": "openbmb/MiniCPM3-4B-GGUF",
        "filename": "minicpm3-4b-q4_k_m.gguf",
        "description": "MiniCPM3-4B (Q4_K_M)"
    },
    "Qwen2.5-3B-Instruct (Q4_K_M)": {
        "repo_id": "Qwen/Qwen2.5-3B-Instruct-GGUF",
        "filename": "qwen2.5-3b-instruct-q4_k_m.gguf",
        "description": "Qwen2.5-3B-Instruct (Q4_K_M)"
    },
    "Qwen2.5-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF",
        "filename": "qwen2.5-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-7B Instruct (Q2_K)"
    },
    "Gemma-3-4B-IT (Q4_K_M)": {
        "repo_id": "unsloth/gemma-3-4b-it-GGUF",
        "filename": "gemma-3-4b-it-Q4_K_M.gguf",
        "description": "Gemma 3 4B IT (Q4_K_M)"
    },
    "Phi-4-mini-Instruct (Q4_K_M)": {
        "repo_id": "unsloth/Phi-4-mini-instruct-GGUF",
        "filename": "Phi-4-mini-instruct-Q4_K_M.gguf",
        "description": "Phi-4 Mini Instruct (Q4_K_M)"
    },
    "Meta-Llama-3.1-8B-Instruct (Q2_K)": {
        "repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF",
        "filename": "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf",
        "description": "Meta-Llama-3.1-8B-Instruct (Q2_K)"
    },
    "DeepSeek-R1-Distill-Llama-8B (Q2_K)": {
        "repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF",
        "filename": "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf",
        "description": "DeepSeek-R1-Distill-Llama-8B (Q2_K)"
    },
    "Mistral-7B-Instruct-v0.3 (IQ3_XS)": {
        "repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF",
        "filename": "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf",
        "description": "Mistral-7B-Instruct-v0.3 (IQ3_XS)"
    },
    "Qwen2.5-Coder-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
        "filename": "qwen2.5-coder-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-Coder-7B-Instruct (Q2_K)"
    },
}

# Cache of instantiated models, keyed by display name, and the name of the
# model most recently returned by load_model().
LOADED_MODELS = {}
CURRENT_MODEL_NAME = None


# ------------------------------
# Model Loading Helper Functions
# ------------------------------
def try_load_model(model_path):
    """Try to instantiate a Llama model from a GGUF file.

    Args:
        model_path: Path to the GGUF file on disk.

    Returns:
        The ``Llama`` instance on success, or the error message as a ``str``
        when construction raised. Callers tell the two apart with
        ``isinstance(result, str)``.
    """
    try:
        return Llama(
            model_path=model_path,
            n_ctx=4096,
            n_threads=2,
            n_threads_batch=1,
            n_batch=256,
            n_gpu_layers=0,
            use_mlock=True,
            use_mmap=True,
            verbose=False,
            logits_all=True,
            draft_model=LlamaPromptLookupDecoding(num_pred_tokens=2),
        )
    except Exception as e:
        return str(e)


def download_model(selected_model):
    """Download the GGUF file described by a ``MODELS`` entry into ``./models``.

    Args:
        selected_model: Dict with ``"repo_id"`` and ``"filename"`` keys.
    """
    hf_hub_download(
        repo_id=selected_model["repo_id"],
        filename=selected_model["filename"],
        local_dir="./models",
        local_dir_use_symlinks=False,
    )


def validate_or_download_model(selected_model):
    """Return a loaded model, downloading (or re-downloading) its file if needed.

    The file is downloaded when missing. If the first load attempt fails, the
    file is deleted, downloaded again, and loaded a second time.

    Args:
        selected_model: Dict with ``"repo_id"`` and ``"filename"`` keys.

    Returns:
        The loaded ``Llama`` instance.

    Raises:
        RuntimeError: If the model still cannot be loaded after the retry.
    """
    model_path = os.path.join("models", selected_model["filename"])
    os.makedirs("models", exist_ok=True)

    if not os.path.exists(model_path):
        download_model(selected_model)

    result = try_load_model(model_path)
    if not isinstance(result, str):
        return result

    # First load failed: discard the file on disk and fetch a fresh copy.
    try:
        os.remove(model_path)
    except OSError:
        # Best effort: the download below is attempted regardless.
        pass
    download_model(selected_model)

    result = try_load_model(model_path)
    if isinstance(result, str):
        raise RuntimeError(f"Model load failed: {result}")
    return result


def load_model(model_name):
    """Return the model registered under ``model_name``, loading it if needed.

    Only the most recently requested model stays in ``LOADED_MODELS``; other
    cached entries are dropped before a new model is loaded so that memory
    use does not grow with every model the user tries.

    Args:
        model_name: A key of ``MODELS``.

    Returns:
        The loaded ``Llama`` instance.

    Raises:
        KeyError: If ``model_name`` is not a key of ``MODELS``.
        RuntimeError: If the model file cannot be loaded.
    """
    global CURRENT_MODEL_NAME

    cached = LOADED_MODELS.get(model_name)
    if cached is not None:
        CURRENT_MODEL_NAME = model_name
        return cached

    selected_model = MODELS[model_name]

    if LOADED_MODELS:
        # Drop our references only; a generation that is still streaming
        # holds its own reference, so its model stays alive until it ends.
        LOADED_MODELS.clear()
        CURRENT_MODEL_NAME = None
        gc.collect()

    model = validate_or_download_model(selected_model)
    LOADED_MODELS[model_name] = model
    CURRENT_MODEL_NAME = model_name
    return model


# ------------------------------
# Web Search Context Retrieval Function
# ------------------------------
def _coerce_count(value, default):
    """Turn a UI number into a non-negative int, using ``default`` for None."""
    if value is None:
        return default
    return max(int(value), 0)


def retrieve_context(query, max_results=DEFAULT_MAX_RESULTS,
                     max_chars_per_result=DEFAULT_MAX_CHARS_PER_RESULT):
    """Search DuckDuckGo and format the hits as a plain-text context block.

    Args:
        query: Search query string.
        max_results: Maximum number of results to include. ``None`` falls
            back to the default; negative values are treated as 0.
        max_chars_per_result: Maximum snippet length per result, with the
            same handling of ``None`` and negative values.

    Returns:
        The formatted results separated by blank lines, or ``""`` when the
        search fails or yields nothing. Errors are swallowed on purpose:
        search is an optional enrichment and must not break the chat.
    """
    try:
        limit = _coerce_count(max_results, DEFAULT_MAX_RESULTS)
        width = _coerce_count(max_chars_per_result, DEFAULT_MAX_CHARS_PER_RESULT)

        with DDGS() as ddgs:
            hits = ddgs.text(query, region="wt-wt", safesearch="off", timelimit="y")
            results = list(islice(hits, limit))

        entries = []
        for i, result in enumerate(results, start=1):
            title = result.get("title", "No Title")
            # Guard against a None body, which would otherwise raise on
            # slicing and discard every result via the except below.
            snippet = (result.get("body") or "")[:width]
            entries.append(f"Result {i}:\nTitle: {title}\nSnippet: {snippet}")
        return "\n\n".join(entries).strip()
    except Exception:
        return ""


# ------------------------------
# Chat Response Generation (Streaming) with Cancellation
# ------------------------------
def chat_response(user_message, chat_history, system_prompt, enable_search,
                  max_results, max_chars, model_name, max_tokens, temperature,
                  top_k, top_p, repeat_penalty):
    """Stream an assistant reply for ``user_message``.

    Generator used as the Gradio submit handler. It:
      - copies the chat history (list of role/content dicts) and appends the
        new user message;
      - optionally retrieves web search context, waiting at most 2 seconds;
      - builds the prompt from the system prompt, the optional search
        context, and the user query;
      - loads the selected model and streams the reply chunk by chunk;
      - stops early when ``cancel_event`` is set.

    Args:
        user_message: Text entered by the user.
        chat_history: Existing conversation as a list of message dicts.
        system_prompt: Text prepended to the user query.
        enable_search: Whether to fetch web search context.
        max_results: Maximum number of search results.
        max_chars: Maximum characters kept per search result.
        model_name: A key of ``MODELS``.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_k: Top-k sampling cutoff.
        top_p: Nucleus sampling threshold.
        repeat_penalty: Repetition penalty.

    Yields:
        ``(history, debug_message)`` tuples: the updated conversation and the
        text for the search-debug panel. Model load and generation errors
        are reported as an ``"Error: ..."`` assistant message, not raised.
    """
    # Reset the cancellation event.
    cancel_event.clear()

    # Prepare internal history.
    internal_history = list(chat_history) if chat_history else []

    # Nothing to answer: leave the conversation untouched.
    if not user_message or not user_message.strip():
        yield internal_history, "Please enter a message."
        return

    internal_history.append({"role": "user", "content": user_message})

    # Retrieve web search context (with debug feedback).
    retrieved_context = ""
    if enable_search:
        debug_message = "Initiating web search..."
        yield internal_history, debug_message

        search_result = [""]

        def do_search():
            search_result[0] = retrieve_context(user_message, max_results, max_chars)

        # Daemon thread: a search that outlives the 2 s wait must not keep
        # the interpreter alive at shutdown.
        search_thread = threading.Thread(target=do_search, daemon=True)
        search_thread.start()
        search_thread.join(timeout=2)

        retrieved_context = search_result[0]
        if retrieved_context:
            debug_message = f"Web search results:\n\n{retrieved_context}"
        else:
            debug_message = "Web search returned no results or timed out."
    else:
        debug_message = "Web search disabled."

    # Augment prompt.
    system_text = (system_prompt or "").strip()
    if retrieved_context:
        augmented_user_input = (
            f"{system_text}\n\n"
            "Use the following recent web search context to help answer the query:\n\n"
            f"{retrieved_context}\n\n"
            f"User Query: {user_message}"
        )
    else:
        augmented_user_input = f"{system_text}\n\nUser Query: {user_message}"

    # Build final prompt messages: prior turns plus the augmented user turn.
    messages = internal_history[:-1] + [{"role": "user", "content": augmented_user_input}]

    # Add an empty assistant message that is filled in as chunks arrive.
    internal_history.append({"role": "assistant", "content": ""})
    assistant_message = ""

    try:
        # Loading happens inside the try so that download/load failures are
        # shown in the chat instead of escaping the handler.
        model = load_model(model_name)
        stream = model.create_chat_completion(
            messages=messages,
            max_tokens=int(max_tokens),
            temperature=temperature,
            top_k=int(top_k),
            top_p=top_p,
            repeat_penalty=repeat_penalty,
            stream=True,
        )
        for chunk in stream:
            # Check if a cancellation has been requested.
            if cancel_event.is_set():
                assistant_message += "\n\n[Response generation cancelled by user]"
                internal_history[-1]["content"] = assistant_message
                yield internal_history, debug_message
                break

            choices = chunk.get("choices")
            if not choices:
                continue
            choice = choices[0]

            # Some chunks carry no content; treat missing or None as empty.
            delta = choice.get("delta", {}).get("content") or ""
            assistant_message += delta
            internal_history[-1]["content"] = assistant_message
            yield internal_history, debug_message

            if choice.get("finish_reason"):
                break
    except Exception as e:
        internal_history[-1]["content"] = f"Error: {e}"
        yield internal_history, debug_message
    finally:
        # Runs even when the generator is closed early.
        gc.collect()


# ------------------------------
# Cancel Function
# ------------------------------
def cancel_generation():
    """Request cancellation of the running generation and return a status string."""
    cancel_event.set()
    return "Cancellation requested."


# ------------------------------
# Gradio UI Definition
# ------------------------------
with gr.Blocks(title="Multi-GGUF LLM Inference") as demo:
    gr.Markdown("## 🧠 Multi-GGUF LLM Inference with Web Search")
    gr.Markdown("Interact with the model. Select your model, set your system prompt, and adjust parameters on the left.")
    with gr.Row():
        with gr.Column(scale=3):
            default_model = list(MODELS.keys())[0] if MODELS else "No models available"
            model_dropdown = gr.Dropdown(
                label="Select Model",
                choices=list(MODELS.keys()) if MODELS else [],
                value=default_model,
                info="Choose from available models."
            )
            today = datetime.now().strftime('%Y-%m-%d')
            default_prompt = f"You are a helpful assistant. Today is {today}. Please leverage the latest web data when responding to queries."
            system_prompt_text = gr.Textbox(label="System Prompt",
                                            value=default_prompt,
                                            lines=3,
                                            info="Define the base context for the AI's responses.")
            gr.Markdown("### Generation Parameters")
            max_tokens_slider = gr.Slider(label="Max Tokens", minimum=64, maximum=1024, value=1024, step=32,
                                          info="Maximum tokens for the response.")
            temperature_slider = gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, value=0.7, step=0.1,
                                           info="Controls the randomness of the output.")
            top_k_slider = gr.Slider(label="Top-K", minimum=1, maximum=100, value=40, step=1,
                                     info="Limits token candidates to the top-k tokens.")
            top_p_slider = gr.Slider(label="Top-P (Nucleus Sampling)", minimum=0.1, maximum=1.0, value=0.95, step=0.05,
                                     info="Limits token candidates to a cumulative probability threshold.")
            repeat_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.1, step=0.1,
                                              info="Penalizes token repetition to improve diversity.")
            gr.Markdown("### Web Search Settings")
            enable_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False,
                                                 info="Include recent search context to improve answers.")
            max_results_number = gr.Number(label="Max Search Results", value=DEFAULT_MAX_RESULTS, precision=0,
                                           info="Maximum number of search results to retrieve.")
            max_chars_number = gr.Number(label="Max Chars per Result", value=DEFAULT_MAX_CHARS_PER_RESULT, precision=0,
                                         info="Maximum characters to retrieve per search result.")
            clear_button = gr.Button("Clear Chat")
            cancel_button = gr.Button("Cancel Generation")
        with gr.Column(scale=7):
            chatbot = gr.Chatbot(label="Chat", type="messages")
            msg_input = gr.Textbox(label="Your Message", placeholder="Enter your message and press Enter")
            search_debug = gr.Markdown(label="Web Search Debug")

    def clear_chat():
        """Reset the conversation, the message box, and the debug panel."""
        return [], "", ""

    clear_button.click(fn=clear_chat, outputs=[chatbot, msg_input, search_debug])
    cancel_button.click(fn=cancel_generation, outputs=search_debug)

    # Submission that returns conversation and debug info. chat_response is a
    # generator; each value it yields updates both outputs.
    msg_input.submit(
        fn=chat_response,
        inputs=[msg_input, chatbot, system_prompt_text, enable_search_checkbox,
                max_results_number, max_chars_number, model_dropdown,
                max_tokens_slider, temperature_slider, top_k_slider, top_p_slider, repeat_penalty_slider],
        outputs=[chatbot, search_debug],
    )

demo.launch()