refactored code

- prompt.txt  +40 -0
- transcript.py  +183 -308

prompt.txt  ADDED
@@ -0,0 +1,40 @@
+You are an expert transcript editor. Your task is to enhance this transcript for maximum readability while maintaining the core message.
+
+IMPORTANT: Respond ONLY with the enhanced transcript. Do not include any explanations, headers, or phrases like "Here is the transcript."
+
+Note: Below you'll find an auto-generated transcript that may help with speaker identification, but focus on creating your own high-quality transcript from the audio.
+
+Please:
+1. Fix speaker attribution errors, especially at segment boundaries. Watch for incomplete thoughts that were likely from the previous speaker.
+
+2. Optimize AGGRESSIVELY for readability over verbatim accuracy:
+- Readability is the most important thing!!
+- Remove ALL conversational artifacts (yeah, so, I mean, etc.)
+- Remove ALL filler words (um, uh, like, you know)
+- Remove false starts and self-corrections completely
+- Remove redundant phrases and hesitations
+- Convert any indirect or rambling responses into direct statements
+- Break up run-on sentences into clear, concise statements
+- Maintain natural conversation flow while prioritizing clarity and directness
+
+3. Format the output consistently:
+- Keep the "Speaker X 00:00:00" format (no brackets, no other formatting)
+- Add TWO line breaks between speaker/timestamp and the text
+- Use proper punctuation and capitalization
+- Add paragraph breaks for topic changes
+- When you add paragraph breaks between the same speaker's remarks, no need to restate the speaker attribution
+- Preserve distinct speaker turns
+
+Example input:
+Speaker A 00:01:15
+
+Um, yeah, so like, what I was thinking was, you know, when we look at the data, the data shows us that, uh, there's this pattern, this pattern that keeps coming up again and again in the results.
+
+Example output:
+Speaker A 00:01:15
+
+When we look at the data, we see a consistent pattern in the results.
+
+When we examine the second part of the analysis, it reveals a completely different finding.
+
+Enhance the following transcript, starting directly with the speaker format:
transcript.py  CHANGED
@@ -1,363 +1,238 @@
-import argparse
-import assemblyai as aai
-from google import generativeai
-import os
-from pydub import AudioSegment
-import concurrent.futures
-import io
-import time
-import asyncio
-
-import json
-import hashlib
-from pathlib import Path
-
-# Suppress gRPC shutdown warnings
-os.environ["GRPC_PYTHON_LOG_LEVEL"] = "error"
-
-# Initialize API clients
-ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")
-GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
-
-aai.settings.api_key = ASSEMBLYAI_API_KEY
-generativeai.configure(api_key=GOOGLE_API_KEY)
-model = generativeai.GenerativeModel("gemini-exp-1206")
-
-# Define the prompt template
-prompt = """You are an expert transcript editor. Your task is to enhance this transcript for maximum readability while maintaining the core message.
-
-IMPORTANT: Respond ONLY with the enhanced transcript. Do not include any explanations, headers, or phrases like "Here is the transcript."
-
-Note: Below you'll find an auto-generated transcript that may help with speaker identification, but focus on creating your own high-quality transcript from the audio.
-
-Please:
-1. Fix speaker attribution errors, especially at segment boundaries. Watch for incomplete thoughts that were likely from the previous speaker.
-
-2. Optimize AGGRESSIVELY for readability over verbatim accuracy:
-- Readability is the most important thing!!
-- Remove ALL conversational artifacts (yeah, so, I mean, etc.)
-- Remove ALL filler words (um, uh, like, you know)
-- Remove false starts and self-corrections completely
-- Remove redundant phrases and hesitations
-- Convert any indirect or rambling responses into direct statements
-- Break up run-on sentences into clear, concise statements
-- Maintain natural conversation flow while prioritizing clarity and directness
-
-3. Format the output consistently:
-- Keep the "Speaker X 00:00:00" format (no brackets, no other formatting)
-- Add TWO line breaks between speaker/timestamp and the text
-- Use proper punctuation and capitalization
-- Add paragraph breaks for topic changes
-- When you add paragraph breaks between the same speaker's remarks, no need to restate the speaker attribution
-- Preserve distinct speaker turns
-
-Example input:
-Speaker A 00:01:15
-
-Um, yeah, so like, what I was thinking was, you know, when we look at the data, the data shows us that, uh, there's this pattern, this pattern that keeps coming up again and again in the results.
-
-Example output:
-Speaker A 00:01:15
-
-When we look at the data, we see a consistent pattern in the results.
-
-When we examine the second part of the analysis, it reveals a completely different finding.
-
-Enhance the following transcript, starting directly with the speaker format:
-"""
-
-
-def ...
-
-
-def ...
-    formatted_sections = []
-    current_speaker = None
-    current_text = []
-    current_start = None
-
-    for utterance in utterances:
-        # If this is a new speaker
-        if current_speaker != utterance.speaker:
-            # Write out the previous section if it exists
-            if current_text:
-                # Convert milliseconds to seconds for timestamp
-                timestamp = format_timestamp(float(current_start) / 1000)
-                section = f"Speaker {current_speaker} {timestamp}\n\n{' '.join(current_text).strip()}"
-                formatted_sections.append(section)
-                current_text = []
-
-            # Start new section
-            current_speaker = utterance.speaker
-            current_start = utterance.start
-
-        current_text.append(utterance.text.strip())
-
-    # Add the final section
-    if current_text:
-        # Convert milliseconds to seconds for timestamp
-        timestamp = format_timestamp(float(current_start) / 1000)
-        section = (
-            f"Speaker {current_speaker} {timestamp}\n\n{' '.join(current_text).strip()}"
-        )
-        formatted_sections.append(section)
-
-    return "\n\n".join(formatted_sections)
-
-
-async def enhance_transcript_async(chunk_text: str, audio_segment: io.BytesIO) -> str:
-    """Enhance transcript using Gemini AI asynchronously"""
-    audio_segment.seek(0)  # Ensure we're at the start of the buffer
-    response = await model.generate_content_async(
-        [
-            prompt,
-            chunk_text,
-            {
-                "mime_type": "audio/mp3",
-                "data": audio_segment.read(),
-            },
-        ]
-    )
-    return response.text
-
-
-async def process_chunks_async(
-    prepared_chunks: List[Tuple[str, io.BytesIO]]
-) -> List[str]:
-    """Process all chunks in parallel using async API"""
-    enhancement_tasks = []
-    for chunk_text, audio_segment in prepared_chunks:
-        task = enhance_transcript_async(chunk_text, audio_segment)
-        enhancement_tasks.append(task)
-
-    print(f"Processing {len(enhancement_tasks)} chunks in parallel...")
-    start_time = time.time()
-
-    enhanced_chunks = []
-    for i, future in enumerate(asyncio.as_completed(enhancement_tasks), 1):
-        try:
-            result = await future
-            processing_time = time.time() - start_time
-            print(
-                f"Completed chunk {i}/{len(enhancement_tasks)} in {processing_time:.2f} seconds"
-            )
-            enhanced_chunks.append(result)
-        except Exception as e:
-            print(f"Error processing chunk {i}: {str(e)}")
-            enhanced_chunks.append(None)
-
-    total_time = time.time() - start_time
-    print(f"\nTotal enhancement time: {total_time:.2f} seconds")
-    print(f"Average time per chunk: {total_time/len(enhancement_tasks):.2f} seconds")
-    return enhanced_chunks
-
-
-def create_chunks(transcript_data, ...):
-    chunks = []
-    current_chunk = []
-    ...
-    for utterance in transcript_data:
-        ...
-        # Check if adding this utterance would exceed token limit
-        elif (
-            len(" ".join(u.text for u in current_chunk)) + len(utterance.text)
-        ) / 4 > target_tokens:
-            # Save current chunk and start new one
-            chunks.append(
-                {
-                    "utterances": current_chunk,
-                    "start": current_start,
-                    "end": current_end,
-                }
-            )
-            current_chunk = [utterance]
-            current_start = float(utterance.start) / 1000
-            current_end = float(utterance.end) / 1000
-        else:
-            current_chunk.append(utterance)
-            current_end = float(utterance.end) / 1000
-
-    # Add final chunk
-    if current_chunk:
-        chunks.append(
-            {"utterances": current_chunk, "start": current_start, "end": current_end}
-        )
-
-    return chunks
-
-
-def ...
-    """Extract audio segment between start and end times and return bytes"""
-    audio = AudioSegment.from_file(audio_path)
-    start_ms = int(float(start_time) * 1000)
-    end_ms = int(float(end_time) * 1000)
-    buffer = io.BytesIO()
-    audio[start_ms:end_ms].export(buffer, format="mp3")
-    buffer.seek(0)
-    return buffer
-
-
-def prepare_chunks(audio_path, transcript_data):
-    """Prepare chunks with their audio segments upfront"""
-    chunks = create_chunks(transcript_data)
-    prepared_chunks = []
-
-    print(f"Preparing {len(chunks)} audio segments...")
-    start_time = time.time()
-    ...
-        audio_segment.seek(0)
-        prepared_chunks.append((chunk_text, audio_segment))
-        print(f"Prepared audio segment {i}/{len(chunks)}")
-
-    print(f"Audio preparation took {time.time() - start_time:.2f} seconds")
-    return prepared_chunks
-
-
-def get_file_hash(file_path: str) -> str:
-    """Calculate MD5 hash of a file"""
-    hash_md5 = hashlib.md5()
-    with open(file_path, "rb") as f:
-        for chunk in iter(lambda: f.read(4096), b""):
-            hash_md5.update(chunk)
-    return hash_md5.hexdigest()
-
-
-def get_cached_transcript(audio_path: str) -> List[dict]:
-    """Get transcript from cache if available and valid"""
-    audio_hash = get_file_hash(audio_path)
-    cache_dir = Path("transcripts/.cache")
-    cache_file = cache_dir / f"{Path(audio_path).stem}.json"
-
-    if cache_file.exists():
-        with open(cache_file) as f:
-            cached_data = json.load(f)
-            if cached_data.get("hash") == audio_hash:
-                print("Using cached AssemblyAI transcript...")
-                return cached_data["utterances"]
-
-    return None
-
-
-def ...
-    """Save transcript data to cache"""
-    audio_hash = get_file_hash(audio_path)
-    cache_dir = Path("transcripts/.cache")
-    cache_dir.mkdir(parents=True, exist_ok=True)
-
-    utterances_data = [
-        {"speaker": u.speaker, "text": u.text, "start": u.start, "end": u.end}
-        for u in utterances
-    ]
-
-    cache_data = {"hash": audio_hash, "utterances": utterances_data}
-
-    cache_file = cache_dir / f"{Path(audio_path).stem}.json"
-    with open(cache_file, "w") as f:
-        json.dump(cache_data, f, indent=2)
-
-
-def process_audio(audio_path):
-    ...
-    if cached_utterances:
-        # Convert cached data back to utterance-like objects
-        class Utterance:
-            def __init__(self, data):
-                self.speaker = data["speaker"]
-                self.text = data["text"]
-                self.start = data["start"]
-                self.end = data["end"]
-
-        transcript_data = [Utterance(u) for u in cached_utterances]
-    else:
-        # Get new transcript from AssemblyAI
-        config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")
-        transcriber = aai.Transcriber()
-        transcript = transcriber.transcribe(audio_path, config=config)
-        transcript_data = transcript.utterances
-        ...
-
-    ...
-    enhanced_chunks = asyncio.run(process_chunks_async(prepared_chunks))
-    ...
-
-    print("\nTranscripts saved to:")
-    print("- transcripts/autogenerated-transcript.md")
-    print("- transcripts/transcript.md")
-
-
-def main():
-    parser = argparse.ArgumentParser(
-        description="Generate enhanced transcripts from audio files"
-    )
-    parser.add_argument("audio_file", help="Path to the audio file to transcribe")
-    args = parser.parse_args()
-
-    if not os.path.exists(args.audio_file):
-        print(f"Error: File '{args.audio_file}' not found")
-        return
-
-    try:
-        process_audio(args.audio_file)
-    except Exception as e:
-        print(f"Error processing audio: {str(e)}")
-
-
-if __name__ == "__main__":
-    main()
+import argparse
+from dataclasses import dataclass
+from pathlib import Path
+import json
+import hashlib
+import os
+from typing import List, Optional
+
+import assemblyai as aai
+from google import generativeai
+from pydub import AudioSegment
+import asyncio
+import io
+
+
+@dataclass
+class Utterance:
+    """A single utterance from a speaker"""
+
+    speaker: str
+    text: str
+    start: int  # milliseconds
+    end: int  # milliseconds
+
+    @property
+    def timestamp(self) -> str:
+        """Format start time as HH:MM:SS"""
+        seconds = self.start // 1000
+        h = seconds // 3600
+        m = (seconds % 3600) // 60
+        s = seconds % 60
+        return f"{h:02d}:{m:02d}:{s:02d}"
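As a quick sanity check on the millisecond arithmetic above (hypothetical values for illustration): an utterance starting at 3,723,000 ms is 1 hour, 2 minutes and 3 seconds in, so its timestamp renders as 01:02:03.

    u = Utterance(speaker="A", text="Hello.", start=3_723_000, end=3_725_000)
    print(u.timestamp)  # -> "01:02:03"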
+
+
+class Transcriber:
+    """Handles getting and caching transcripts from AssemblyAI"""
+
+    def __init__(self, api_key: str):
+        aai.settings.api_key = api_key
+        self.cache_dir = Path("transcripts/.cache")
+        self.cache_dir.mkdir(parents=True, exist_ok=True)
+
+    def get_transcript(self, audio_path: Path) -> List[Utterance]:
+        """Get transcript, using cache if available"""
+        cached = self._get_cached(audio_path)
+        if cached:
+            print("Using cached AssemblyAI transcript...")
+            return cached
+
+        print("Getting new transcript from AssemblyAI...")
+        return self._get_fresh(audio_path)
+
+    def _get_cached(self, audio_path: Path) -> Optional[List[Utterance]]:
+        """Try to get transcript from cache"""
+        cache_file = self.cache_dir / f"{audio_path.stem}.json"
+        if not cache_file.exists():
+            return None
+
+        with open(cache_file) as f:
+            data = json.load(f)
+        if data["hash"] != self._get_file_hash(audio_path):
+            return None
+
+        return [Utterance(**u) for u in data["utterances"]]
+
+    def _get_fresh(self, audio_path: Path) -> List[Utterance]:
+        """Get new transcript from AssemblyAI"""
+        config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")
+        transcript = aai.Transcriber().transcribe(str(audio_path), config=config)
+
+        utterances = [
+            Utterance(speaker=u.speaker, text=u.text, start=u.start, end=u.end)
+            for u in transcript.utterances
+        ]
+
+        self._save_cache(audio_path, utterances)
+        return utterances
+
+    def _save_cache(self, audio_path: Path, utterances: List[Utterance]) -> None:
+        """Save transcript to cache"""
+        cache_file = self.cache_dir / f"{audio_path.stem}.json"
+        data = {
+            "hash": self._get_file_hash(audio_path),
+            "utterances": [vars(u) for u in utterances],
+        }
+        with open(cache_file, "w") as f:
+            json.dump(data, f, indent=2)
+
+    def _get_file_hash(self, file_path: Path) -> str:
+        """Calculate MD5 hash of a file"""
+        hash_md5 = hashlib.md5()
+        with open(file_path, "rb") as f:
+            for chunk in iter(lambda: f.read(4096), b""):
+                hash_md5.update(chunk)
+        return hash_md5.hexdigest()
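Concretely, each cache entry written to transcripts/.cache/<stem>.json pairs the audio file's MD5 hash with the serialized utterance fields, roughly like this (illustrative values):

    {
      "hash": "3f2b9c0d41e8a6715c9e8b2a7d4f1e0c",
      "utterances": [
        {"speaker": "A", "text": "Welcome back to the show.", "start": 0, "end": 2400},
        {"speaker": "B", "text": "Thanks for having me.", "start": 2400, "end": 4100}
      ]
    }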
+
+
+class Enhancer:
+    """Handles enhancing transcripts using Gemini"""
+
+    def __init__(self, api_key: str):
+        generativeai.configure(api_key=api_key)
+        self.model = generativeai.GenerativeModel("gemini-exp-1206")
+
+        # Load prompt template
+        prompt_path = Path(__file__).parent / "prompt.txt"
+        self.prompt = prompt_path.read_text()
+
+    async def enhance_chunks(self, chunks: List[tuple[str, io.BytesIO]]) -> List[str]:
+        """Enhance multiple transcript chunks in parallel"""
+        tasks = [self._enhance_chunk(text, audio) for text, audio in chunks]
+
+        print(f"Enhancing {len(tasks)} chunks in parallel...")
+        results = []
+        for i, future in enumerate(asyncio.as_completed(tasks), 1):
+            try:
+                result = await future
+                results.append(result)
+                print(f"Completed chunk {i}/{len(tasks)}")
+            except Exception as e:
+                print(f"Error enhancing chunk {i}: {e}")
+                results.append(None)
+
+        return [r for r in results if r is not None]
+
+    async def _enhance_chunk(self, text: str, audio: io.BytesIO) -> str:
+        """Enhance a single chunk"""
+        audio.seek(0)
+        response = await self.model.generate_content_async(
+            [self.prompt, text, {"mime_type": "audio/mp3", "data": audio.read()}]
+        )
+        return response.text
+
+
+def prepare_audio_chunks(
+    audio_path: Path, utterances: List[Utterance]
+) -> List[tuple[str, io.BytesIO]]:
+    """Prepare audio chunks and their corresponding text"""
+    chunks = []
+    current = []
+    current_text = []
+
+    for u in utterances:
+        # Start new chunk if this is first utterance or would exceed token limit
+        if not current or len(" ".join(current_text)) > 8000:  # ~2000 tokens
+            if current:
+                chunks.append((current[0].start, current[-1].end, current))
+            current = [u]
+            current_text = [u.text]
+        else:
+            current.append(u)
+            current_text.append(u.text)
+
+    # Add final chunk
+    if current:
+        chunks.append((current[0].start, current[-1].end, current))
+
+    # Prepare audio segments and format text
+    audio = AudioSegment.from_file(audio_path)
+    prepared = []
+
+    print(f"Preparing {len(chunks)} audio segments...")
+    for start_ms, end_ms, utterances in chunks:
+        # Get audio segment
+        segment = audio[start_ms:end_ms]
+        buffer = io.BytesIO()
+        segment.export(buffer, format="mp3")
+
+        # Format text
+        text = format_transcript(utterances)
+
+        prepared.append((text, buffer))
+
+    return prepared
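The 8,000-character cutoff in the chunking loop follows the rough estimate of about four characters per token, keeping each chunk's transcript text near 2,000 tokens before the matching MP3 segment is attached; the removed create_chunks applied the same estimate by dividing character counts by 4 against a target_tokens limit.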
+
+
+def format_transcript(utterances: List[Utterance]) -> str:
+    """Format utterances into readable text"""
+    sections = []
+    current_speaker = None
+    current_text = []
+
+    for u in utterances:
+        if current_speaker != u.speaker and current_text:
+            sections.append(
+                f"Speaker {current_speaker} {utterances[0].timestamp}\n\n{' '.join(current_text)}"
+            )
+            current_text = []
+        current_speaker = u.speaker
+        current_text.append(u.text)
+
+    if current_text:
+        sections.append(
+            f"Speaker {current_speaker} {utterances[0].timestamp}\n\n{' '.join(current_text)}"
+        )
+
+    return "\n\n".join(sections)
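For a single hypothetical utterance (speaker "A", start 5,000 ms, text "Thanks for joining us today."), the formatter returns a section in the same shape the prompt expects:

    Speaker A 00:00:05

    Thanks for joining us today.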
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("audio_file", help="Audio file to transcribe")
+    args = parser.parse_args()
+
+    audio_path = Path(args.audio_file)
+    if not audio_path.exists():
+        print(f"Error: File not found: {audio_path}")
+        return
+
+    # Initialize services
+    transcriber = Transcriber(os.getenv("ASSEMBLYAI_API_KEY"))
+    enhancer = Enhancer(os.getenv("GOOGLE_API_KEY"))
+
+    # Create output directory
+    out_dir = Path("transcripts")
+    out_dir.mkdir(exist_ok=True)
+
+    # Get transcript
+    utterances = transcriber.get_transcript(audio_path)
+
+    # Save original transcript
+    original = format_transcript(utterances)
+    (out_dir / "autogenerated-transcript.md").write_text(original)
+
+    # Prepare and enhance chunks
+    chunks = prepare_audio_chunks(audio_path, utterances)
+    enhanced = asyncio.run(enhancer.enhance_chunks(chunks))
+
+    # Save enhanced transcript
+    (out_dir / "transcript.md").write_text("\n".join(enhanced))
+
+    print("\nTranscripts saved to:")
+    print("- transcripts/autogenerated-transcript.md")
+    print("- transcripts/transcript.md")
+
+
+if __name__ == "__main__":
+    main()
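A typical run, assuming both API keys are exported in the environment (the audio file name here is only an example):

    export ASSEMBLYAI_API_KEY=...
    export GOOGLE_API_KEY=...
    python transcript.py interview.mp3

This writes transcripts/autogenerated-transcript.md (the raw AssemblyAI output) and transcripts/transcript.md (the Gemini-enhanced version), and reuses the cached transcript in transcripts/.cache/ on later runs against the same audio file.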
|