# Captured from a Hugging Face Space (status shown at capture time: Running).
import gradio as gr | |
import torch | |
import os | |
import time | |
import subprocess | |
import tempfile | |
# --- Model backends ---
# `transformers` is a hard requirement: it provides the tokenizer on both code
# paths as well as the fallback model. It is imported outside the guard below so
# that a missing `transformers` raises its own ImportError instead of being
# reported as a missing `ctransformers`.
from transformers import AutoTokenizer, AutoModelForCausalLM

# `ctransformers` is optional and enables GGUF inference. The names
# `AutoModelForCausalLM_GGUF` and `LLM` exist only when GGUF_AVAILABLE is True,
# so every use of them must be guarded by that flag.
try:
    from ctransformers import AutoModelForCausalLM as AutoModelForCausalLM_GGUF
    from ctransformers.llm import LLM
except ImportError:
    GGUF_AVAILABLE = False
    print("WARNING: 'ctransformers' not found. This app relies on it for efficient CPU inference.")
    print("Please install it with: pip install ctransformers transformers")
else:
    GGUF_AVAILABLE = True
# --- Model identifiers ---
# Standard Hugging Face checkpoint.
ORIGINAL_MODEL_ID = "HuggingFaceTB/SmolLM2-360M-Instruct"
# GGUF repository and the weights file to pick from it.
GGUF_MODEL_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
GGUF_MODEL_FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf"

# --- Sampling settings shared by both inference paths ---
MAX_NEW_TOKENS = 256
TEMPERATURE, TOP_K, TOP_P = 0.7, 50, 0.95
# Only the Hugging Face `generate()` path reads this flag.
DO_SAMPLE = True

# --- Shared runtime state ---
# Both stay None until a model and tokenizer have been loaded.
model = tokenizer = None
device = "cpu"
# --- Festival Audio Function ---
def _escape_scheme_string(value):
    """Escape *value* so it can sit inside a double-quoted Scheme string."""
    # Backslashes go first; otherwise the backslashes inserted for the quotes
    # would themselves be doubled.
    return value.replace("\\", "\\\\").replace('"', '\\"')


def _discard_file(path):
    """Remove *path* if it exists, ignoring cleanup failures."""
    try:
        os.remove(path)
    except OSError:
        # Best-effort cleanup: a leftover temp file must not hide the
        # original failure that is already being reported.
        pass


def speak_text_festival_to_file(text):
    """
    Synthesize *text* with Festival and save the result to a temporary WAV file.

    The text is sent to ``festival --pipe`` as a Scheme snippet. Backslashes and
    double quotes are escaped first, so arbitrary text (for example model
    output) cannot terminate the string literal and inject Scheme code.

    Args:
        text: The text to speak. ``None``, empty, or whitespace-only text is
            rejected without starting Festival.

    Returns:
        The path of the generated WAV file, or ``None`` when there is nothing
        to speak, Festival is not installed, Festival exits with an error, or
        no non-empty file was produced. On every failure path the temporary
        file is removed; on success the caller owns the file.
    """
    if not text or not text.strip():
        print("No text provided for Festival to speak.")
        return None

    # Reserve a file name only; Festival writes the audio itself.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        audio_filepath = temp_audio_file.name

    # The escaping happens in a helper call, which keeps backslashes out of the
    # f-string expression (a SyntaxError before Python 3.12).
    festival_command = (
        f'\n(set! utt (SayText "{_escape_scheme_string(text)}"))\n'
        f'(utt.save.wave utt "{_escape_scheme_string(audio_filepath)}")\n'
    )

    saved = False
    try:
        completed = subprocess.run(
            ["festival", "--pipe"],
            input=festival_command,
            capture_output=True,
            text=True,
        )
        stderr = completed.stderr

        if completed.returncode != 0:
            print(f"Error speaking text with Festival. Return code: {completed.returncode}")
            print(f"Festival stderr: {stderr}")
            return None

        if not os.path.exists(audio_filepath) or os.path.getsize(audio_filepath) == 0:
            print(f"Festival did not create a valid WAV file at {audio_filepath}. Stderr: {stderr}")
            return None

        print(f"Audio saved to: {audio_filepath}")
        saved = True
        return audio_filepath
    except FileNotFoundError:
        print("Error: Festival executable not found. Make sure Festival is installed and in your PATH.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred during Festival processing: {e}")
        return None
    finally:
        # Single cleanup point for every failure path.
        if not saved:
            _discard_file(audio_filepath)
