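"""Transcribe an audio file with AssemblyAI speaker diarization, then enhance the
transcript chunk-by-chunk with Gemini using the matching audio segments.

Usage (the filename "transcribe.py" is illustrative; the script's actual name is
not fixed anywhere in this file):

    python transcribe.py path/to/audio.mp3

Requires ASSEMBLYAI_API_KEY and GOOGLE_API_KEY in the environment and a prompt
file at prompts/enhance.txt. Writes output/transcripts/autogenerated-transcript.md
(raw diarized transcript) and output/transcripts/transcript.md (enhanced).
"""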
import argparse
import asyncio
import hashlib
import io
import json
import os
import re
import sys
from dataclasses import dataclass
from itertools import groupby
from pathlib import Path
from typing import Iterator, List, Tuple

import assemblyai as aai
from google import generativeai
from pydub import AudioSegment


@dataclass
class Utterance:
    """A single utterance from a speaker"""
    speaker: str
    text: str
    start: int  # timestamp in ms from AssemblyAI
    end: int    # timestamp in ms from AssemblyAI

    @property
    def timestamp(self) -> str:
        """Format start time as HH:MM:SS"""
        seconds = int(self.start // 1000)
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        seconds = seconds % 60
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


class Transcriber:
    """Handles getting and caching transcripts from AssemblyAI"""

    def __init__(self, api_key: str):
        aai.settings.api_key = api_key
        self.cache_dir = Path("output/transcripts/.cache")
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_transcript(self, audio_path: Path) -> List[Utterance]:
        """Get transcript, using cache if available"""
        cache_file = self.cache_dir / f"{audio_path.stem}.json"
        
        if cache_file.exists():
            with open(cache_file) as f:
                data = json.load(f)
                if data["hash"] == self._get_file_hash(audio_path):
                    print("Using cached AssemblyAI transcript...")
                    # Create proper Utterance objects from cached data
                    return [
                        Utterance(
                            speaker=u["speaker"],
                            text=u["text"],
                            start=u["start"],
                            end=u["end"]
                        )
                        for u in data["utterances"]
                    ]

        print("Getting new transcript from AssemblyAI...")
        config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")
        transcript = aai.Transcriber().transcribe(str(audio_path), config=config)
        
        utterances = [
            Utterance(
                speaker=u.speaker,
                text=u.text,
                start=u.start,
                end=u.end
            )
            for u in transcript.utterances
        ]
        
        # Cache the raw utterance data
        cache_data = {
            "hash": self._get_file_hash(audio_path),
            "utterances": [
                {
                    "speaker": u.speaker,
                    "text": u.text,
                    "start": u.start,
                    "end": u.end
                }
                for u in utterances
            ]
        }
        with open(cache_file, "w") as f:
            json.dump(cache_data, f, indent=2)
            
        return utterances

    def _get_file_hash(self, file_path: Path) -> str:
        """Calculate MD5 hash of a file"""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()


class Enhancer:
    """Handles enhancing transcripts using Gemini"""

    def __init__(self, api_key: str):
        generativeai.configure(api_key=api_key)
        self.model = generativeai.GenerativeModel("gemini-exp-1206")
        self.prompt = Path("prompts/enhance.txt").read_text()

    async def enhance_chunks(self, chunks: List[Tuple[str, io.BytesIO]]) -> List[str]:
        """Enhance multiple transcript chunks concurrently with concurrency control"""
        print(f"Enhancing {len(chunks)} chunks...")
        
        # Create a semaphore to limit concurrent requests
        semaphore = asyncio.Semaphore(3)  # Allow up to 3 concurrent requests
        
        async def process_chunk(i: int, chunk: Tuple[str, io.BytesIO]) -> str:
            text, audio = chunk
            async with semaphore:
                audio.seek(0)
                response = await self.model.generate_content_async(
                    [self.prompt, text, {"mime_type": "audio/mp3", "data": audio.read()}]
                )
                print(f"Completed chunk {i+1}/{len(chunks)}")
                return response.text

        # Schedule every chunk; the semaphore above caps concurrent requests
        tasks = [process_chunk(i, chunk) for i, chunk in enumerate(chunks)]

        # gather preserves input order, so enhanced chunks line up with the originals
        results = await asyncio.gather(*tasks)
        return results


@dataclass
class SpeakerDialogue:
    """Represents a continuous section of speech from a single speaker"""
    speaker: str
    utterances: List[Utterance]
    
    @property
    def start(self) -> int:
        """Start time of first utterance"""
        return self.utterances[0].start
    
    @property
    def end(self) -> int:
        """End time of last utterance"""
        return self.utterances[-1].end
    
    @property
    def timestamp(self) -> str:
        """Format start time as HH:MM:SS"""
        return self.utterances[0].timestamp
    
    def format(self, markdown: bool = False) -> str:
        """Format this dialogue as text with newlines between utterances
        Args:
            markdown: If True, add markdown formatting for speaker and timestamp
        """
        # Join utterances with a blank line between them
        combined_text = "\n\n".join(u.text for u in self.utterances)
        if markdown:
            return f"**Speaker {self.speaker}** *{self.timestamp}*\n\n{combined_text}"
        return f"Speaker {self.speaker} {self.timestamp}\n\n{combined_text}"


def group_utterances_by_speaker(utterances: List[Utterance]) -> Iterator[SpeakerDialogue]:
    """Group consecutive utterances by the same speaker"""
    for speaker, group in groupby(utterances, key=lambda u: u.speaker):
        yield SpeakerDialogue(speaker=speaker, utterances=list(group))


def estimate_tokens(text: str, chars_per_token: int = 4) -> int:
    """
    Estimate number of tokens in text
    Args:
        text: The text to estimate tokens for
        chars_per_token: Estimated characters per token (default 4)
    """
    return (len(text) + chars_per_token - 1) // chars_per_token  # ceiling division


def chunk_dialogues(
    dialogues: Iterator[SpeakerDialogue], 
    max_tokens: int = 2000, 
    chars_per_token: int = 4
) -> List[List[SpeakerDialogue]]:
    """
    Split dialogues into chunks that fit within token limit
    Args:
        dialogues: Iterator of SpeakerDialogues
        max_tokens: Maximum tokens per chunk
        chars_per_token: Estimated characters per token (default 4)
    """
    chunks = []
    current_chunk = []
    current_text = ""
    
    for dialogue in dialogues:
        # Format this dialogue
        formatted = dialogue.format()
        
        # If adding this dialogue would exceed token limit, start new chunk
        new_text = current_text + "\n\n" + formatted if current_text else formatted
        if current_chunk and estimate_tokens(new_text, chars_per_token) > max_tokens:
            chunks.append(current_chunk)
            current_chunk = [dialogue]
            current_text = formatted
        else:
            current_chunk.append(dialogue)
            current_text = new_text
    
    if current_chunk:
        chunks.append(current_chunk)
    
    return chunks


def format_chunk(dialogues: List[SpeakerDialogue], markdown: bool = False) -> str:
    """Format a chunk of dialogues into readable text
    Args:
        dialogues: List of dialogues to format
        markdown: If True, add markdown formatting for speaker and timestamp
    """
    return "\n\n".join(dialogue.format(markdown=markdown) for dialogue in dialogues)


def prepare_audio_chunks(audio_path: Path, utterances: List[Utterance]) -> List[Tuple[str, io.BytesIO]]:
    """Prepare audio chunks and their corresponding text"""
    # Group utterances by speaker and split into chunks
    dialogues = group_utterances_by_speaker(utterances)
    chunks = chunk_dialogues(dialogues)
    print(f"Preparing {len(chunks)} audio segments...")
    
    # Load audio once
    audio = AudioSegment.from_file(audio_path)
    
    # Process each chunk
    prepared = []
    for chunk in chunks:
        # Extract just the needed segment (pydub slices use milliseconds)
        segment = audio[chunk[0].start:chunk[-1].end]
        buffer = io.BytesIO()
        # Export at the lowest MP3 VBR quality (-q:a 9) to keep the upload small
        segment.export(buffer, format="mp3", parameters=["-q:a", "9"])
        # Use non-markdown format for Gemini
        prepared.append((format_chunk(chunk, markdown=False), buffer))
    
    return prepared


def main():
    parser = argparse.ArgumentParser(description="Transcribe an audio file and enhance the transcript")
    parser.add_argument("audio_file", help="Audio file to transcribe")
    args = parser.parse_args()
    
    audio_path = Path(args.audio_file)
    if not audio_path.exists():
        raise FileNotFoundError(f"File not found: {audio_path}")
        
    out_dir = Path("output/transcripts")
    out_dir.mkdir(parents=True, exist_ok=True)
    
    try:
        # Get transcript (fail fast if the API key is missing)
        assemblyai_key = os.getenv("ASSEMBLYAI_API_KEY")
        if not assemblyai_key:
            raise RuntimeError("ASSEMBLYAI_API_KEY environment variable is not set")
        transcriber = Transcriber(assemblyai_key)
        utterances = transcriber.get_transcript(audio_path)
        
        # Save original transcript
        dialogues = list(group_utterances_by_speaker(utterances))  # Convert iterator to list
        original = format_chunk(dialogues, markdown=True)  # Use markdown for final output
        (out_dir / "autogenerated-transcript.md").write_text(original)
        
        # Enhance transcript (fail fast if the API key is missing)
        google_key = os.getenv("GOOGLE_API_KEY")
        if not google_key:
            raise RuntimeError("GOOGLE_API_KEY environment variable is not set")
        enhancer = Enhancer(google_key)
        chunks = prepare_audio_chunks(audio_path, utterances)
        enhanced = asyncio.run(enhancer.enhance_chunks(chunks))
        
        # Save enhanced transcript with markdown
        merged = "\n\n".join(chunk.strip() for chunk in enhanced)
        # Apply markdown formatting to the final enhanced transcript
        merged = apply_markdown_formatting(merged)
        (out_dir / "transcript.md").write_text(merged)
        
        print("\nTranscripts saved to:")
        print(f"- {out_dir}/autogenerated-transcript.md")
        print(f"- {out_dir}/transcript.md")
        
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    
    return 0


def apply_markdown_formatting(text: str) -> str:
    """Apply markdown formatting to speaker and timestamp in the transcript"""
    import re
    pattern = r"(Speaker \w+) (\d{2}:\d{2}:\d{2})"
    return re.sub(pattern, r"**\1** *\2*", text)


if __name__ == "__main__":
    main()