import gradio as gr
import assemblyai as aai
from google import generativeai
import os
from pydub import AudioSegment

# Initialize API clients
ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

aai.settings.api_key = ASSEMBLYAI_API_KEY
generativeai.configure(api_key=GOOGLE_API_KEY)
model = generativeai.GenerativeModel("gemini-exp-1206")


def format_timestamp(seconds):
    """Convert seconds to HH:MM:SS format"""
    h = int(seconds) // 3600
    m = (int(seconds) % 3600) // 60
    s = int(seconds) % 60
    return f"{h:02d}:{m:02d}:{s:02d}"


def get_transcript(audio_path):
    """Get transcript from AssemblyAI with speaker diarization"""
    config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")

    transcriber = aai.Transcriber()
    transcript = transcriber.transcribe(audio_path, config=config)

    return transcript.utterances


def format_transcript(utterances):
    """Format transcript into readable text with speaker labels"""
    formatted_sections = []
    current_speaker = None
    current_text = []
    current_start = None

    for utterance in utterances:
        # If this is a new speaker
        if current_speaker != utterance.speaker:
            # Write out the previous section if it exists
            if current_text:
                # Convert milliseconds to seconds for timestamp
                timestamp = format_timestamp(float(current_start) / 1000)
                section = f"Speaker {current_speaker} {timestamp}\n\n{' '.join(current_text).strip()}"
                formatted_sections.append(section)
                current_text = []

            # Start new section
            current_speaker = utterance.speaker
            current_start = utterance.start

        current_text.append(utterance.text.strip())

    # Add the final section
    if current_text:
        # Convert milliseconds to seconds for timestamp
        timestamp = format_timestamp(float(current_start) / 1000)
        section = (
            f"Speaker {current_speaker} {timestamp}\n\n{' '.join(current_text).strip()}"
        )
        formatted_sections.append(section)

    return "\n\n".join(formatted_sections)


def enhance_transcript(chunk_text, audio_segment):
    """Enhance transcript using Gemini AI with both text and audio"""
    prompt = """You are an expert transcript editor. Your task is to enhance this transcript for maximum readability while maintaining the core message.

IMPORTANT: Respond ONLY with the enhanced transcript. Do not include any explanations, headers, or phrases like "Here is the transcript."

Note: Below you'll find an auto-generated transcript that may help with speaker identification, but focus on creating your own high-quality transcript from the audio.

Please:
1. Fix speaker attribution errors, especially at segment boundaries. Watch for incomplete thoughts that were likely from the previous speaker.

2. Optimize for readability over verbatim accuracy:
   - Remove filler words (um, uh, like, you know)
   - Eliminate false starts and repetitions
   - Convert rambling sentences into clear, concise statements
   - Break up run-on sentences into shorter ones
   - Maintain natural conversation flow while improving clarity

3. Format the output consistently:
   - Keep the "Speaker X 00:00:00" format (no brackets, no other formatting)
   - Add TWO line breaks between speaker/timestamp and the text
   - Use proper punctuation and capitalization
   - Add paragraph breaks for topic changes
   - When you add paragraph breaks between the same speaker's remarks, no need to restate the speaker attribution
   - Preserve distinct speaker turns

Example input:
Speaker A 00:01:15

Um, yeah, so like, what I was thinking was, you know, when we look at the data, the data shows us that, uh, there's this pattern, this pattern that keeps coming up again and again in the results.

Example output:
Speaker A 00:01:15

When we look at the data, we see a consistent pattern in the results.

And when we examine the second part of the analysis, it reveals a completely different finding.

Enhance the following transcript, starting directly with the speaker format:
"""

    response = model.generate_content(
        [prompt, chunk_text, {"mime_type": "audio/mp3", "data": audio_segment.read()}]
    )
    return response.text


def create_chunks(utterances, target_tokens=7500):
    """Create chunks of utterances that fit within token limits"""
    chunks = []
    current_chunk = []
    current_start = None
    current_end = None

    for utterance in utterances:
        # Start new chunk if this is first utterance
        if not current_chunk:
            current_start = float(utterance.start) / 1000  # Convert ms to seconds
            current_chunk = [utterance]
            current_end = float(utterance.end) / 1000  # Convert ms to seconds
        # Check if adding this utterance would exceed token limit
        elif (
            len(" ".join(u.text for u in current_chunk)) + len(utterance.text)
        ) / 4 > target_tokens:
            # Save current chunk and start new one
            chunks.append(
                {
                    "utterances": current_chunk,
                    "start": current_start,
                    "end": current_end,
                }
            )
            current_chunk = [utterance]
            current_start = float(utterance.start) / 1000
            current_end = float(utterance.end) / 1000
        else:
            # Add to current chunk
            current_chunk.append(utterance)
            current_end = float(utterance.end) / 1000

    # Add final chunk
    if current_chunk:
        chunks.append(
            {"utterances": current_chunk, "start": current_start, "end": current_end}
        )

    return chunks


def process_audio(audio_path):
    """Main processing pipeline"""
    print("Stage 1: Getting raw transcript from AssemblyAI...")
    transcript_data = get_transcript(audio_path)

    print("Stage 2: Processing in chunks...")
    chunks = create_chunks(transcript_data)
    original_chunks = []
    enhanced_chunks = []

    for i, chunk in enumerate(chunks):
        # Get original chunk
        chunk_text = format_transcript(chunk["utterances"])
        original_chunks.append(chunk_text)

        # Process enhanced version
        print(f"Processing chunk {i+1} of {len(chunks)}...")
        audio_segment = get_audio_segment(audio_path, chunk["start"], chunk["end"])
        enhanced_chunk = enhance_transcript(chunk_text, audio_segment)
        enhanced_chunks.append(enhanced_chunk)

    return "\n".join(original_chunks), "\n".join(enhanced_chunks)


def handle_upload(audio):
    """Handle Gradio interface uploads"""
    if audio is None:
        return "Please upload an audio file.", "Please upload an audio file."

    try:
        original, enhanced = process_audio(audio)
        return original, enhanced
    except Exception as e:
        error_msg = f"Error processing audio: {str(e)}"
        return error_msg, error_msg


def get_audio_segment(audio_path, start_time, end_time):
    """Extract audio segment between start and end times"""
    audio = AudioSegment.from_file(audio_path)
    start_ms = int(float(start_time) * 1000)
    end_ms = int(float(end_time) * 1000)
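    # export() with no output path writes to a temporary file and returns the open
    # file handle, which the caller reads as raw MP3 bytes.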
    return audio[start_ms:end_ms].export(format="mp3")


# Create Gradio interface
iface = gr.Interface(
    fn=handle_upload,
    inputs=gr.Audio(type="filepath"),
    outputs=[
        gr.Textbox(label="Original Transcript", container=False),
        gr.Textbox(label="Enhanced Transcript", container=False),
    ],
    title="Audio Transcript Enhancement",
    description="Upload an MP3 file to get both the original and enhanced transcripts using AssemblyAI and Gemini.",
    cache_examples=False,
    allow_flagging="never",
    theme=gr.themes.Default(
        spacing_size="sm",
        text_size="sm",
    ),
)
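
# To run locally (a sketch; assumes the standard PyPI package names and that both
# API keys are exported in the shell before launching):
#   pip install gradio assemblyai google-generativeai pydub
#   export ASSEMBLYAI_API_KEY=...
#   export GOOGLE_API_KEY=...
#   python transcript_enhancer.py  # i.e., whatever name this script is saved under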

if __name__ == "__main__":
    iface.launch()