transcript improvements
scripts/transcript.py (+114 −43)
@@ -4,7 +4,7 @@ from pathlib import Path
 import json
 import hashlib
 import os
-from typing import List, Tuple
+from typing import List, Tuple, Iterator
 import assemblyai as aai
 from google import generativeai
 from pydub import AudioSegment
@@ -12,6 +12,7 @@ import asyncio
 import io
 from multiprocessing import Pool
 from functools import partial
+from itertools import groupby


 @dataclass
@@ -49,21 +50,43 @@ class Transcriber:
                 data = json.load(f)
                 if data["hash"] == self._get_file_hash(audio_path):
                     print("Using cached AssemblyAI transcript...")
-                    …
+                    # Create proper Utterance objects from cached data
+                    return [
+                        Utterance(
+                            speaker=u["speaker"],
+                            text=u["text"],
+                            start=u["start"],
+                            end=u["end"]
+                        )
+                        for u in data["utterances"]
+                    ]

         print("Getting new transcript from AssemblyAI...")
         config = aai.TranscriptionConfig(speaker_labels=True, language_code="en")
         transcript = aai.Transcriber().transcribe(str(audio_path), config=config)

         utterances = [
-            Utterance( …
+            Utterance(
+                speaker=u.speaker,
+                text=u.text,
+                start=u.start,
+                end=u.end
+            )
             for u in transcript.utterances
         ]

-        # Cache the …
+        # Cache the raw utterance data
         cache_data = {
             "hash": self._get_file_hash(audio_path),
-            "utterances": [ …
+            "utterances": [
+                {
+                    "speaker": u.speaker,
+                    "text": u.text,
+                    "start": u.start,
+                    "end": u.end
+                }
+                for u in utterances
+            ]
         }
         with open(cache_file, "w") as f:
             json.dump(cache_data, f, indent=2)
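With this change the cache stores plain dicts and rebuilds `Utterance` objects on read, so the cached and uncached paths hand the same type downstream. A minimal sketch of the round-trip, using a hypothetical `Utterance` dataclass with the four fields the diff touches (start/end assumed to be AssemblyAI's millisecond offsets):

    from dataclasses import dataclass

    @dataclass
    class Utterance:
        # Hypothetical stand-in mirroring the fields written to the cache
        speaker: str
        text: str
        start: int  # assumed milliseconds, as AssemblyAI reports
        end: int

    # Write side: serialize to the cached JSON shape
    utterances = [Utterance("A", "Hello there.", 0, 1200)]
    cached = [{"speaker": u.speaker, "text": u.text, "start": u.start, "end": u.end}
              for u in utterances]

    # Read side: a cache hit now rebuilds real Utterance objects, not raw dicts
    restored = [Utterance(**d) for d in cached]
    assert restored == utterances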
@@ -115,49 +138,96 @@ class Enhancer:
         return results


-    …
-            current_speaker = u.speaker
-            current_texts = []
-            current_texts.append(u.text)
-    …
+@dataclass
+class SpeakerDialogue:
+    """Represents a continuous section of speech from a single speaker"""
+    speaker: str
+    utterances: List[Utterance]
+
+    @property
+    def start(self) -> int:
+        """Start time of first utterance"""
+        return self.utterances[0].start
+
+    @property
+    def end(self) -> int:
+        """End time of last utterance"""
+        return self.utterances[-1].end
+
+    @property
+    def timestamp(self) -> str:
+        """Format start time as HH:MM:SS"""
+        return self.utterances[0].timestamp
+
+    def format(self) -> str:
+        """Format this dialogue as text with newlines between utterances"""
+        texts = [u.text + "\n\n" for u in self.utterances]  # Add two newlines after each utterance
+        combined_text = ''.join(texts).rstrip()  # Remove trailing whitespace at the end
+        return f"Speaker {self.speaker} {self.timestamp}\n\n{combined_text}"
+
+
+def group_utterances_by_speaker(utterances: List[Utterance]) -> Iterator[SpeakerDialogue]:
+    """Group consecutive utterances by the same speaker"""
+    for speaker, group in groupby(utterances, key=lambda u: u.speaker):
+        yield SpeakerDialogue(speaker=speaker, utterances=list(group))
+
+
+def estimate_tokens(text: str, chars_per_token: int = 4) -> int:
+    """
+    Estimate number of tokens in text
+    Args:
+        text: The text to estimate tokens for
+        chars_per_token: Estimated characters per token (default 4)
+    """
+    return (len(text) + chars_per_token - 1) // chars_per_token
+
+
+def chunk_dialogues(
+    dialogues: Iterator[SpeakerDialogue],
+    max_tokens: int = 2000,
+    chars_per_token: int = 4
+) -> List[List[SpeakerDialogue]]:
+    """
+    Split dialogues into chunks that fit within token limit
+    Args:
+        dialogues: Iterator of SpeakerDialogues
+        max_tokens: Maximum tokens per chunk
+        chars_per_token: Estimated characters per token (default 4)
+    """
+    chunks = []
+    current_chunk = []
+    current_text = ""
+
+    for dialogue in dialogues:
+        # Format this dialogue
+        formatted = dialogue.format()
+
+        # If adding this dialogue would exceed token limit, start new chunk
+        new_text = current_text + "\n\n" + formatted if current_text else formatted
+        if current_chunk and estimate_tokens(new_text, chars_per_token) > max_tokens:
+            chunks.append(current_chunk)
+            current_chunk = [dialogue]
+            current_text = formatted
+        else:
+            current_chunk.append(dialogue)
+            current_text = new_text
+
+    if current_chunk:
+        chunks.append(current_chunk)
+
+    return chunks
+
+
+def format_chunk(dialogues: List[SpeakerDialogue]) -> str:
+    """Format a chunk of dialogues into readable text"""
+    return "\n\n".join(dialogue.format() for dialogue in dialogues)


 def prepare_audio_chunks(audio_path: Path, utterances: List[Utterance]) -> List[Tuple[str, io.BytesIO]]:
     """Prepare audio chunks and their corresponding text"""
-    …
-        text_length = 0
-
-        for u in utterances:
-            new_length = text_length + len(u.text)
-            if current and new_length > max_tokens:
-                chunks.append(current)
-                current = [u]
-                text_length = len(u.text)
-            else:
-                current.append(u)
-                text_length = new_length
-
-        if current:
-            chunks.append(current)
-        return chunks
-
-    # Split utterances into chunks
-    chunks = chunk_utterances(utterances)
+    # Group utterances by speaker and split into chunks
+    dialogues = group_utterances_by_speaker(utterances)
+    chunks = chunk_dialogues(dialogues)
     print(f"Preparing {len(chunks)} audio segments...")

     # Load audio once
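A quick usage sketch of the new grouping path on toy data. The `Utterance` stand-in and its `timestamp` property are assumptions here (the real ones live elsewhere in this file); the grouping itself is exactly the `itertools.groupby` pattern from the diff, which only merges *consecutive* utterances from the same speaker:

    from dataclasses import dataclass
    from itertools import groupby

    @dataclass
    class Utterance:
        speaker: str
        text: str
        start: int  # assumed milliseconds
        end: int

        @property
        def timestamp(self) -> str:
            # Assumed HH:MM:SS formatting of the start offset
            s = self.start // 1000
            return f"{s // 3600:02d}:{(s % 3600) // 60:02d}:{s % 60:02d}"

    utterances = [
        Utterance("A", "Hi.", 0, 900),
        Utterance("A", "Welcome to the show.", 900, 2500),
        Utterance("B", "Thanks for having me.", 2500, 4000),
        Utterance("A", "Let's dive in.", 4000, 5200),
    ]

    # Speaker A appears twice, so groupby yields two separate A groups
    for speaker, group in groupby(utterances, key=lambda u: u.speaker):
        print(speaker, [u.text for u in group])
    # A ['Hi.', 'Welcome to the show.']
    # B ['Thanks for having me.']
    # A ["Let's dive in."]

Note that `estimate_tokens` is ceiling division: with the default 4 characters per token, a 9-character string costs (9 + 3) // 4 = 3 tokens, so the chunker errs on the side of staying under `max_tokens`.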
@@ -172,7 +242,7 @@ def prepare_audio_chunks(audio_path: Path, utterances: List[Utterance]) -> List[
         # Use lower quality MP3 for faster processing
         segment.export(buffer, format="mp3", parameters=["-q:a", "9"])
         prepared.append((format_chunk(chunk), buffer))
-
+
     return prepared


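The export call passes ffmpeg's VBR quality flag straight through pydub. A sketch of slicing and exporting one chunk to an in-memory buffer (the input path is hypothetical, and ffmpeg must be installed for pydub to work):

    import io
    from pydub import AudioSegment

    audio = AudioSegment.from_file("episode.mp3")  # hypothetical input

    # pydub slices by milliseconds, matching the utterance start/end offsets
    segment = audio[0:60_000]

    buffer = io.BytesIO()
    # "-q:a 9" is ffmpeg's lowest-quality VBR setting: much smaller files,
    # which speeds up the upload to the enhancement model
    segment.export(buffer, format="mp3", parameters=["-q:a", "9"])
    buffer.seek(0)  # rewind before handing the buffer to a reader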
@@ -194,7 +264,8 @@ def main():
     utterances = transcriber.get_transcript(audio_path)

     # Save original transcript
-    …
+    dialogues = list(group_utterances_by_speaker(utterances))  # Convert iterator to list
+    original = format_chunk(dialogues)
     (out_dir / "autogenerated-transcript.md").write_text(original)

     # Enhance transcript