Spaces:
Running
Running
import gradio as gr | |
import torch | |
from transformers import pipeline | |
import librosa | |
import soundfile as sf | |
import spaces | |
import os | |
def split_audio(audio_data, sr, chunk_duration=30):
    """Split audio into consecutive chunks of ``chunk_duration`` seconds.

    Args:
        audio_data: Sliceable sequence of samples; anything that supports
            ``len()`` and slicing works (the caller in this file passes the
            array returned by ``librosa.load``).
        sr: Sample rate, in samples per second.
        chunk_duration: Length of each chunk, in seconds.

    Returns:
        A list of slices of ``audio_data`` in original order. Every chunk
        holds ``int(chunk_duration * sr)`` samples except possibly the last,
        which holds the remainder. Empty input yields an empty list.

    Raises:
        ValueError: If ``chunk_duration * sr`` truncates to fewer than one
            sample, which would otherwise give a zero or negative step.
    """
    # Computed once: the original recomputed this product on every iteration.
    samples_per_chunk = int(chunk_duration * sr)
    if samples_per_chunk <= 0:
        raise ValueError(
            "chunk_duration * sr must be at least one sample, "
            f"got {samples_per_chunk}"
        )
    return [
        audio_data[start:start + samples_per_chunk]
        for start in range(0, len(audio_data), samples_per_chunk)
    ]
def transcribe_long_audio(audio_input, transcriber, chunk_duration=30):
    """Transcribe long audio by splitting it into smaller chunks.

    Args:
        audio_input: Either a path to an audio file (``str``) or a 2-tuple
            holding raw samples and a sample rate. Gradio's
            ``gr.Audio(type="numpy")`` delivers ``(sample_rate, data)``; the
            reverse order ``(data, sample_rate)`` is accepted as well.
        transcriber: Callable that takes a WAV file path and returns a
            mapping with a ``"text"`` entry.
        chunk_duration: Length in seconds of each chunk passed to
            ``transcriber``.

    Returns:
        The chunk transcriptions joined with single spaces. This function
        does not raise: on any failure it returns the string
        ``"Error processing audio: <reason>"`` instead.
    """
    # Function-scope import so this block does not depend on the file header.
    import tempfile

    try:
        # Debugging input type and format
        print(f"Audio input type: {type(audio_input)}")

        # A private scratch directory per call: concurrent requests cannot
        # overwrite each other's files, and everything is removed even when
        # transcription fails part-way through.
        with tempfile.TemporaryDirectory() as work_dir:
            if isinstance(audio_input, tuple):  # Recorded audio
                print("Processing recorded audio...")
                first, second = audio_input
                # The sample rate is the scalar member of the pair; the
                # sample buffer is the one that has a length.
                if hasattr(first, "__len__"):
                    samples, sample_rate = first, second
                else:
                    sample_rate, samples = first, second
                source_path = os.path.join(work_dir, "recorded_audio.wav")
                sf.write(source_path, samples, sample_rate)
            elif isinstance(audio_input, str):  # Uploaded file path
                print("Processing uploaded audio...")
                source_path = audio_input  # Read in place; never deleted here
            else:
                raise ValueError("Unsupported audio input format.")

            # sr=None keeps the file's native sample rate.
            audio_data, sr = librosa.load(source_path, sr=None)
            chunks = split_audio(audio_data, sr, chunk_duration)

            transcriptions = []
            for i, chunk in enumerate(chunks):
                chunk_path = os.path.join(work_dir, f"temp_chunk_{i}.wav")
                sf.write(chunk_path, chunk, sr)  # Save chunk as WAV
                transcriptions.append(transcriber(chunk_path)["text"])
                # Free disk space early; the directory cleanup covers the
                # case where the transcriber raised before this line.
                os.remove(chunk_path)

        return " ".join(transcriptions)
    except Exception as e:
        print(f"Error in transcribe_long_audio: {e}")
        return f"Error processing audio: {e}"
def main():
    """Load the models, build the Gradio interface, and launch it.

    Creates a Whisper speech-recognition pipeline and a BART summarization
    pipeline, wires them to a "Process Audio" button, and starts the app.

    Raises:
        Exception: Whatever ``pipeline`` raised while loading a model; the
            error is printed and then re-raised.
    """
    device = 0 if torch.cuda.is_available() else -1

    try:
        transcriber = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-base",
            device=device,
        )
        # Same device as the transcriber, so both models use the GPU when
        # one is available.
        summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=device,
        )
    except Exception as e:
        print(f"Error loading models: {e}")
        raise

    def process_audio(audio_input):
        """Return ``(transcription, summary)`` for the given audio input."""
        try:
            transcription = transcribe_long_audio(
                audio_input, transcriber, chunk_duration=30
            )
            # transcribe_long_audio reports failure as a message string
            # rather than raising. Show that message as-is instead of
            # summarizing it, and skip the summarizer for empty transcripts.
            if (
                not transcription.strip()
                or transcription.startswith("Error processing audio:")
            ):
                return transcription, ""
            summary = summarizer(
                transcription,
                max_length=50,
                min_length=10,
                do_sample=False,
                # Long transcripts can exceed the model's input limit;
                # truncate instead of failing.
                truncation=True,
            )[0]["summary_text"]
            return transcription, summary
        except Exception as e:
            print(f"Error in process_audio: {e}")
            return f"Error processing audio: {e}", ""

    def stop_microphone():
        """Simulate stopping the microphone."""
        print("Microphone stopped.")  # Debugging for user feedback
        # NOTE(review): the click handler below declares no outputs, so this
        # message is not shown in the UI; wire a status component if it
        # should be visible.
        return "Microphone stopped. Recording session has ended."

    with gr.Blocks() as interface:
        with gr.Row():
            with gr.Column():
                # Enable recording or file upload
                audio_input = gr.Audio(type="numpy", label="Record or Upload Audio")
                process_button = gr.Button("Process Audio")
                stop_button = gr.Button("Stop Recording")  # Add Stop Button
            with gr.Column():
                transcription_output = gr.Textbox(label="Full Transcription", lines=10)
                summary_output = gr.Textbox(label="Summary", lines=5)

        process_button.click(
            process_audio,
            inputs=[audio_input],
            outputs=[transcription_output, summary_output],
        )
        stop_button.click(
            stop_microphone,
            inputs=[],
            outputs=[],
        )

    interface.launch(share=True)
# Script entry point: start the app only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()