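"""Voice-to-voice chatbot with image generation.

An uploaded audio clip is transcribed with distil-whisper, answered by a
Mistral chat model, spoken back with Bark, and illustrated with a Flux
image model, all via the Hugging Face Inference API behind a Gradio UI.
"""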
import os
import tempfile
from io import BytesIO

import gradio as gr
import requests
from huggingface_hub import InferenceClient
from PIL import Image

# Tokens are read from the environment rather than hard-coded.
read_token = os.getenv("HF_READ")    # read token, used for all Inference API calls
write_token = os.getenv("HF_WRITE")  # write token, not used below

CHAT_MODEL = "mistralai/Mistral-Nemo-Instruct-2407"

# Serverless Inference API endpoints: speech-to-text, text-to-speech,
# and text-to-image.
WHISPER_API_URL = "https://api-inference.huggingface.co/models/distil-whisper/distil-large-v2"
WHISPER_HEADERS = {"Authorization": f"Bearer {read_token}"}

BARK_API_URL = "https://api-inference.huggingface.co/models/suno/bark"
BARK_HEADERS = {"Authorization": f"Bearer {read_token}"}

FLUX_API_URL = "https://api-inference.huggingface.co/models/enhanceaiteam/Flux-uncensored"
FLUX_HEADERS = {"Authorization": f"Bearer {read_token}"}

def speech_to_text(filename):
    """Transcribe an audio file with the hosted Whisper model."""
    with open(filename, "rb") as f:
        data = f.read()
    response = requests.post(WHISPER_API_URL, headers=WHISPER_HEADERS, data=data)
    if response.status_code == 200:
        return response.json().get("text", "Could not recognize speech")
    print(f"Error: {response.status_code} - {response.text}")
    return None

# Chat completions go through the InferenceClient rather than raw HTTP calls.
client = InferenceClient(api_key=read_token)

def chatbot_logic(input_text):
    """Send the recognized text to the chat model and return its reply."""
    messages = [{"role": "user", "content": input_text}]
    try:
        completion = client.chat.completions.create(
            model=CHAT_MODEL,
            messages=messages,
            max_tokens=500,
        )
        return completion.choices[0].message["content"]
    except Exception as e:
        print(f"Error: {e}")
        return None

def text_to_speech(text):
    """Synthesize speech with Bark; returns raw audio bytes (FLAC) or None."""
    payload = {"inputs": text}
    response = requests.post(BARK_API_URL, headers=BARK_HEADERS, json=payload)
    if response.status_code == 200:
        return response.content
    print(f"Error: {response.status_code} - {response.text}")
    return None

def generate_image(prompt):
    """Generate an image from the chatbot's reply with the Flux model."""
    data = {"inputs": prompt}
    response = requests.post(FLUX_API_URL, headers=FLUX_HEADERS, json=data)
    if response.status_code == 200:
        return Image.open(BytesIO(response.content))
    print(f"Error: {response.status_code} - {response.text}")
    return None

def create_ui():
    def process_chat(audio_file):
        # Step 1: transcribe the uploaded audio.
        recognized_text = speech_to_text(audio_file)
        if not recognized_text:
            return "Could not recognize speech", None, None

        # Step 2: get a text reply from the chat model.
        response_text = chatbot_logic(recognized_text)
        if not response_text:
            return f"Error generating response for: {recognized_text}", None, None

        # Step 3: synthesize the reply as speech.
        audio_output = text_to_speech(response_text)
        if not audio_output:
            return f"Error synthesizing response: {response_text}", None, None

        # gr.Audio expects a filepath (or a sample-rate/array pair), not raw
        # bytes, so write the synthesized audio to a temporary file.
        with tempfile.NamedTemporaryFile(suffix=".flac", delete=False) as tmp:
            tmp.write(audio_output)
            audio_path = tmp.name

        # Step 4: illustrate the reply.
        generated_image = generate_image(response_text)

        return response_text, audio_path, generated_image

    with gr.Blocks(title="Voice-to-Voice Chatbot with Image Generation") as ui:
        gr.Markdown("## Voice-to-Voice Chatbot with Image Generation\nUpload an audio file to interact with the chatbot.")

        # Note: Gradio 4.x renamed this parameter to sources=["upload"].
        audio_input = gr.Audio(source="upload", type="filepath", label="Input Audio File")
        submit_button = gr.Button("Process")

        with gr.Row():
            chatbot_response = gr.Textbox(label="Chatbot Response", lines=2)

        with gr.Row():
            audio_output = gr.Audio(label="Generated Audio Response", autoplay=True)
            image_output = gr.Image(label="Generated Image")

        submit_button.click(
            fn=process_chat,
            inputs=audio_input,
            outputs=[chatbot_response, audio_output, image_output],
            show_progress=True,
        )

    return ui

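# To launch the demo locally, export the tokens and run the script
# (assuming it is saved as app.py):
#   export HF_READ=<your read token>
#   export HF_WRITE=<your write token>
#   python app.py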
if __name__ == "__main__":
    create_ui().launch(debug=True)