dwarkesh committed
Commit 327dd62 · Parent: 3e576d0

parallel processing

Files changed (2)
  1. .gitignore +3 -1
  2. transcript.py +206 -89
.gitignore CHANGED
@@ -1,2 +1,4 @@
 transcript.md
-autogenerated-transcript.md
+autogenerated-transcript.md
+transcripts/
+transcripts/.cache/
transcript.py CHANGED
@@ -4,6 +4,13 @@ from google import generativeai
 import os
 from pydub import AudioSegment
 import concurrent.futures
+import io
+import time
+import asyncio
+from typing import List, Tuple
+import json
+import hashlib
+from pathlib import Path
 
 # Suppress gRPC shutdown warnings
 os.environ["GRPC_PYTHON_LOG_LEVEL"] = "error"
@@ -16,6 +23,49 @@ aai.settings.api_key = ASSEMBLYAI_API_KEY
 generativeai.configure(api_key=GOOGLE_API_KEY)
 model = generativeai.GenerativeModel("gemini-exp-1206")
 
+# Define the prompt template
+prompt = """You are an expert transcript editor. Your task is to enhance this transcript for maximum readability while maintaining the core message.
+
+IMPORTANT: Respond ONLY with the enhanced transcript. Do not include any explanations, headers, or phrases like "Here is the transcript."
+
+Note: Below you'll find an auto-generated transcript that may help with speaker identification, but focus on creating your own high-quality transcript from the audio.
+
+Please:
+1. Fix speaker attribution errors, especially at segment boundaries. Watch for incomplete thoughts that were likely from the previous speaker.
+
+2. Optimize AGGRESSIVELY for readability over verbatim accuracy:
+   - Readability is the most important thing!!
+   - Remove ALL conversational artifacts (yeah, so, I mean, etc.)
+   - Remove ALL filler words (um, uh, like, you know)
+   - Remove false starts and self-corrections completely
+   - Remove redundant phrases and hesitations
+   - Convert any indirect or rambling responses into direct statements
+   - Break up run-on sentences into clear, concise statements
+   - Maintain natural conversation flow while prioritizing clarity and directness
+
+3. Format the output consistently:
+   - Keep the "Speaker X 00:00:00" format (no brackets, no other formatting)
+   - Add TWO line breaks between speaker/timestamp and the text
+   - Use proper punctuation and capitalization
+   - Add paragraph breaks for topic changes
+   - When you add paragraph breaks between the same speaker's remarks, no need to restate the speaker attribution
+   - Preserve distinct speaker turns
+
+Example input:
+Speaker A 00:01:15
+
+Um, yeah, so like, what I was thinking was, you know, when we look at the data, the data shows us that, uh, there's this pattern, this pattern that keeps coming up again and again in the results.
+
+Example output:
+Speaker A 00:01:15
+
+When we look at the data, we see a consistent pattern in the results.
+
+When we examine the second part of the analysis, it reveals a completely different finding.
+
+Enhance the following transcript, starting directly with the speaker format:
+"""
+
 
 def format_timestamp(seconds):
     """Convert seconds to HH:MM:SS format"""
@@ -71,54 +121,52 @@ def format_transcript(utterances):
     return "\n\n".join(formatted_sections)
 
 
-def enhance_transcript(chunk_text, audio_segment):
-    """Enhance transcript using Gemini AI with both text and audio"""
-    prompt = """You are an expert transcript editor. Your task is to enhance this transcript for maximum readability while maintaining the core message.
-
-    IMPORTANT: Respond ONLY with the enhanced transcript. Do not include any explanations, headers, or phrases like "Here is the transcript."
-
-    Note: Below you'll find an auto-generated transcript that may help with speaker identification, but focus on creating your own high-quality transcript from the audio.
-
-    Please:
-    1. Fix speaker attribution errors, especially at segment boundaries. Watch for incomplete thoughts that were likely from the previous speaker.
-
-    2. Optimize AGGRESSIVELY for readability over verbatim accuracy:
-       - Readability is the most important thing!!
-       - Remove ALL conversational artifacts (yeah, so, I mean, etc.)
-       - Remove ALL filler words (um, uh, like, you know)
-       - Remove false starts and self-corrections completely
-       - Remove redundant phrases and hesitations
-       - Convert any indirect or rambling responses into direct statements
-       - Break up run-on sentences into clear, concise statements
-       - Maintain natural conversation flow while prioritizing clarity and directness
-
-    3. Format the output consistently:
-       - Keep the "Speaker X 00:00:00" format (no brackets, no other formatting)
-       - Add TWO line breaks between speaker/timestamp and the text
-       - Use proper punctuation and capitalization
-       - Add paragraph breaks for topic changes
-       - When you add paragraph breaks between the same speaker's remarks, no need to restate the speaker attribution
-       - Preserve distinct speaker turns
-
-    Example input:
-    Speaker A 00:01:15
-
-    Um, yeah, so like, what I was thinking was, you know, when we look at the data, the data shows us that, uh, there's this pattern, this pattern that keeps coming up again and again in the results.
-
-    Example output:
-    Speaker A 00:01:15
+async def enhance_transcript_async(chunk_text: str, audio_segment: io.BytesIO) -> str:
+    """Enhance transcript using Gemini AI asynchronously"""
+    audio_segment.seek(0)  # Ensure we're at the start of the buffer
+    response = await model.generate_content_async(
+        [
+            prompt,
+            chunk_text,
+            {
+                "mime_type": "audio/mp3",
+                "data": audio_segment.read(),
+            },
+        ]
+    )
+    return response.text
 
-    When we look at the data, we see a consistent pattern in the results.
 
-    When we examine the second part of the analysis, it reveals a completely different finding.
+async def process_chunks_async(
+    prepared_chunks: List[Tuple[str, io.BytesIO]]
+) -> List[str]:
+    """Process all chunks in parallel using async API"""
+    enhancement_tasks = []
+    for chunk_text, audio_segment in prepared_chunks:
+        task = enhance_transcript_async(chunk_text, audio_segment)
+        enhancement_tasks.append(task)
+
+    print(f"Processing {len(enhancement_tasks)} chunks in parallel...")
+    start_time = time.time()
+
+    enhanced_chunks = []
+    for i, future in enumerate(asyncio.as_completed(enhancement_tasks), 1):
+        try:
+            result = await future
+            processing_time = time.time() - start_time
+            print(
+                f"Completed chunk {i}/{len(enhancement_tasks)} in {processing_time:.2f} seconds"
+            )
+            enhanced_chunks.append(result)
+        except Exception as e:
+            print(f"Error processing chunk {i}: {str(e)}")
+            enhanced_chunks.append(None)
 
-    Enhance the following transcript, starting directly with the speaker format:
-    """
+    total_time = time.time() - start_time
+    print(f"\nTotal enhancement time: {total_time:.2f} seconds")
+    print(f"Average time per chunk: {total_time/len(enhancement_tasks):.2f} seconds")
 
-    response = model.generate_content(
-        [prompt, chunk_text, {"mime_type": "audio/mp3", "data": audio_segment.read()}]
-    )
-    return response.text
+    return enhanced_chunks
 
 
 def create_chunks(utterances, target_tokens=2000):
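A note on ordering: asyncio.as_completed yields results in completion order, not submission order, so the enhanced_chunks list built above can come back shuffled relative to the original chunk sequence when it is later joined into transcript.md. If strict chronological order matters, an order-preserving variant is possible; the following is a sketch using asyncio.gather rather than the committed code, reusing enhance_transcript_async from above:

# Sketch: order-preserving alternative to the as_completed loop.
# asyncio.gather returns results in the same order as its inputs; with
# return_exceptions=True a failed chunk yields an exception object instead
# of raising, mirroring the None placeholders used above.
async def process_chunks_in_order(prepared_chunks):
    tasks = [
        enhance_transcript_async(chunk_text, audio_segment)
        for chunk_text, audio_segment in prepared_chunks
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [None if isinstance(r, Exception) else r for r in results]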
@@ -163,66 +211,135 @@ def create_chunks(utterances, target_tokens=2000):
     return chunks
 
 
-def process_chunk(chunk_data):
-    """Process a single chunk with Gemini"""
-    audio_path, chunk = chunk_data
-    chunk_text = format_transcript(chunk["utterances"])
-    audio_segment = get_audio_segment(audio_path, chunk["start"], chunk["end"])
-    return enhance_transcript(chunk_text, audio_segment)
-
+def get_audio_segment(audio_path, start_time, end_time):
+    """Extract audio segment between start and end times and return bytes"""
+    audio = AudioSegment.from_file(audio_path)
+    start_ms = int(float(start_time) * 1000)
+    end_ms = int(float(end_time) * 1000)
+    buffer = io.BytesIO()
+    audio[start_ms:end_ms].export(buffer, format="mp3")
+    buffer.seek(0)
+    return buffer
 
-def process_audio(audio_path):
-    """Main processing pipeline"""
-    print("Stage 1: Getting raw transcript from AssemblyAI...")
-    transcript_data = get_transcript(audio_path)
 
-    print("Stage 2: Processing in chunks...")
+def prepare_chunks(audio_path, transcript_data):
+    """Prepare chunks with their audio segments upfront"""
     chunks = create_chunks(transcript_data)
+    prepared_chunks = []
+
+    print(f"Preparing {len(chunks)} audio segments...")
+    start_time = time.time()
+    for i, chunk in enumerate(chunks, 1):
+        chunk_text = format_transcript(chunk["utterances"])
+        audio_segment = get_audio_segment(audio_path, chunk["start"], chunk["end"])
+        # Ensure the buffer is at the start for each use
+        audio_segment.seek(0)
+        prepared_chunks.append((chunk_text, audio_segment))
+        print(f"Prepared audio segment {i}/{len(chunks)}")
+
+    print(f"Audio preparation took {time.time() - start_time:.2f} seconds")
+    return prepared_chunks
+
+
+def get_file_hash(file_path: str) -> str:
+    """Calculate MD5 hash of a file"""
+    hash_md5 = hashlib.md5()
+    with open(file_path, "rb") as f:
+        for chunk in iter(lambda: f.read(4096), b""):
+            hash_md5.update(chunk)
+    return hash_md5.hexdigest()
+
+
+def get_cached_transcript(audio_path: str) -> List[dict]:
+    """Get transcript from cache if available and valid"""
+    audio_hash = get_file_hash(audio_path)
+    cache_dir = Path("transcripts/.cache")
+    cache_file = cache_dir / f"{Path(audio_path).stem}.json"
+
+    if cache_file.exists():
+        with open(cache_file) as f:
+            cached_data = json.load(f)
+            if cached_data.get("hash") == audio_hash:
+                print("Using cached AssemblyAI transcript...")
+                return cached_data["utterances"]
+
+    return None
+
 
-    # Get original transcript
-    original_chunks = [format_transcript(chunk["utterances"]) for chunk in chunks]
-    original_transcript = "\n".join(original_chunks)
+def save_transcript_cache(audio_path: str, utterances: List) -> None:
+    """Save transcript data to cache"""
+    audio_hash = get_file_hash(audio_path)
+    cache_dir = Path("transcripts/.cache")
+    cache_dir.mkdir(parents=True, exist_ok=True)
 
-    # Process enhanced versions in parallel
-    print(f"Stage 3: Enhancing {len(chunks)} chunks in parallel...")
-    chunk_data = [(audio_path, chunk) for chunk in chunks]
+    # Convert utterances to JSON-serializable format
+    utterances_data = [
+        {"speaker": u.speaker, "text": u.text, "start": u.start, "end": u.end}
+        for u in utterances
+    ]
 
-    # Use max_workers=None to allow as many threads as needed
-    with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
-        # Submit all tasks and store with their original indices
-        future_to_index = {
-            executor.submit(process_chunk, data): i for i, data in enumerate(chunk_data)
-        }
+    cache_data = {"hash": audio_hash, "utterances": utterances_data}
 
-        # Create a list to store results in order
-        enhanced_chunks = [None] * len(chunks)
+    cache_file = cache_dir / f"{Path(audio_path).stem}.json"
+    with open(cache_file, "w") as f:
+        json.dump(cache_data, f, indent=2)
 
-        # Process results as they complete
-        for future in concurrent.futures.as_completed(future_to_index):
-            index = future_to_index[future]
-            print(f"Completed chunk {index + 1}/{len(chunks)}")
-            enhanced_chunks[index] = future.result()
 
-    enhanced_transcript = "\n".join(enhanced_chunks)
+def process_audio(audio_path):
+    """Main processing pipeline"""
+    print("Stage 1: Getting transcript from AssemblyAI...")
+
+    # Try to get cached transcript first
+    cached_utterances = get_cached_transcript(audio_path)
+
+    if cached_utterances:
+        # Convert cached data back to utterance-like objects
+        class Utterance:
+            def __init__(self, data):
+                self.speaker = data["speaker"]
+                self.text = data["text"]
+                self.start = data["start"]
+                self.end = data["end"]
+
+        transcript_data = [Utterance(u) for u in cached_utterances]
+    else:
+        # Get new transcript from AssemblyAI
+        config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")
+        transcriber = aai.Transcriber()
+        transcript = transcriber.transcribe(audio_path, config=config)
+        transcript_data = transcript.utterances
+
+        # Save to cache
+        save_transcript_cache(audio_path, transcript_data)
+
+    print("Preparing audio segments...")
+    chunks = create_chunks(transcript_data)
+    prepared_chunks = prepare_chunks(audio_path, transcript_data)
+
+    # Get original transcript for saving
+    original_transcript = "\n".join(
+        format_transcript(chunk["utterances"]) for chunk in chunks
+    )
+
+    os.makedirs("transcripts", exist_ok=True)
+
+    print("\nStage 2: Enhancing chunks with Gemini...")
+    # Run async enhancement in an event loop
+    enhanced_chunks = asyncio.run(process_chunks_async(prepared_chunks))
+
+    # Filter out any failed chunks
+    enhanced_chunks = [chunk for chunk in enhanced_chunks if chunk is not None]
 
     # Write transcripts to files
-    with open("autogenerated-transcript.md", "w", encoding="utf-8") as f:
+    with open("transcripts/autogenerated-transcript.md", "w", encoding="utf-8") as f:
         f.write(original_transcript)
 
-    with open("transcript.md", "w", encoding="utf-8") as f:
-        f.write(enhanced_transcript)
+    with open("transcripts/transcript.md", "w", encoding="utf-8") as f:
+        f.write("\n".join(enhanced_chunks))
 
     print("\nTranscripts have been saved to:")
-    print("- autogenerated-transcript.md")
-    print("- transcript.md")
-
-
-def get_audio_segment(audio_path, start_time, end_time):
-    """Extract audio segment between start and end times"""
-    audio = AudioSegment.from_file(audio_path)
-    start_ms = int(float(start_time) * 1000)
-    end_ms = int(float(end_time) * 1000)
-    return audio[start_ms:end_ms].export(format="mp3")
+    print("- transcripts/autogenerated-transcript.md")
+    print("- transcripts/transcript.md")
 
 
 def main():
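process_audio launches one Gemini request per chunk all at once, which can run into per-minute rate limits on long recordings. A minimal sketch of capping in-flight requests with asyncio.Semaphore, assuming an arbitrary limit of 4 concurrent calls (not a documented quota):

# Sketch: bound concurrent Gemini calls without changing the rest of the pipeline.
# MAX_IN_FLIGHT is an assumed value, not an API-documented limit.
MAX_IN_FLIGHT = 4


async def enhance_with_limit(semaphore, chunk_text, audio_segment):
    async with semaphore:
        return await enhance_transcript_async(chunk_text, audio_segment)


async def process_chunks_limited(prepared_chunks):
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)
    tasks = [
        enhance_with_limit(semaphore, chunk_text, audio_segment)
        for chunk_text, audio_segment in prepared_chunks
    ]
    return await asyncio.gather(*tasks)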
 
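The transcript cache keys on an MD5 hash of the audio file, so replacing or re-editing the file invalidates the cached AssemblyAI result automatically. A minimal sketch of the cache round-trip, assuming a hypothetical episode.mp3 and the helpers defined above:

# Sketch: check the cache the same way process_audio does. "episode.mp3" is a placeholder path.
cached = get_cached_transcript("episode.mp3")
if cached is None:
    print("No valid cache entry; the pipeline would call AssemblyAI here.")
else:
    # Cached utterances are plain dicts with speaker/text/start/end keys.
    first = cached[0]
    print(f'{first["speaker"]} @ {first["start"]}: {first["text"][:60]}')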