# Spaces: Running
import streamlit as st | |
import base64 | |
import io | |
from huggingface_hub import InferenceClient | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.playback import play | |
from streamlit_webrtc import webrtc_streamer, AudioProcessorBase | |
import cv2 | |
import numpy as np | |
import speech_recognition as sr | |
import subprocess | |
# Module-level speech recognizer used by recognize_speech_with_vad.
recognizer = sr.Recognizer()

# Create the conversation history in session state only if it is not
# already there, so an existing history is never overwritten.
if "history" not in st.session_state:
    st.session_state["history"] = []
# Real-time speech recognition
def recognize_speech_with_vad(audio_data, show_messages=True):
    """Transcribe *audio_data* as Spanish (``es-ES``) speech.

    Args:
        audio_data: Audio handed unchanged to ``recognizer.recognize_google``.
        show_messages: When true, the recognized text is written to the page.
            Failure messages are shown regardless of this flag.

    Returns:
        The recognized text, or ``""`` when the audio could not be understood
        or the recognition request failed.
    """
    try:
        transcript = recognizer.recognize_google(audio_data, language="es-ES")
        if show_messages:
            st.subheader("Texto Reconocido:")
            st.write(transcript)
        return transcript
    except sr.UnknownValueError:
        st.warning("No se pudo reconocer el audio. ¿Intentaste grabar algo?")
    except sr.RequestError:
        st.error("Hablame para comenzar!")
    # Both failure paths fall through to an empty transcript.
    return ""
# Voice-activity-detection processor for streamlit_webrtc
class VADProcessor(AudioProcessorBase):
    """Collect microphone audio and answer the first phrase that is recognized.

    Incoming chunks are appended to ``buffer``.  Once ``WINDOW_SECONDS`` worth
    of samples is available the window is transcribed; a non-empty transcript
    is sent to ``generate``, the spoken reply is played, and the processor
    stops listening (``vad_active`` becomes ``False``).

    NOTE(review): streamlit_webrtc presumably calls ``recv`` outside the
    Streamlit script thread, where ``st.*`` calls and ``st.session_state`` may
    not be usable -- confirm, and consider handing results back to the script
    instead of rendering from here.
    NOTE(review): ``play`` outputs audio on the machine running this script,
    not in the visitor's browser -- confirm that is intended.
    """

    # Fallback sample rate (Hz) when a chunk does not report its own.
    SAMPLE_RATE = 44100
    # Seconds of audio gathered before each recognition attempt.
    WINDOW_SECONDS = 5
    # Bytes per sample of the 16-bit PCM kept in ``buffer``.
    SAMPLE_WIDTH = 2

    def __init__(self):
        super().__init__()
        # Keep the buffer int16 so concatenation does not promote samples to
        # float64 (which would corrupt the PCM bytes sent to the recognizer).
        self.buffer = np.zeros((0,), dtype=np.int16)
        self.sample_rate = self.SAMPLE_RATE
        self.vad_active = True

    def recv(self, audio_data):
        """Buffer one audio chunk and process a full window when available.

        Args:
            audio_data: Either a frame-like object exposing ``to_ndarray()``
                or a raw bytes-like object of 16-bit PCM samples.

        Returns:
            ``audio_data`` unchanged, so the stream keeps flowing.
        """
        if self.vad_active:
            chunk = self._to_samples(audio_data)
            self.buffer = np.concatenate((self.buffer, chunk), axis=None)
            if len(self.buffer) >= self.sample_rate * self.WINDOW_SECONDS:
                self._process_window()
        return audio_data

    def _to_samples(self, audio_data):
        """Return *audio_data* as a flat, mono ``int16`` array."""
        if not hasattr(audio_data, "to_ndarray"):
            # Raw PCM bytes: the only input form the previous code handled.
            return np.frombuffer(audio_data, dtype=np.int16)

        # Assumes PyAV ``AudioFrame`` conventions (``sample_rate``, ``layout``,
        # 16-bit integer samples) -- TODO confirm against the installed version.
        self.sample_rate = getattr(audio_data, "sample_rate", None) or self.sample_rate
        samples = np.asarray(audio_data.to_ndarray())
        layout = getattr(audio_data, "layout", None)
        channels = len(getattr(layout, "channels", None) or ()) or 1
        if samples.ndim == 2 and samples.shape[0] > 1:
            # Planar data: one row per channel, so average across rows.
            samples = samples.mean(axis=0)
        elif channels > 1 and samples.size % channels == 0:
            # Packed data: channels interleaved in one row.
            samples = samples.reshape(-1, channels).mean(axis=1)
        return samples.astype(np.int16).ravel()

    def _process_window(self):
        """Transcribe the buffered window and, on success, speak the reply."""
        # Swap the buffer out first so a failure below cannot leave it
        # growing without bound.
        window, self.buffer = self.buffer, np.zeros((0,), dtype=np.int16)

        # The recognizer needs an AudioData object, not a bare sample array.
        audio = sr.AudioData(window.tobytes(), self.sample_rate, self.SAMPLE_WIDTH)
        # Hand the player a real WAV file rather than header-less samples.
        st.audio(audio.get_wav_data(), format="audio/wav")

        audio_text = recognize_speech_with_vad(audio)
        if not audio_text:
            return

        st.success("Frase detectada. Procesando audio...")
        output, audio_file = generate(audio_text, history=st.session_state.history)
        # Record the exchange so later prompts include it.
        st.session_state.history.append((audio_text, output))
        if audio_file is not None:
            # ``play`` expects an AudioSegment; ``generate`` returns MP3 bytes.
            play(AudioSegment.from_file(audio_file, format="mp3"))
        # Stop listening once a phrase has been handled.
        self.vad_active = False
