import gradio as gr
import torch
from transformers import pipeline
import librosa
import soundfile as sf
import os
import uuid
import spaces  # Hugging Face Spaces helper; provides the ZeroGPU @spaces.GPU decorator
# Directory to save recorded audio files
OUTPUT_DIR = os.getenv("HF_HOME", ".") # Use dynamic path or default to current directory
OUTPUT_DIR = os.path.join(OUTPUT_DIR, "recorded_audio_files")
os.makedirs(OUTPUT_DIR, exist_ok=True)
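# For example (assuming HF_HOME=/data, as is common on Spaces with persistent
# storage), recordings land in /data/recorded_audio_files; without HF_HOME set,
# they land in ./recorded_audio_files next to this script.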
def split_audio(audio_data, sr, chunk_duration=30):
    """Split audio into chunks of chunk_duration seconds."""
    chunks = []
    for start in range(0, len(audio_data), int(chunk_duration * sr)):
        end = start + int(chunk_duration * sr)
        chunks.append(audio_data[start:end])
    return chunks
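# A minimal sanity check for split_audio (hypothetical values, not part of the
# app's flow): a 65 s clip at 16 kHz should split into 30 s, 30 s, and 5 s chunks.
#
#   import numpy as np
#   dummy = np.zeros(65 * 16000, dtype=np.float32)
#   parts = split_audio(dummy, sr=16000, chunk_duration=30)
#   assert [len(p) for p in parts] == [480000, 480000, 80000]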
def transcribe_long_audio(audio_path, transcriber, chunk_duration=30):
    """Transcribe long audio by splitting it into smaller chunks."""
    try:
        # Load the audio file at its native sample rate
        audio_data, sr = librosa.load(audio_path, sr=None)
        chunks = split_audio(audio_data, sr, chunk_duration)
        transcriptions = []
        for i, chunk in enumerate(chunks):
            chunk_path = f"temp_chunk_{i}.wav"
            sf.write(chunk_path, chunk, sr)  # Save chunk as WAV
            transcription = transcriber(chunk_path)["text"]
            transcriptions.append(transcription)
            os.remove(chunk_path)  # Clean up the temp file
        return " ".join(transcriptions)
    except Exception as e:
        print(f"Error in transcribe_long_audio: {e}")
        return f"Error processing audio: {e}"
def cleanup_output_dir(max_storage_mb=500):
    """Remove the oldest files when the directory grows beyond max_storage_mb."""
    try:
        total_size = sum(
            os.path.getsize(os.path.join(OUTPUT_DIR, f)) for f in os.listdir(OUTPUT_DIR)
        )
        if total_size > max_storage_mb * 1024 * 1024:
            files = sorted(
                (os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR)),
                key=os.path.getctime,  # oldest files first
            )
            for file in files:
                size = os.path.getsize(file)  # read the size before deleting
                os.remove(file)
                total_size -= size
                if total_size <= max_storage_mb * 1024 * 1024:
                    break
    except Exception as e:
        print(f"Error during cleanup: {e}")
def main():
    device = 0 if torch.cuda.is_available() else -1
    try:
        transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device)
        summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
    except Exception as e:
        print(f"Error loading models: {e}")
        raise
    # ZeroGPU: request a GPU for each inference call; 120 s is an assumed
    # generous budget for chunked transcription plus summarization.
    @spaces.GPU(duration=120)
    def process_audio(audio_input):
        try:
            # Debug input type and content
            print(f"Input type: {type(audio_input)}, Input: {audio_input}")
            if audio_input is None:
                raise ValueError("No audio input received. Please record or upload an audio file.")
            if isinstance(audio_input, tuple):  # Recorded audio
                print("Handling recorded audio.")
                sr, audio_data = audio_input  # gr.Audio(type="numpy") yields (sample_rate, data)
                filename = f"recorded_audio_{uuid.uuid4().hex}.wav"
                temp_path = os.path.join(OUTPUT_DIR, filename)
                sf.write(temp_path, audio_data, sr)
            elif isinstance(audio_input, str):  # Uploaded file path
                print("Handling uploaded audio.")
                if os.path.isdir(audio_input):
                    raise ValueError("Input is a directory, not a file.")
                temp_path = audio_input
            else:
                raise ValueError("Unsupported audio input format.")
            # Transcribe the saved audio file
            transcription = transcribe_long_audio(temp_path, transcriber, chunk_duration=30)
            # Truncate overly long transcripts to the summarizer's input limit
            summary = summarizer(transcription, max_length=50, min_length=10, do_sample=False, truncation=True)[0]["summary_text"]
            # Clean up old files
            cleanup_output_dir()
            return transcription, summary, temp_path
        except Exception as e:
            print(f"Error in process_audio: {e}")
            return f"Error processing audio: {e}", "", None
    def stop_microphone():
        """Log that the recording session has ended (no outputs are wired up)."""
        print("Microphone stopped. Recording session has ended.")
    with gr.Blocks() as interface:
        with gr.Row():
            with gr.Column():
                # Enable recording or file upload
                audio_input = gr.Audio(type="numpy", label="Record or Upload Audio")
                process_button = gr.Button("Process Audio")
                stop_button = gr.Button("Stop Recording")
            with gr.Column():
                transcription_output = gr.Textbox(label="Full Transcription", lines=10)
                summary_output = gr.Textbox(label="Summary", lines=5)
                audio_output = gr.Audio(label="Playback Processed Audio")
        process_button.click(
            process_audio,
            inputs=[audio_input],
            outputs=[transcription_output, summary_output, audio_output],
        )
        stop_button.click(
            stop_microphone,
            inputs=[],
            outputs=[],
        )
    interface.launch(share=False)
if __name__ == "__main__":
    main()