Spaces:
Sleeping
Sleeping
import openai | |
import whisper | |
import tempfile | |
import gradio as gr | |
from pydub import AudioSegment | |
import fitz # PyMuPDF para manejar PDFs | |
import docx # Para manejar archivos .docx | |
import pandas as pd # Para manejar archivos .xlsx y .csv | |
# Configura tu clave API de OpenAI | |
openai.api_key = "sk-proj-oa7IsWQdSibP9urkzepFT3BlbkFJ9L9tXiDZpjgPikmAhtQP" | |
# Cargar el modelo Whisper de mayor calidad | |
model = whisper.load_model("large") | |
def preprocess_audio(audio_file): | |
"""Preprocesa el archivo de audio para mejorar la calidad.""" | |
try: | |
audio = AudioSegment.from_file(audio_file) | |
# Normaliza el audio al -20 dBFS | |
audio = audio.apply_gain(-audio.dBFS + (-20)) | |
# Exporta el audio procesado a un archivo temporal | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file: | |
audio.export(temp_file.name, format="mp3") | |
return temp_file.name | |
except Exception as e: | |
return f"Error al preprocesar el archivo de audio: {str(e)}" | |
def transcribir_audio(audio_file): | |
"""Transcribe un archivo de audio.""" | |
try: | |
if isinstance(audio_file, str): | |
archivo_path = preprocess_audio(audio_file) | |
else: | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file: | |
temp_file.write(audio_file.read()) | |
temp_file.flush() | |
archivo_path = preprocess_audio(temp_file.name) | |
resultado = model.transcribe(archivo_path) | |
return resultado.get("text", "Error en la transcripción") | |
except Exception as e: | |
return f"Error al procesar el archivo de audio: {str(e)}" | |
def leer_documento(documento_path): | |
"""Lee el contenido de un documento PDF, DOCX, XLSX o CSV.""" | |
try: | |
# Identificar el tipo de archivo | |
if documento_path.endswith(".pdf"): | |
doc = fitz.open(documento_path) | |
texto_completo = "" | |
for pagina in doc: | |
texto_completo += pagina.get_text() | |
return texto_completo | |
elif documento_path.endswith(".docx"): | |
doc = docx.Document(documento_path) | |
texto_completo = "\n".join([parrafo.text for parrafo in doc.paragraphs]) | |
return texto_completo | |
elif documento_path.endswith(".xlsx"): | |
df = pd.read_excel(documento_path) | |
texto_completo = df.to_string() | |
return texto_completo | |
elif documento_path.endswith(".csv"): | |
df = pd.read_csv(documento_path) | |
texto_completo = df.to_string() | |
return texto_completo | |
else: | |
return "Tipo de archivo no soportado. Por favor suba un documento PDF, DOCX, XLSX o CSV." | |
except Exception as e: | |
return f"Error al leer el documento: {str(e)}" | |
def generar_noticia(instrucciones, hechos, tamaño, tono, *args): | |
"""Genera una noticia a partir de instrucciones, hechos y transcripciones.""" | |
base_de_conocimiento = { | |
"instrucciones": instrucciones, | |
"hechos": hechos, | |
"contenido_documentos": [], | |
"audio_data": [] | |
} | |
# Recolecta los documentos y el audio desde los argumentos | |
num_audios = 5 * 3 # 5 audios * 3 campos (audio, nombre, cargo) | |
audios = args[:num_audios] | |
documentos = args[num_audios:] | |
# Leer el contenido de los documentos si se han subido | |
for documento in documentos: | |
if documento is not None: | |
contenido_doc = leer_documento(documento.name) | |
print(f"Contenido del documento {documento.name}: {contenido_doc}") | |
base_de_conocimiento["contenido_documentos"].append(contenido_doc) | |
# Recolecta datos de cada archivo de audio | |
for i in range(0, len(audios), 3): | |
audio_file, nombre, cargo = audios[i:i+3] | |
if audio_file is not None: | |
base_de_conocimiento["audio_data"].append({"audio": audio_file, "nombre": nombre, "cargo": cargo}) | |
transcripciones_texto = "" | |
transcripciones_brutas = "" | |
total_citas_directas = 0 | |
# Transcribe y compila las transcripciones | |
for idx, data in enumerate(base_de_conocimiento["audio_data"]): | |
if data["audio"] is not None: | |
transcripcion = transcribir_audio(data["audio"]) | |
transcripcion_texto = f'"{transcripcion}" - {data["nombre"]}, {data["cargo"]}' | |
transcripcion_bruta = f'[Audio {idx + 1}]: "{transcripcion}" - {data["nombre"]}, {data["cargo"]}' | |
# Decidir si usar cita directa o indirecta | |
if total_citas_directas < len(base_de_conocimiento["audio_data"]) * 0.8: | |
transcripciones_texto += transcripcion_texto + "\n" | |
total_citas_directas += 1 | |
else: | |
transcripciones_texto += f'{data["nombre"]} mencionó que {transcripcion}' + "\n" | |
transcripciones_brutas += transcripcion_bruta + "\n\n" | |
print(f"Transcripción bruta [Audio {idx + 1}]: {transcripcion_bruta}") | |
print(f"Transcripciones brutas acumuladas: {transcripciones_brutas}") | |
contenido_documentos = "\n\n".join(base_de_conocimiento["contenido_documentos"]) | |
# Prompt adicional para instrucciones internas | |
prompt_interno = """ | |
Instrucciones para el modelo: | |
- Asegúrate de que al menos el 80% de las citas sean directas y estén entrecomilladas. | |
- El 20% restante puede ser citas indirectas. | |
- No inventes información nueva. | |
- Sé riguroso con los hechos proporcionados. | |
- Al procesar los documentos cargados, extrae y resalta citas importantes y testimonios textuales de las fuentes. | |
- Al procesar los documentos cargados, extrae y resalta cifras clave. | |
""" | |
# Compila el prompt para OpenAI | |
prompt = f""" | |
{prompt_interno} | |
Escribe una noticia con la siguiente información, incluyendo un título, un sumario de 20 palabras, y el cuerpo del contenido cuyo tamaño es {tamaño} palabras. El tono debe ser {tono}. | |
Instrucciones: {base_de_conocimiento["instrucciones"]} | |
Hechos: {base_de_conocimiento["hechos"]} | |
Contenido adicional de los documentos: {contenido_documentos} | |
Utiliza las siguientes transcripciones como citas directas e indirectas (sin cambiar ni inventar contenido): | |
{transcripciones_texto} | |
""" | |
try: | |
respuesta = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=[{"role": "user", "content": prompt}], | |
temperature=0.1 # Bajamos la temperatura para mayor rigurosidad | |
) | |
noticia = respuesta['choices'][0]['message']['content'] | |
return noticia, transcripciones_brutas | |
except Exception as e: | |
return f"Error al generar la noticia: {str(e)}", "" | |
with gr.Blocks() as demo: | |
gr.Markdown("## Chatbot de noticias") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
instrucciones = gr.Textbox(label="Instrucciones para la noticia", lines=2) | |
hechos = gr.Textbox(label="Describe los hechos de la noticia", lines=4) | |
tamaño = gr.Number(label="Tamaño del cuerpo de la noticia (en palabras)", value=100) | |
tono = gr.Dropdown(label="Tono de la noticia", choices=["serio", "neutral", "divertido"], value="neutral") | |
with gr.Column(scale=3): | |
inputs_list = [instrucciones, hechos, tamaño, tono] | |
with gr.Tabs(): | |
for i in range(1, 6): | |
with gr.TabItem(f"Audio {i}"): | |
audio = gr.Audio(type="filepath", label=f"Audio {i}") | |
nombre = gr.Textbox(label="Nombre", scale=1) | |
cargo = gr.Textbox(label="Cargo", scale=1) | |
inputs_list.extend([audio, nombre, cargo]) | |
for i in range(1, 6): | |
with gr.TabItem(f"Documento {i}"): | |
documento = gr.File(label=f"Documento {i}", type="filepath", file_count="single") | |
inputs_list.append(documento) | |
with gr.Row(): | |
generar = gr.Button("Generar noticia") | |
with gr.Row(): | |
noticia_output = gr.Textbox(label="Noticia generada", lines=20) | |
with gr.Row(): | |
transcripciones_output = gr.Textbox(label="Transcripciones brutas de los audios", lines=10) | |
generar.click(fn=generar_noticia, inputs=inputs_list, outputs=[noticia_output, transcripciones_output]) | |
demo.launch(share=True) |