File size: 3,961 Bytes
0c1b8f7
0ba4242
 
0c1b8f7
10cb780
0c1b8f7
ea9ba29
 
0ba4242
0c1b8f7
0ba4242
bce38cc
0ba4242
806d92e
bce38cc
0ba4242
 
 
 
ab6b5e5
0ba4242
 
 
 
 
ab6b5e5
0ba4242
ab6b5e5
0ba4242
 
 
ab6b5e5
 
 
761375e
0ba4242
 
 
 
 
 
 
47473ae
c863607
ea9ba29
 
 
 
 
 
 
 
ff77d8a
ab6b5e5
7d0f94b
 
0ba4242
 
 
 
 
ea9ba29
 
 
 
 
7d0f94b
0ba4242
7d0f94b
 
 
 
 
0ba4242
a29c2e7
0ba4242
7d0f94b
0ba4242
 
 
 
 
 
 
 
 
 
 
be810f5
0ba4242
 
 
7a2c608
ab6b5e5
ea9ba29
 
 
 
 
 
 
 
bce38cc
0ba4242
 
 
ea9ba29
 
 
 
 
0ba4242
 
 
db9acad
ea9ba29
dc8101f
33cb38e
ea9ba29
0ba4242
 
7d0f94b
0ba4242
 
 
 
47473ae
0c1b8f7
56cff44
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os
from collections.abc import Iterator
from threading import Thread
import gradio as gr
import spaces
import torch
import edge_tts
import asyncio
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

DESCRIPTION = """
# QwQ Tiny
"""

css ='''
h1 {
  text-align: center;
  display: block;
}

#duplicate-button {
  margin: auto;
  color: #fff;
  background: #1565c0;
  border-radius: 100vh;
}
'''

MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model_id = "prithivMLmods/FastThink-0.5B-Tiny"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)
model.eval()


async def text_to_speech(text: str, output_file="output.mp3"):
    """Convert text to speech using Edge TTS and save as MP3"""
    voice = "en-US-JennyNeural"  # Change this to your preferred voice
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(output_file)
    return output_file


@spaces.GPU
def generate(
    message: str,
    chat_history: list[dict],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
):
    """Generates chatbot response and handles TTS requests"""
    is_tts = message.strip().lower().startswith("@tts")
    message = message.replace("@tts", "").strip()

    conversation = [*chat_history, {"role": "user", "content": message}]

    input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        {"input_ids": input_ids},
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)

    final_response = "".join(outputs)

    if is_tts:
        output_file = asyncio.run(text_to_speech(final_response))
        return output_file  # Return MP3 file

    return final_response  # Return text response


demo = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS),
        gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6),
        gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9),
        gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50),
        gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2),
    ],
    stop_btn=None,
    examples=[
        ["A train travels 60 kilometers per hour. If it travels for 5 hours, how far will it travel in total?"],
        ["Write a Python function to check if a number is prime."],
        ["What causes rainbows to form?"],
        ["Rewrite the following sentence in passive voice: 'The dog chased the cat.'"],
        ["@tts What is the capital of France?"],
    ],
    cache_examples=False,
    type="messages",
    description=DESCRIPTION,
    css=css,
    fill_height=True,
)

if __name__ == "__main__":
    demo.queue(max_size=20).launch()