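"""Gradio app for a Hugging Face Space: record or upload audio, transcribe it
in 30-second chunks with Whisper, and summarize the transcript with BART.
Assumes ZeroGPU hardware (hence the spaces import)."""
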
import gradio as gr
import torch
from transformers import pipeline
import librosa
import soundfile as sf
import os
import uuid
import spaces  # Hugging Face Spaces SDK; provides the ZeroGPU @spaces.GPU decorator

# Directory to save recorded audio files
OUTPUT_DIR = os.getenv("HF_HOME", ".")  # Use dynamic path or default to current directory
OUTPUT_DIR = os.path.join(OUTPUT_DIR, "recorded_audio_files")
os.makedirs(OUTPUT_DIR, exist_ok=True)

def split_audio(audio_data, sr, chunk_duration=30):
    """Split audio into chunks of chunk_duration seconds."""
    chunks = []
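    # Walk the signal in strides of chunk_duration seconds converted to samples;
    # the final chunk may be shorter than chunk_duration.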
    for start in range(0, len(audio_data), int(chunk_duration * sr)):
        end = start + int(chunk_duration * sr)
        chunks.append(audio_data[start:end])
    return chunks

def transcribe_long_audio(audio_path, transcriber, chunk_duration=30):
    """Transcribe long audio by splitting into smaller chunks."""
    try:
        # Load at the file's native sample rate (sr=None disables resampling)
        audio_data, sr = librosa.load(audio_path, sr=None)
        chunks = split_audio(audio_data, sr, chunk_duration)
        transcriptions = []
        for i, chunk in enumerate(chunks):
            # Unique name avoids collisions between concurrent requests
            chunk_path = f"temp_chunk_{uuid.uuid4().hex}_{i}.wav"
            sf.write(chunk_path, chunk, sr)  # Save chunk as WAV
            transcription = transcriber(chunk_path)["text"]
            transcriptions.append(transcription)
            os.remove(chunk_path)  # Cleanup temp files
        return " ".join(transcriptions)
    except Exception as e:
        print(f"Error in transcribe_long_audio: {e}")
        return f"Error processing audio: {e}"
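
# Example usage (hypothetical filename, assuming a Whisper ASR pipeline):
#   asr = pipeline("automatic-speech-recognition", model="openai/whisper-base")
#   print(transcribe_long_audio("meeting.wav", asr))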

def cleanup_output_dir(max_storage_mb=500):
    """Remove old files if total directory size exceeds max_storage_mb."""
    try:
        total_size = sum(
            os.path.getsize(os.path.join(OUTPUT_DIR, f)) for f in os.listdir(OUTPUT_DIR)
        )
        if total_size > max_storage_mb * 1024 * 1024:
            files = sorted(
                (os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR)),
                key=os.path.getctime,
            )
            for file in files:
                size = os.path.getsize(file)  # read the size before deleting
                os.remove(file)
                total_size -= size
                if total_size <= max_storage_mb * 1024 * 1024:
                    break
    except Exception as e:
        print(f"Error during cleanup: {e}")

def main():
    device = 0 if torch.cuda.is_available() else -1  # transformers pipeline convention: 0 = first GPU, -1 = CPU

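    # whisper-base trades accuracy for speed and memory; larger checkpoints such
    # as whisper-small or whisper-medium transcribe better at higher cost.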
    try:
        transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device)
        summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
    except Exception as e:
        print(f"Error loading models: {e}")
        raise

    # ZeroGPU allocates a GPU per decorated call; duration is the GPU budget in
    # seconds and must cover transcription plus summarization (120 s is an
    # assumed budget; tune it to the expected audio length).
    @spaces.GPU(duration=120)
    def process_audio(audio_input):
        try:
            # Debug input type and content
            print(f"Input type: {type(audio_input)}, Input: {audio_input}")
            if audio_input is None:
                raise ValueError("No audio input received. Please record or upload an audio file.")

            if isinstance(audio_input, tuple):  # Recorded audio as (sample_rate, data)
                print("Handling recorded audio.")
                sr, audio_data = audio_input  # Gradio's numpy format is (sample_rate, array)
                filename = f"recorded_audio_{uuid.uuid4().hex}.wav"
                temp_path = os.path.join(OUTPUT_DIR, filename)
                sf.write(temp_path, audio_data, sr)
            elif isinstance(audio_input, str):  # Filepath (only produced with type="filepath")
                print("Handling uploaded audio.")
                if os.path.isdir(audio_input):
                    raise ValueError("Input is a directory, not a file.")
                temp_path = audio_input
            else:
                raise ValueError("Unsupported audio input format.")

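            # Whisper's encoder handles at most 30 s of audio per pass, hence
            # the 30 s chunking inside transcribe_long_audio.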
            # Transcribe the saved audio file
            transcription = transcribe_long_audio(temp_path, transcriber, chunk_duration=30)
            # truncation=True guards against transcripts that exceed BART's input limit
            summary = summarizer(transcription, max_length=50, min_length=10, do_sample=False, truncation=True)[0]["summary_text"]

            # Cleanup old files
            cleanup_output_dir()

            return transcription, summary, temp_path
        except Exception as e:
            print(f"Error in process_audio: {e}")
            return f"Error processing audio: {e}", "", ""

    def stop_microphone():
        """Log that the recording session has ended (no actual device control)."""
        print("Microphone stopped.")

    with gr.Blocks() as interface:
        with gr.Row():
            with gr.Column():
                # With type="numpy", both recordings and uploads arrive in
                # process_audio as (sample_rate, data) tuples
                audio_input = gr.Audio(sources=["microphone", "upload"], type="numpy", label="Record or Upload Audio")
                process_button = gr.Button("Process Audio")
                stop_button = gr.Button("Stop Recording")
            with gr.Column():
                transcription_output = gr.Textbox(label="Full Transcription", lines=10)
                summary_output = gr.Textbox(label="Summary", lines=5)
                audio_output = gr.Audio(label="Playback Processed Audio")

        process_button.click(
            process_audio,
            inputs=[audio_input],
            outputs=[transcription_output, summary_output, audio_output]
        )

        stop_button.click(
            stop_microphone,
            inputs=[],
            outputs=[]
        )

    interface.launch(share=False)

if __name__ == "__main__":
    main()
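
# Assumed local setup (the @spaces.GPU decorator is a no-op outside ZeroGPU
# Spaces; the filename below is hypothetical):
#   pip install gradio torch transformers librosa soundfile spaces
#   python app.py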