prithivMLmods's picture
Update app.py
40dd3a7 verified
raw
history blame
10.6 kB
import os
from threading import Thread
import gradio as gr
import spaces
import torch
import edge_tts
import asyncio
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, TextIteratorStreamer
from transformers.image_utils import load_image
import time
# =============================================================================
# New imports and helper classes for image generation
# =============================================================================
try:
# We use Hugging Face’s InferenceClient as a generic image-generation API client.
from huggingface_hub import InferenceClient as HFInferenceClient
except ImportError:
HFInferenceClient = None
# A simple wrapper client for our primary image-generation space.
class Client:
def __init__(self, repo_id):
self.repo_id = repo_id
if HFInferenceClient is not None:
self.client = HFInferenceClient(repo_id)
else:
self.client = None
def predict(self, task, arg2, prompt, api_name):
if self.client is not None:
# Here we assume that calling the client with the prompt returns an image.
# (Depending on your API, you might need to adjust parameters.)
return self.client(prompt)
else:
raise Exception("HFInferenceClient not available")
def image_gen(prompt):
"""
Uses the STABLE-HAMSTER space to generate an image based on the prompt.
"""
client = Client("prithivMLmods/STABLE-HAMSTER")
return client.predict("Image Generation", None, prompt, api_name="/stable_hamster")
# =============================================================================
# Original Code (with modifications below)
# =============================================================================
DESCRIPTION = """
# QwQ Edge 💬
"""
css = '''
h1 {
text-align: center;
display: block;
}
#duplicate-button {
margin: auto;
color: #fff;
background: #1565c0;
border-radius: 100vh;
}
'''
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Load text-only model and tokenizer
model_id = "prithivMLmods/FastThink-0.5B-Tiny"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
model_id,
device_map="auto",
torch_dtype=torch.bfloat16,
)
model.eval()
TTS_VOICES = [
"en-US-JennyNeural", # @tts1
"en-US-GuyNeural", # @tts2
]
# Load multimodal (OCR) model and processor
MODEL_ID = "prithivMLmods/Qwen2-VL-OCR-2B-Instruct"
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
model_m = Qwen2VLForConditionalGeneration.from_pretrained(
MODEL_ID,
trust_remote_code=True,
torch_dtype=torch.float16
).to("cuda").eval()
async def text_to_speech(text: str, voice: str, output_file="output.mp3"):
"""Convert text to speech using Edge TTS and save as MP3"""
communicate = edge_tts.Communicate(text, voice)
await communicate.save(output_file)
return output_file
def clean_chat_history(chat_history):
"""
Filter out any chat entries whose "content" is not a string.
This helps prevent errors when concatenating previous messages.
"""
cleaned = []
for msg in chat_history:
if isinstance(msg, dict) and isinstance(msg.get("content"), str):
cleaned.append(msg)
return cleaned
@spaces.GPU
def generate(
input_dict: dict,
chat_history: list[dict],
max_new_tokens: int = 1024,
temperature: float = 0.6,
top_p: float = 0.9,
top_k: int = 50,
repetition_penalty: float = 1.2,
):
"""
Generates chatbot responses with support for multimodal input, TTS, and now image generation.
If the query starts with an @tts command (e.g. "@tts1"), previous chat history is cleared.
If the query starts with an @image command, the image generation branch is used.
"""
text = input_dict["text"]
files = input_dict.get("files", [])
# -------------------------------------------------------------------------
# NEW: Check for image generation command (@image)
# -------------------------------------------------------------------------
image_prefix = "@image"
if text.strip().lower().startswith(image_prefix):
# Remove the prefix and any extra whitespace
query = text[len(image_prefix):].strip()
yield "Generating Image, Please wait 10 sec..."
try:
image = image_gen(query)
# If the API returns a tuple (as in the snippet) use the second element;
# otherwise assume it returns an image directly.
if isinstance(image, (list, tuple)) and len(image) > 1:
yield gr.Image(image[1])
else:
yield gr.Image(image)
except Exception as e:
yield "Error in primary image generation, trying fallback..."
try:
# Use the fallback image generation client.
if HFInferenceClient is not None:
client_flux = HFInferenceClient("black-forest-labs/FLUX.1-schnell")
image = client_flux.text_to_image(query)
yield gr.Image(image)
else:
yield "Fallback client not available."
except Exception as fallback_error:
yield f"Error in image generation: {str(fallback_error)}"
return # End execution after processing the image-generation request.
# -------------------------------------------------------------------------
# Continue with the original processing (image files, TTS, or text conversation)
# -------------------------------------------------------------------------
if len(files) > 1:
images = [load_image(image) for image in files]
elif len(files) == 1:
images = [load_image(files[0])]
else:
images = []
tts_prefix = "@tts"
is_tts = any(text.strip().lower().startswith(f"{tts_prefix}{i}") for i in range(1, 3))
voice_index = next((i for i in range(1, 3) if text.strip().lower().startswith(f"{tts_prefix}{i}")), None)
if is_tts and voice_index:
voice = TTS_VOICES[voice_index - 1]
text = text.replace(f"{tts_prefix}{voice_index}", "").strip()
# Clear any previous chat history to avoid concatenation issues
conversation = [{"role": "user", "content": text}]
else:
voice = None
text = text.replace(tts_prefix, "").strip()
conversation = clean_chat_history(chat_history)
conversation.append({"role": "user", "content": text})
if images:
# Multimodal branch using the OCR model
messages = [{
"role": "user",
"content": [
*[{"type": "image", "image": image} for image in images],
{"type": "text", "text": text},
]
}]
prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = processor(text=[prompt], images=images, return_tensors="pt", padding=True).to("cuda")
streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = {**inputs, "streamer": streamer, "max_new_tokens": max_new_tokens}
thread = Thread(target=model_m.generate, kwargs=generation_kwargs)
thread.start()
buffer = ""
yield "Thinking..."
for new_text in streamer:
buffer += new_text
buffer = buffer.replace("<|im_end|>", "")
time.sleep(0.01)
yield buffer
else:
# Text-only branch using the text model
input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt")
if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
input_ids = input_ids.to(model.device)
streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = {
"input_ids": input_ids,
"streamer": streamer,
"max_new_tokens": max_new_tokens,
"do_sample": True,
"top_p": top_p,
"top_k": top_k,
"temperature": temperature,
"num_beams": 1,
"repetition_penalty": repetition_penalty,
}
t = Thread(target=model.generate, kwargs=generation_kwargs)
t.start()
outputs = []
for new_text in streamer:
outputs.append(new_text)
yield "".join(outputs)
final_response = "".join(outputs)
yield final_response
if is_tts and voice:
output_file = asyncio.run(text_to_speech(final_response, voice))
yield gr.Audio(output_file, autoplay=True)
demo = gr.ChatInterface(
fn=generate,
additional_inputs=[
gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS),
gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6),
gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9),
gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50),
gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2),
],
examples=[
["@tts1 Who is Nikola Tesla, and why did he die?"],
[{"text": "Extract JSON from the image", "files": ["examples/document.jpg"]}],
[{"text": "summarize the letter", "files": ["examples/1.png"]}],
["A train travels 60 kilometers per hour. If it travels for 5 hours, how far will it travel in total?"],
["Write a Python function to check if a number is prime."],
["@tts2 What causes rainbows to form?"],
["@image A beautiful sunset over a mountain range"],
],
cache_examples=False,
type="messages",
description=DESCRIPTION,
css=css,
fill_height=True,
textbox=gr.MultimodalTextbox(label="Query Input", file_types=["image"], file_count="multiple"),
stop_btn="Stop Generation",
multimodal=True,
)
if __name__ == "__main__":
demo.queue(max_size=20).launch(share=True)