import gradio as gr
import assemblyai as aai
from google import generativeai
import os
from pydub import AudioSegment


ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

aai.settings.api_key = ASSEMBLYAI_API_KEY
generativeai.configure(api_key=GOOGLE_API_KEY)
model = generativeai.GenerativeModel("gemini-exp-1206")


def format_timestamp(seconds):
    """Convert seconds to HH:MM:SS format"""
    h = int(seconds) // 3600
    m = (int(seconds) % 3600) // 60
    s = int(seconds) % 60
    return f"{h:02d}:{m:02d}:{s:02d}"


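# Quick sanity check (illustrative only, not called by the app):
#   format_timestamp(3725) -> "01:02:05"
#   format_timestamp(59)   -> "00:00:59"

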
def get_transcript(audio_path):
    """Get transcript from AssemblyAI with speaker diarization"""
    config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")

    transcriber = aai.Transcriber()
    transcript = transcriber.transcribe(audio_path, config=config)

    return transcript.utterances


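# Standalone usage sketch (assumes a local "interview.mp3" and a valid
# ASSEMBLYAI_API_KEY in the environment; the filename is illustrative):
#
#   utterances = get_transcript("interview.mp3")
#   for u in utterances[:3]:
#       print(u.speaker, u.start, u.text)

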
def format_transcript(utterances):
    """Format transcript into readable text with speaker labels"""
    formatted_sections = []
    current_speaker = None
    current_text = []
    current_start = None

    for utterance in utterances:
        # When the speaker changes, flush the accumulated text as one section.
        if current_speaker != utterance.speaker:
            if current_text:
                timestamp = format_timestamp(float(current_start) / 1000)
                section = f"Speaker {current_speaker} {timestamp}\n\n{' '.join(current_text).strip()}"
                formatted_sections.append(section)
                current_text = []

            current_speaker = utterance.speaker
            current_start = utterance.start

        current_text.append(utterance.text.strip())

    # Flush the final speaker's section.
    if current_text:
        timestamp = format_timestamp(float(current_start) / 1000)
        section = (
            f"Speaker {current_speaker} {timestamp}\n\n{' '.join(current_text).strip()}"
        )
        formatted_sections.append(section)

    return "\n\n".join(formatted_sections)


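# Output shape produced by this function (illustrative; same shape as the
# example embedded in the Gemini prompt below):
#
#   Speaker A 00:01:15
#
#   When we look at the data, we see a consistent pattern in the results.

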
def enhance_transcript(chunk_text, audio_segment):
    """Enhance transcript using Gemini AI with both text and audio"""
    prompt = """You are an expert transcript editor. Your task is to enhance this transcript for maximum readability while maintaining the core message.

IMPORTANT: Respond ONLY with the enhanced transcript. Do not include any explanations, headers, or phrases like "Here is the transcript."

Note: Below you'll find an auto-generated transcript that may help with speaker identification, but focus on creating your own high-quality transcript from the audio.

Please:
1. Fix speaker attribution errors, especially at segment boundaries. Watch for incomplete thoughts that were likely from the previous speaker.

2. Optimize for readability over verbatim accuracy:
- Remove filler words (um, uh, like, you know)
- Eliminate false starts and repetitions
- Convert rambling sentences into clear, concise statements
- Break up run-on sentences into shorter ones
- Maintain natural conversation flow while improving clarity

3. Format the output consistently:
- Keep the "Speaker X 00:00:00" format (no brackets, no other formatting)
- Add TWO line breaks between speaker/timestamp and the text
- Use proper punctuation and capitalization
- Add paragraph breaks for topic changes
- When you add paragraph breaks between the same speaker's remarks, no need to restate the speaker attribution
- Preserve distinct speaker turns

Example input:
Speaker A 00:01:15

Um, yeah, so like, what I was thinking was, you know, when we look at the data, the data shows us that, uh, there's this pattern, this pattern that keeps coming up again and again in the results.

Example output:
Speaker A 00:01:15

When we look at the data, we see a consistent pattern in the results.

And when we examine the second part of the analysis, it reveals a completely different finding.

Enhance the following transcript, starting directly with the speaker format:
"""

    # Send the prompt, the raw chunk text, and the chunk's MP3 bytes to Gemini
    # in a single multimodal request.
    response = model.generate_content(
        [prompt, chunk_text, {"mime_type": "audio/mp3", "data": audio_segment.read()}]
    )
    return response.text


def create_chunks(utterances, target_tokens=7500):
    """Create chunks of utterances that fit within token limits"""
    chunks = []
    current_chunk = []
    current_start = None
    current_end = None

    for utterance in utterances:
        if not current_chunk:
            current_start = float(utterance.start) / 1000
            current_chunk = [utterance]
            current_end = float(utterance.end) / 1000
        # Rough token estimate: ~4 characters per token. Start a new chunk once
        # the accumulated text would exceed the target.
        elif (
            len(" ".join(u.text for u in current_chunk)) + len(utterance.text)
        ) / 4 > target_tokens:
            chunks.append(
                {
                    "utterances": current_chunk,
                    "start": current_start,
                    "end": current_end,
                }
            )
            current_chunk = [utterance]
            current_start = float(utterance.start) / 1000
            current_end = float(utterance.end) / 1000
        else:
            current_chunk.append(utterance)
            current_end = float(utterance.end) / 1000

    if current_chunk:
        chunks.append(
            {"utterances": current_chunk, "start": current_start, "end": current_end}
        )

    return chunks


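# Sizing note: with the ~4-characters-per-token approximation above,
# target_tokens=7500 corresponds to roughly 30,000 characters of transcript
# per chunk. Each chunk also records start/end times in seconds so that
# process_audio can cut the matching slice of audio.

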
def process_audio(audio_path):
    """Main processing pipeline"""
    print("Stage 1: Getting raw transcript from AssemblyAI...")
    transcript_data = get_transcript(audio_path)

    print("Stage 2: Processing in chunks...")
    chunks = create_chunks(transcript_data)
    original_chunks = []
    enhanced_chunks = []

    for i, chunk in enumerate(chunks):
        chunk_text = format_transcript(chunk["utterances"])
        original_chunks.append(chunk_text)

        print(f"Processing chunk {i+1} of {len(chunks)}...")
        audio_segment = get_audio_segment(audio_path, chunk["start"], chunk["end"])
        enhanced_chunk = enhance_transcript(chunk_text, audio_segment)
        enhanced_chunks.append(enhanced_chunk)

    return "\n".join(original_chunks), "\n".join(enhanced_chunks)


def handle_upload(audio):
    """Handle Gradio interface uploads"""
    if audio is None:
        return "Please upload an audio file.", "Please upload an audio file."

    try:
        original, enhanced = process_audio(audio)
        return original, enhanced
    except Exception as e:
        error_msg = f"Error processing audio: {str(e)}"
        return error_msg, error_msg


def get_audio_segment(audio_path, start_time, end_time):
    """Extract audio segment between start and end times"""
    audio = AudioSegment.from_file(audio_path)
    start_ms = int(float(start_time) * 1000)
    end_ms = int(float(end_time) * 1000)
    return audio[start_ms:end_ms].export(format="mp3")


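# Note: when no output path is given, pydub's export() returns a file-like
# object containing the encoded MP3 data; enhance_transcript reads its bytes
# via .read() and passes them to Gemini alongside the text.

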
iface = gr.Interface(
    fn=handle_upload,
    inputs=gr.Audio(type="filepath"),
    outputs=[
        gr.Textbox(label="Original Transcript", container=False),
        gr.Textbox(label="Enhanced Transcript", container=False),
    ],
    title="Audio Transcript Enhancement",
    description="Upload an MP3 file to get both the original and enhanced transcripts using AssemblyAI and Gemini.",
    cache_examples=False,
    allow_flagging="never",
    theme=gr.themes.Default(
        spacing_size="sm",
        text_size="sm",
    ),
)

if __name__ == "__main__":
    iface.launch()


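# Launch sketch (illustrative): with ASSEMBLYAI_API_KEY and GOOGLE_API_KEY
# exported and the dependencies installed (gradio, assemblyai,
# google-generativeai, pydub, plus ffmpeg for pydub's MP3 handling),
# running this file starts the Gradio app at the local URL printed by
# iface.launch().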