# Build the input for the language model
def format_prompt(message, history):
    """Return the instruction-formatted prompt for *message*.

    Args:
        message: The new user message.
        history: Iterable of ``(user_prompt, bot_response)`` pairs, oldest
            first.

    Returns:
        ``"<s>"`` followed by one ``[INST] ... [/INST] reply</s> `` segment
        per past turn, then the new message wrapped in ``[INST] ... [/INST]``.
    """
    past_turns = "".join(
        f"[INST] {user_prompt} [/INST] {bot_response}</s> "
        for user_prompt, bot_response in history
    )
    return f"<s>{past_turns}[INST] {message} [/INST]"
# Generate the text reply and its spoken version
def generate(
    audio_text,
    history,
    temperature=None,
    max_new_tokens=512,
    top_p=0.95,
    repetition_penalty=1.0,
):
    """Ask the language model for a reply to *audio_text* and synthesize it.

    Args:
        audio_text: The user's transcribed message.
        history: Past ``(user_prompt, bot_response)`` pairs given to
            ``format_prompt``.
        temperature: Sampling temperature; ``None`` means 0.9. Values below
            0.01 are raised to 0.01.
        max_new_tokens: Forwarded to the text-generation call.
        top_p: Forwarded to the text-generation call (as a float).
        repetition_penalty: Forwarded to the text-generation call.

    Returns:
        ``(response, audio_file)``: the cleaned reply text and the object
        returned by ``text_to_speech``, or ``None`` for the audio when the
        reply is empty.
    """
    client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

    temperature = 0.9 if temperature is None else float(temperature)
    generate_kwargs = dict(
        temperature=max(temperature, 1e-2),
        max_new_tokens=max_new_tokens,
        top_p=float(top_p),
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=42,
    )

    stream = client.text_generation(
        format_prompt(audio_text, history),
        **generate_kwargs,
        stream=True,
        details=True,
        return_full_text=True,
    )
    raw_reply = "".join(chunk.token.text for chunk in stream)

    # Drop the end-of-sequence marker BEFORE collapsing whitespace; doing it
    # afterwards left a stray space wherever the marker was space-separated.
    response = " ".join(raw_reply.replace("</s>", "").split())

    # Callers already handle a missing audio file; do not send empty text to
    # the speech synthesizer.
    audio_file = text_to_speech(response, speed=1.3) if response else None
    return response, audio_file
# Text to speech
def text_to_speech(text, speed=1.3):
    """Synthesize *text* as Spanish speech and speed it up.

    Args:
        text: Text to speak.
        speed: Playback-speed factor passed to ``AudioSegment.speedup``.

    Returns:
        An ``io.BytesIO`` holding the sped-up MP3, positioned at its start.
    """
    raw_mp3 = io.BytesIO()
    gTTS(text=text, lang='es').write_to_fp(raw_mp3)
    raw_mp3.seek(0)

    spoken = AudioSegment.from_file(raw_mp3, format="mp3")
    sped_up_mp3 = io.BytesIO()
    spoken.speedup(playback_speed=speed).export(sped_up_mp3, format="mp3")
    # Rewind so the caller can read the stream from the beginning.
    sped_up_mp3.seek(0)
    return sped_up_mp3
# Text-to-speech player
def audio_player_markup(audio_file):
    """Return HTML for an autoplaying ``<audio>`` element embedding the audio.

    Args:
        audio_file: Readable binary file object; it is read from its current
            position and embedded as a base64 ``data:audio/mp3`` URI.

    Returns:
        The markup string, surrounded by a leading and a trailing newline.
    """
    payload = base64.b64encode(audio_file.read()).decode()
    return (
        "\n"
        '<audio autoplay="autoplay" controls="controls" '
        f'src="data:audio/mp3;base64,{payload}" '
        'type="audio/mp3" id="audio_player"></audio>'
        "\n"
    )
# User interface built on streamlit_webrtc
def main():
    """Show the page title and start the audio-only WebRTC streamer."""
    st.title("Chatbot de Voz a Voz")

    # Microphone only: no video track is requested.
    audio_only = {"video": False, "audio": True}
    webrtc_streamer(
        key="vad",
        audio_processor_factory=VADProcessor,
        async_processing=True,
        media_stream_constraints=audio_only,
    )


if __name__ == "__main__":
    main()