import gradio as gr from http import HTTPStatus import uuid from gradio_client import utils as client_utils import gradio.processing_utils as processing_utils import base64 from openai import OpenAI import soundfile as sf import numpy as np import io import os import modelscope_studio.components.base as ms import modelscope_studio.components.antd as antd import oss2 from oss2.credentials import EnvironmentVariableCredentialsProvider # Voice settings VOICE_LIST = ['Cherry', 'Ethan', 'Serena', 'Chelsie'] DEFAULT_VOICE = 'Cherry' # OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET。 auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider()) endpoint = os.getenv("OSS_ENDPOINT") region = os.getenv("OSS_REGION") bucket_name = os.getenv("OSS_BUCKET_NAME") bucket = oss2.Bucket(auth, endpoint, bucket_name, region=region) default_system_prompt = 'You are Qwen, a virtual human developed by the Qwen Team, Alibaba Group, capable of perceiving auditory and visual inputs, as well as generating text and speech.' API_KEY = os.environ['API_KEY'] client = OpenAI( api_key=API_KEY, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) is_modelscope_studio = os.getenv('MODELSCOPE_ENVIRONMENT') == 'studio' def get_text(text: str, cn_text: str): if is_modelscope_studio: return cn_text return text def encode_file_to_base64(file_path): with open(file_path, "rb") as file: mime_type = client_utils.get_mimetype(file_path) bae64_data = base64.b64encode(file.read()).decode("utf-8") return f"data:{mime_type};base64,{bae64_data}" def file_path_to_oss_url(file_path: str): if file_path.startswith("http"): return file_path ext = file_path.split('.')[-1] object_name = f'studio-temp/Qwen2.5-Omni-Demo/{uuid.uuid4()}.{ext}' response = bucket.put_object_from_file(object_name, file_path) file_url = file_path if response.status == HTTPStatus.OK: file_url = bucket.sign_url('GET', object_name, 60 * 60, slash_safe=True) return file_url def format_history(history: list, system_prompt: str, oss_cache): messages = [] messages.append({"role": "system", "content": system_prompt}) for item in history: if isinstance(item["content"], str): messages.append({"role": item['role'], "content": item['content']}) elif item["role"] == "user" and (isinstance(item["content"], list) or isinstance(item["content"], tuple)): file_path = item["content"][0] file_url = oss_cache.get(file_path, file_path_to_oss_url(file_path)) oss_cache[file_path] = file_url file_url = file_url if file_url.startswith( "http") else encode_file_to_base64(file_path=file_path) mime_type = client_utils.get_mimetype(file_path) ext = file_path.split('.')[-1] if mime_type.startswith("image"): messages.append({ "role": item['role'], "content": [{ "type": "image_url", "image_url": { "url": file_url } }] }) elif mime_type.startswith("video"): messages.append({ "role": item['role'], "content": [{ "type": "video_url", "video_url": { "url": file_url } }] }) elif mime_type.startswith("audio"): messages.append({ "role": item['role'], "content": [{ "type": "input_audio", "input_audio": { "data": file_url, "format": ext } }] }) return messages def predict(messages, voice=DEFAULT_VOICE): print('predict history: ', messages) completion = client.chat.completions.create( model="qwen-omni-turbo", messages=messages, modalities=["text", "audio"], audio={ "voice": voice, "format": "wav" }, stream=True, stream_options={"include_usage": True}) response_text = "" audio_str = "" for chunk in completion: if chunk.choices: delta = chunk.choices[0].delta if hasattr( delta, 'audio') and delta.audio and delta.audio.get("transcript"): response_text += delta.audio.get("transcript") if hasattr(delta, 'audio') and delta.audio and delta.audio.get("data"): audio_str += delta.audio.get("data") yield {"type": "text", "data": response_text} pcm_bytes = base64.b64decode(audio_str) audio_np = np.frombuffer(pcm_bytes, dtype=np.int16) wav_io = io.BytesIO() sf.write(wav_io, audio_np, samplerate=24000, format="WAV") wav_io.seek(0) wav_bytes = wav_io.getvalue() audio_path = processing_utils.save_bytes_to_cache( wav_bytes, "audio.wav", cache_dir=demo.GRADIO_CACHE) yield {"type": "audio", "data": audio_path} def media_predict(audio, video, history, system_prompt, state_value, voice_choice): files = [audio, video] for f in files: if f: history.append({"role": "user", "content": (f, )}) formatted_history = format_history(history=history, system_prompt=system_prompt, oss_cache=state_value["oss_cache"]) # First yield yield ( None, # microphone None, # webcam history, # media_chatbot gr.update(visible=False), # submit_btn gr.update(visible=True), # stop_btn state_value # state ) history.append({"role": "assistant", "content": ""}) for chunk in predict(formatted_history, voice_choice): if chunk["type"] == "text": history[-1]["content"] = chunk["data"] yield ( None, # microphone None, # webcam history, # media_chatbot gr.update(visible=False), # submit_btn gr.update(visible=True), # stop_btn state_value # state ) if chunk["type"] == "audio": history.append({ "role": "assistant", "content": gr.Audio(chunk["data"]) }) # Final yield yield ( None, # microphone None, # webcam history, # media_chatbot gr.update(visible=True), # submit_btn gr.update(visible=False), # stop_btn state_value # state ) def chat_predict(text, audio, image, video, history, system_prompt, state_value, voice_choice): # Process text input if text: history.append({"role": "user", "content": text}) # Process audio input if audio: history.append({"role": "user", "content": (audio, )}) # Process image input if image: history.append({"role": "user", "content": (image, )}) # Process video input if video: history.append({"role": "user", "content": (video, )}) formatted_history = format_history(history=history, system_prompt=system_prompt, oss_cache=state_value["oss_cache"]) yield None, None, None, None, history, state_value history.append({"role": "assistant", "content": ""}) for chunk in predict(formatted_history, voice_choice): if chunk["type"] == "text": history[-1]["content"] = chunk["data"] yield gr.skip(), gr.skip(), gr.skip(), gr.skip( ), history, state_value if chunk["type"] == "audio": history.append({ "role": "assistant", "content": gr.Audio(chunk["data"]) }) yield gr.skip(), gr.skip(), gr.skip(), gr.skip(), history, state_value with gr.Blocks() as demo, ms.Application(), antd.ConfigProvider(): state = gr.State({"oss_cache": {}}) with gr.Sidebar(open=False): system_prompt_textbox = gr.Textbox(label="System Prompt", value=default_system_prompt) voice_choice = gr.Dropdown(label="Voice Choice", choices=VOICE_LIST, value=DEFAULT_VOICE) with antd.Flex(gap="small", justify="center", align="center"): antd.Image('./logo-1.png', preview=False, width=67, height=67) with antd.Flex(vertical=True, gap="small", align="center"): antd.Typography.Title("Qwen2.5-Omni Demo", level=1, elem_style=dict(margin=0, fontSize=28)) with antd.Flex(vertical=True, gap="small"): antd.Typography.Text(get_text("🎯 Instructions for use:", "🎯 使用说明:"), strong=True) antd.Typography.Text( get_text( "1️⃣ Click the Audio Record button or the Camera Record button.", "1️⃣ 点击音频录制按钮,或摄像头-录制按钮")) antd.Typography.Text( get_text("2️⃣ Input audio or video.", "2️⃣ 输入音频或者视频")) antd.Typography.Text( get_text( "3️⃣ Click the submit button and wait for the model's response.", "3️⃣ 点击提交并等待模型的回答")) antd.Image('./logo-2.png', preview=False, width=80, height=80, elem_style=dict(marginTop=5)) with gr.Tabs(): with gr.Tab("Online"): with gr.Row(): with gr.Column(scale=1): microphone = gr.Audio(sources=['microphone'], format="wav", type="filepath") webcam = gr.Video(sources=['webcam'], format="mp4", height=400, include_audio=True) submit_btn = gr.Button(get_text("Submit", "提交"), variant="primary") stop_btn = gr.Button(get_text("Stop", "停止"), visible=False) clear_btn = gr.Button(get_text("Clear History", "清除历史")) with gr.Column(scale=2): media_chatbot = gr.Chatbot(height=650, type="messages") def clear_history(): return [], gr.update(value=None), gr.update(value=None) submit_event = submit_btn.click(fn=media_predict, inputs=[ microphone, webcam, media_chatbot, system_prompt_textbox, state, voice_choice ], outputs=[ microphone, webcam, media_chatbot, submit_btn, stop_btn, state ]) stop_btn.click( fn=lambda: (gr.update(visible=True), gr.update(visible=False)), inputs=None, outputs=[submit_btn, stop_btn], cancels=[submit_event], queue=False) clear_btn.click(fn=clear_history, inputs=None, outputs=[media_chatbot, microphone, webcam]) with gr.Tab("Offline"): chatbot = gr.Chatbot(type="messages", height=650) # Media upload section in one row with gr.Row(equal_height=True): audio_input = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio", elem_classes="media-upload", scale=1) image_input = gr.Image(sources=["upload"], type="filepath", label="Upload Image", elem_classes="media-upload", scale=1) video_input = gr.Video(sources=["upload"], label="Upload Video", elem_classes="media-upload", scale=1) # Text input section text_input = gr.Textbox(show_label=False, placeholder="Enter text here...") # Control buttons with gr.Row(): submit_btn = gr.Button(get_text("Submit", "提交"), variant="primary", size="lg") stop_btn = gr.Button(get_text("Stop", "停止"), visible=False, size="lg") clear_btn = gr.Button(get_text("Clear History", "清除历史"), size="lg") def clear_chat_history(): return [], gr.update(value=None), gr.update( value=None), gr.update(value=None), gr.update(value=None) submit_event = gr.on( triggers=[submit_btn.click, text_input.submit], fn=chat_predict, inputs=[ text_input, audio_input, image_input, video_input, chatbot, system_prompt_textbox, state, voice_choice ], outputs=[ text_input, audio_input, image_input, video_input, chatbot, state ]) stop_btn.click(fn=lambda: (gr.update(visible=True), gr.update(visible=False)), inputs=None, outputs=[submit_btn, stop_btn], cancels=[submit_event], queue=False) clear_btn.click(fn=clear_chat_history, inputs=None, outputs=[ chatbot, text_input, audio_input, image_input, video_input ]) # Add some custom CSS to improve the layout gr.HTML(""" """) demo.queue(default_concurrency_limit=100, max_size=100).launch(max_threads=100, ssr_mode=False)