# --- Model Loading Function ---
def load_model_for_zerocpu():
    """
    Populate the module-level ``model`` and ``tokenizer`` for CPU inference.

    When ctransformers is available, the GGUF model is tried first; if that
    attempt raises, or ctransformers is missing, the standard Hugging Face
    model is loaded instead. If the fallback also fails, both globals are
    reset to ``None``.
    """
    global model, tokenizer, device

    if not GGUF_AVAILABLE:
        print("WARNING: ctransformers is not available. Will load standard Hugging Face model directly.")
    else:
        print(f"Attempting to load GGUF model '{GGUF_MODEL_ID}' (file: '{GGUF_MODEL_FILENAME}') for ZeroCPU...")
        try:
            model = AutoModelForCausalLM_GGUF.from_pretrained(
                GGUF_MODEL_ID,
                model_file=GGUF_MODEL_FILENAME,
                model_type="llama",
                gpu_layers=0,
            )
            # NOTE(review): the GGUF weights are TinyLlama, yet the tokenizer
            # (and therefore the chat template) comes from ORIGINAL_MODEL_ID.
            # Confirm that this pairing is intended.
            tokenizer = AutoTokenizer.from_pretrained(ORIGINAL_MODEL_ID)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token
        except Exception as gguf_error:
            print(f"WARNING: Could not load GGUF model '{GGUF_MODEL_ID}' from '{GGUF_MODEL_FILENAME}': {gguf_error}")
            print(f"Falling back to standard Hugging Face model '{ORIGINAL_MODEL_ID}' for CPU (will be slower without GGUF quantization).")
        else:
            print(f"GGUF model '{GGUF_MODEL_ID}' loaded successfully for CPU.")
            return

    # Fallback path: plain transformers model on the configured device.
    print(f"Loading standard Hugging Face model '{ORIGINAL_MODEL_ID}' for CPU...")
    try:
        model = AutoModelForCausalLM.from_pretrained(ORIGINAL_MODEL_ID)
        tokenizer = AutoTokenizer.from_pretrained(ORIGINAL_MODEL_ID)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        model.to(device)
    except Exception as load_error:
        print(f"CRITICAL ERROR: Could not load standard model '{ORIGINAL_MODEL_ID}' on CPU: {load_error}")
        print("Please ensure the model ID is correct, you have enough RAM, and dependencies are installed.")
        model, tokenizer = None, None
    else:
        print(f"Standard model '{ORIGINAL_MODEL_ID}' loaded successfully on CPU.")
# --- Inference Function for Gradio Blocks ---
def _strip_special_tokens(text):
    """Remove chat end-of-turn markers from *text* and trim whitespace."""
    for marker in ("<|im_end|>", "<|endoftext|>"):
        text = text.replace(marker, "")
    return text.strip()


def _text_only_turns(history):
    """Reduce chat history to plain ``{"role", "content"}`` text turns."""
    turns = []
    for item in history:
        if not isinstance(item, dict):
            continue
        role, content = item.get("role"), item.get("content")
        # Extra keys and non-text content are dropped; only role/content text
        # pairs are handed to the chat template.
        if isinstance(role, str) and isinstance(content, str):
            turns.append({"role": role, "content": content})
    return turns


