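"""Gradio chat demo for CPU-only LLM inference with Festival text-to-speech.

Loads a GGUF-quantized TinyLlama model via ctransformers when available
(falling back to SmolLM2-360M-Instruct through plain transformers), streams
the generated reply into a Chatbot component, and speaks the final answer
aloud with the local Festival Speech Synthesis System.
"""
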
import gradio as gr
import torch
import os
import time
import subprocess
import tempfile

# --- Try to import ctransformers for GGUF, provide helpful message if not found ---
try:
    from ctransformers import AutoModelForCausalLM as AutoModelForCausalLM_GGUF
    from ctransformers.llm import LLM
    from transformers import AutoTokenizer, AutoModelForCausalLM
    GGUF_AVAILABLE = True
except ImportError:
    GGUF_AVAILABLE = False
    print("WARNING: 'ctransformers' not found. This app relies on it for efficient CPU inference.")
    print("Please install it with: pip install ctransformers transformers")
    from transformers import AutoTokenizer, AutoModelForCausalLM

# --- Configuration for Models and Generation ---
ORIGINAL_MODEL_ID = "HuggingFaceTB/SmolLM2-360M-Instruct"
GGUF_MODEL_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
GGUF_MODEL_FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf"
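# ctransformers downloads the GGUF file above from the Hugging Face Hub repo;
# the tokenizer (and its chat template) is always taken from ORIGINAL_MODEL_ID,
# even when the TinyLlama GGUF weights are the ones actually loaded.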

# --- Generation Parameters ---
MAX_NEW_TOKENS = 256
TEMPERATURE = 0.7
TOP_K = 50
TOP_P = 0.95
DO_SAMPLE = True  # Only used by the Hugging Face transformers generate() fallback; ctransformers has no do_sample flag

# Global model and tokenizer
model = None
tokenizer = None
device = "cpu"

# --- Festival Audio Function ---
def speak_text_festival_to_file(text):
    """
    Uses Festival to speak the given text and saves the output to a temporary WAV file.
    Returns the path to the generated audio file, or None on error.
    """
    if not text.strip():
        print("No text provided for Festival to speak.")
        return None

    # Create a temporary WAV file for Festival output
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        audio_filepath = temp_audio_file.name

    try:
        # Festival command to synthesize text and save to a WAV file
        # (escape embedded double quotes up front: backslashes inside f-string
        # expressions are a SyntaxError on Python < 3.12)
        escaped_text = text.replace('"', '\\"')
        festival_command = f"""
        (set! utt (SayText "{escaped_text}"))
        (utt.save.wave utt "{audio_filepath}")
        """
        
        # Execute Festival via subprocess
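        # ('festival --pipe' reads Scheme commands from stdin: SayText synthesizes the
        # utterance, typically also playing it on the default audio device, and
        # utt.save.wave writes the synthesized waveform to the temporary WAV file above)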
        process = subprocess.Popen(['festival', '--pipe'],
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   text=True)
        stdout, stderr = process.communicate(input=festival_command)
        
        if process.returncode != 0:
            print(f"Error speaking text with Festival. Return code: {process.returncode}")
            print(f"Festival stderr: {stderr}")
            if os.path.exists(audio_filepath):
                os.remove(audio_filepath)
            return None
        
        if not os.path.exists(audio_filepath) or os.path.getsize(audio_filepath) == 0:
            print(f"Festival did not create a valid WAV file at {audio_filepath}. Stderr: {stderr}")
            if os.path.exists(audio_filepath):
                os.remove(audio_filepath)
            return None
            
        print(f"Audio saved to: {audio_filepath}")
        return audio_filepath

    except FileNotFoundError:
        print("Error: Festival executable not found. Make sure Festival is installed and in your PATH.")
        if os.path.exists(audio_filepath):
            os.remove(audio_filepath)
        return None
    except Exception as e:
        print(f"An unexpected error occurred during Festival processing: {e}")
        if os.path.exists(audio_filepath):
            os.remove(audio_filepath)
        return None

# --- Model Loading Function ---
def load_model_for_zerocpu():
    global model, tokenizer, device

    if GGUF_AVAILABLE:
        print(f"Attempting to load GGUF model '{GGUF_MODEL_ID}' (file: '{GGUF_MODEL_FILENAME}') for ZeroCPU...")
        try:
            model = AutoModelForCausalLM_GGUF.from_pretrained(
                GGUF_MODEL_ID,
                model_file=GGUF_MODEL_FILENAME,
                model_type="llama",
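                # gpu_layers=0 keeps every layer on the CPU (no GPU offloading)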
                gpu_layers=0
            )
            tokenizer = AutoTokenizer.from_pretrained(ORIGINAL_MODEL_ID)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token
            print(f"GGUF model '{GGUF_MODEL_ID}' loaded successfully for CPU.")
            return
        except Exception as e:
            print(f"WARNING: Could not load GGUF model '{GGUF_MODEL_ID}' from '{GGUF_MODEL_FILENAME}': {e}")
            print(f"Falling back to standard Hugging Face model '{ORIGINAL_MODEL_ID}' for CPU (will be slower without GGUF quantization).")
    else:
        print("WARNING: ctransformers is not available. Will load standard Hugging Face model directly.")
    
    print(f"Loading standard Hugging Face model '{ORIGINAL_MODEL_ID}' for CPU...")
    try:
        model = AutoModelForCausalLM.from_pretrained(ORIGINAL_MODEL_ID)
        tokenizer = AutoTokenizer.from_pretrained(ORIGINAL_MODEL_ID)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        model.to(device)
        print(f"Standard model '{ORIGINAL_MODEL_ID}' loaded successfully on CPU.")
    except Exception as e:
        print(f"CRITICAL ERROR: Could not load standard model '{ORIGINAL_MODEL_ID}' on CPU: {e}")
        print("Please ensure the model ID is correct, you have enough RAM, and dependencies are installed.")
        model = None
        tokenizer = None

# --- Inference Function for Gradio Blocks ---
# This function yields tuples for streaming text and then the final audio.
def predict_chat_with_audio_and_streaming(message: str, history: list):
    if model is None or tokenizer is None:
        # history will now be a list of dictionaries, so yield accordingly
        yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": "Error: Model or tokenizer failed to load."}], None
        return

    # Initialize llm_messages with a system message
    llm_messages = [{"role": "system", "content": "You are a friendly chatbot."}]
    
    # The history from Gradio's Chatbot (type='messages') is already a list of
    # {"role": ..., "content": ...} dicts, so it can be passed through as-is
    llm_messages.extend(history)
            
    # Add the current user message
    llm_messages.append({"role": "user", "content": message})

    generated_text = ""
    start_time = time.time()

    if GGUF_AVAILABLE and isinstance(model, LLM):
        prompt_input = tokenizer.apply_chat_template(llm_messages, tokenize=False, add_generation_prompt=True)
        for token in model(
            prompt_input,
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            top_k=TOP_K,
            top_p=TOP_P,
            repetition_penalty=1.1,
            stop=["User:", "\nUser", "\n#", "\n##", "<|endoftext|>", "<|im_end|>"], 
            stream=True
        ):
            generated_text += token
            # Strip common special tokens before yielding
            cleaned_text = generated_text.replace("<|im_end|>", "").replace("<|endoftext|>", "").strip()
            # Yield the current state of history (list of dictionaries) and an empty audio output for streaming text
            yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": cleaned_text}], None
        # Keep the cleaned text so the final yield and the TTS step don't include special tokens
        generated_text = generated_text.replace("<|im_end|>", "").replace("<|endoftext|>", "").strip()
    else:
        input_text = tokenizer.apply_chat_template(llm_messages, tokenize=False, add_generation_prompt=True)
        inputs = tokenizer.encode(input_text, return_tensors="pt").to(device)
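        # Non-streaming fallback: the standard transformers path generates the
        # complete reply in a single call rather than token by token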
        outputs = model.generate(
            inputs,
            max_length=inputs.shape[-1] + MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            top_k=TOP_K,
            top_p=TOP_P,
            do_sample=DO_SAMPLE,
            pad_token_id=tokenizer.pad_token_id
        )
        generated_text = tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True).strip()
        # Strip common special tokens from the final generated text
        generated_text = generated_text.replace("<|im_end|>", "").replace("<|endoftext|>", "").strip()
        # Yield the full text response before audio generation
        yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": generated_text}], None

    end_time = time.time()
    print(f"Inference Time for this turn: {end_time - start_time:.2f} seconds")

    # After streaming is complete and full text is gathered
    audio_file_path = speak_text_festival_to_file(generated_text)
    
    # Yield the final state with audio file
    yield history + [{"role": "user", "content": message}, {"role": "assistant", "content": generated_text}], audio_file_path


# --- Gradio Interface Setup ---
if __name__ == "__main__":
    load_model_for_zerocpu()

    # chatbot_initial_value is already in the correct format for type='messages'
    chatbot_initial_value = [{"role": "assistant", "content": "Hello! I'm an AI assistant. I'm currently running in a CPU-only environment for efficient demonstration. How can I help you today?"}]

    # Gradio Blocks for more flexible layout
    with gr.Blocks(theme="soft", title="SmolLM2-360M-Instruct (or TinyLlama GGUF) on ZeroCPU with Festival TTS") as demo:
        gr.Markdown(
            """
            # SmolLM2-360M-Instruct (or TinyLlama GGUF) on ZeroCPU with Festival TTS
            This Space demonstrates an LLM for efficient CPU-only inference.
            **Note:** On ZeroCPU, this app prefers the GGUF-quantized `tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf`
            (TinyLlama) because it runs noticeably faster on CPU than the unquantized
            `HuggingFaceTB/SmolLM2-360M-Instruct`, which is kept as a fallback. Expect responses to vary
            between runs because generation uses random sampling.
            **Festival TTS:** The chatbot's responses will also be spoken aloud using the local Festival Speech Synthesis System.
            """
        )
        
        # The main Chatbot display component
        chatbot_display = gr.Chatbot(value=chatbot_initial_value, height=500, label="Chat History", type='messages')
        
        # Audio component for the last response
        audio_output = gr.Audio(label="Chatbot Audio Response", type="filepath", autoplay=True)

        # Textbox for user input
        msg = gr.Textbox(placeholder="Ask me a question...", container=False, scale=7)

        # Submit button
        submit_btn = gr.Button("Send")

        # Define example inputs for the textbox.
        # gr.Examples expects a list of lists, one value per component in `inputs`
        # (here just the user message for the textbox).
        examples_data = [
            ["What is the capital of France?"],
            ["Can you tell me a fun fact about outer space?"],
            ["What's the best way to stay motivated?"],
        ]
        
        # Gradio Examples
        gr.Examples(
            examples=examples_data,
            inputs=[msg],
            fn=predict_chat_with_audio_and_streaming,
            outputs=[chatbot_display, audio_output],
            cache_examples=False,
        )
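        # With cache_examples=False (and run_on_click left at its default), clicking an
        # example only fills the textbox; the chat function runs when the user submits.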

        # Event listeners for submission
        msg.submit(predict_chat_with_audio_and_streaming,
                   inputs=[msg, chatbot_display],
                   outputs=[chatbot_display, audio_output])
        submit_btn.click(predict_chat_with_audio_and_streaming,
                         inputs=[msg, chatbot_display],
                         outputs=[chatbot_display, audio_output])

        # Clear textbox after submission for better UX
        msg.submit(lambda: "", outputs=[msg])
        submit_btn.click(lambda: "", outputs=[msg])

    demo.launch()