def predict_chat_with_audio_and_streaming(message: str, history: list = None):
    """
    Generate the assistant reply for *message*, then synthesize it as audio.

    This is a generator for Gradio: each item is ``(chat_history, audio_path)``.
    While text is being produced ``audio_path`` is ``None``; the final item
    carries the path returned by ``speak_text_festival_to_file`` (which may
    itself be ``None``).

    Args:
        message: The new user message.
        history: Previous turns as a list of role/content dicts. Optional, so
            the function can also be called with the message alone.

    Yields:
        Tuples of the updated history list and an audio file path or ``None``.
        An empty or whitespace-only message yields the history unchanged once
        and performs no inference.
    """
    history = list(history) if history else []

    # Nothing to answer: skip the expensive inference turn entirely.
    if not message or not message.strip():
        yield history, None
        return

    def with_reply(reply):
        # History as it should be displayed: previous turns plus this exchange.
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": reply},
        ]

    if model is None or tokenizer is None:
        yield with_reply("Error: Model or tokenizer failed to load."), None
        return

    llm_messages = [
        {"role": "system", "content": "You are a friendly chatbot."},
        *_text_only_turns(history),
        {"role": "user", "content": message},
    ]
    prompt = tokenizer.apply_chat_template(llm_messages, tokenize=False, add_generation_prompt=True)

    generated_text = ""
    start_time = time.perf_counter()

    if GGUF_AVAILABLE and isinstance(model, LLM):
        # GGUF path: stream tokens and push a cleaned partial reply each time.
        for token in model(
            prompt,
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            top_k=TOP_K,
            top_p=TOP_P,
            repetition_penalty=1.1,
            stop=["User:", "\nUser", "\n#", "\n##", "<|endoftext|>", "<|im_end|>"],
            stream=True,
        ):
            generated_text += token
            yield with_reply(_strip_special_tokens(generated_text)), None
    else:
        # The chat template has already written the special tokens into the
        # prompt, so the tokenizer must not add them a second time.
        encoded = tokenizer(prompt, return_tensors="pt", add_special_tokens=False).to(device)
        input_ids = encoded["input_ids"]
        outputs = model.generate(
            input_ids,
            attention_mask=encoded["attention_mask"],
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            top_k=TOP_K,
            top_p=TOP_P,
            do_sample=DO_SAMPLE,
            pad_token_id=tokenizer.pad_token_id,
        )
        # Decode only the newly generated tail, not the echoed prompt.
        generated_text = tokenizer.decode(outputs[0][input_ids.shape[-1]:], skip_special_tokens=True)

    generated_text = _strip_special_tokens(generated_text)
    elapsed = time.perf_counter() - start_time

    # Show the complete text before the (slower) audio synthesis starts.
    yield with_reply(generated_text), None
    print(f"Inference Time for this turn: {elapsed:.2f} seconds")

    audio_file_path = speak_text_festival_to_file(generated_text)
    yield with_reply(generated_text), audio_file_path
# --- Gradio Interface Setup ---
if __name__ == "__main__":
    load_model_for_zerocpu()

    # Opening assistant turn, as a role/content dict for a type='messages' Chatbot.
    chatbot_initial_value = [{"role": "assistant", "content": "Hello! I'm an AI assistant. I'm currently running in a CPU-only environment for efficient demonstration. How can I help you today?"}]

    # Page description shown above the chat.
    intro_markdown = """
# SmolLM2-360M-Instruct (or TinyLlama GGUF) on ZeroCPU with Festival TTS
This Space demonstrates an LLM for efficient CPU-only inference.
**Note:** For ZeroCPU, this app prioritizes `tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf` (a GGUF-quantized model
like TinyLlama) due to better CPU performance than `HuggingFaceTB/SmolLM2-360M-Instruct`
without GGUF. Expect varied responses each run due to randomized generation.
**Festival TTS:** The chatbot's responses will also be spoken aloud using the local Festival Speech Synthesis System.
"""

    # One single-item row per example; each row supplies the textbox value.
    examples_data = [
        ["What is the capital of France?"],
        ["Can you tell me a fun fact about outer space?"],
        ["What's the best way to stay motivated?"],
    ]

    with gr.Blocks(theme="soft", title="SmolLM2-360M-Instruct (or TinyLlama GGUF) on ZeroCPU with Festival TTS") as demo:
        gr.Markdown(intro_markdown)

        # Conversation display, audio player for the latest reply, and inputs.
        chatbot_display = gr.Chatbot(value=chatbot_initial_value, height=500, label="Chat History", type='messages')
        audio_output = gr.Audio(label="Chatbot Audio Response", type="filepath", autoplay=True)
        msg = gr.Textbox(placeholder="Ask me a question...", container=False, scale=7)
        submit_btn = gr.Button("Send")

        gr.Examples(
            examples=examples_data,
            inputs=[msg],
            fn=predict_chat_with_audio_and_streaming,
            outputs=[chatbot_display, audio_output],
            cache_examples=False,
        )

        send_triggers = (msg.submit, submit_btn.click)

        # Pressing Enter and clicking the button both run the same chat handler.
        for trigger in send_triggers:
            trigger(
                predict_chat_with_audio_and_streaming,
                inputs=[msg, chatbot_display],
                outputs=[chatbot_display, audio_output],
            )

        # A second listener on each trigger empties the textbox after sending.
        for trigger in send_triggers:
            trigger(lambda: "", outputs=[msg])

    demo.launch()