File size: 6,693 Bytes
2a6784d caa4c85 b3635dd 853df82 caa4c85 2a6784d b3635dd d6e0f11 2a6784d 935113b d6e0f11 2a6784d d6e0f11 2a6784d d6e0f11 2a6784d d6e0f11 2a6784d 83bc687 d6e0f11 e0d61c7 83bc687 d6e0f11 83bc687 b3635dd d6e0f11 2a6784d d6e0f11 2a6784d d6e0f11 2a6784d b3635dd d6e0f11 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
import streamlit as st
from pyannote.audio import Pipeline
import whisper
import tempfile
import os
import torch
from transformers import pipeline as tf_pipeline
from pydub import AudioSegment
import io
@st.cache_resource
def _load_models_cached():
    """Load the three models once per process; raise if any of them fails.

    Raising (instead of returning a failure value) matters here: the resource
    cache stores return values, so a returned ``(None, None, None)`` would be
    remembered and every later call would keep failing without retrying.
    """
    diarization = Pipeline.from_pretrained(
        "pyannote/speaker-diarization",
        use_auth_token=st.secrets["hf_token"],
    )
    transcriber = whisper.load_model("base")
    summarizer = tf_pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        # transformers convention: GPU index 0 when CUDA is available, -1 for CPU.
        device=0 if torch.cuda.is_available() else -1,
    )
    # Validate that every loader actually produced a model object.
    if diarization is None or transcriber is None or summarizer is None:
        raise ValueError("One or more models failed to load")
    return diarization, transcriber, summarizer


def load_models():
    """Return ``(diarization, transcriber, summarizer)`` for the app.

    Returns:
        A 3-tuple of loaded models. If loading fails for any reason, the
        error is shown in the Streamlit UI and ``(None, None, None)`` is
        returned; the failure is not cached, so the next call retries.
    """
    try:
        return _load_models_cached()
    except Exception as e:
        st.error(f"Error loading models: {str(e)}")
        st.error("Debug info: Check if HF token is valid and has necessary permissions")
        return None, None, None
def process_audio(audio_file, max_duration=600):
    """Convert an upload to 16 kHz mono WAV, then diarize, transcribe and summarize it.

    Args:
        audio_file: Uploaded file object. Must provide ``getvalue()`` (the raw
            bytes) and ``name`` (a ``.mp3`` suffix selects the MP3 decoder,
            anything else is read as WAV).
        max_duration: Kept for interface compatibility.
            NOTE(review): this value is not enforced anywhere in this
            function -- confirm whether long inputs should be rejected or
            truncated.

    Returns:
        A dict with keys ``"diarization"``, ``"transcription"`` and
        ``"summary"`` on success, or ``None`` on any failure (the error is
        reported through ``st.error``).
    """
    tmp_path = None
    try:
        audio_bytes = io.BytesIO(audio_file.getvalue())

        # Reserve a temp path and close the handle immediately: the export
        # below reopens the file by name, which fails on Windows while the
        # original handle is still open.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name

        try:
            if audio_file.name.lower().endswith('.mp3'):
                audio = AudioSegment.from_mp3(audio_bytes)
            else:
                audio = AudioSegment.from_wav(audio_bytes)
            # Standardize format: 16 kHz, mono, 16-bit samples.
            audio = audio.set_frame_rate(16000)
            audio = audio.set_channels(1)
            audio = audio.set_sample_width(2)
            audio.export(
                tmp_path,
                format="wav",
                parameters=["-ac", "1", "-ar", "16000"],
            )
        except Exception as e:
            st.error(f"Error converting audio: {str(e)}")
            return None

        diarization, transcriber, summarizer = load_models()
        if diarization is None or transcriber is None or summarizer is None:
            # Return None (not a message string) so callers can treat every
            # failure the same way and never index into a str.
            st.error("Model loading failed")
            return None

        with st.spinner("Identifying speakers..."):
            diarization_result = diarization(tmp_path)
        with st.spinner("Transcribing audio..."):
            transcription = transcriber.transcribe(tmp_path)
        with st.spinner("Generating summary..."):
            text = transcription["text"]
            if text.strip():
                # truncation=True keeps over-long transcripts within the
                # model's input limit instead of raising.
                summary = summarizer(
                    text, max_length=130, min_length=30, truncation=True
                )[0]["summary_text"]
            else:
                # Nothing was transcribed, so there is nothing to summarize.
                summary = ""

        return {
            "diarization": diarization_result,
            "transcription": transcription,
            "summary": summary,
        }
    except Exception as e:
        st.error(f"Error processing audio: {str(e)}")
        return None
    finally:
        # Always remove the temp file, including on early returns and errors.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup: a leftover temp file must not mask the
                # real result or error.
                pass
def format_speaker_segments(diarization_result):
    """Flatten a diarization result into a list of plain segment dicts.

    Args:
        diarization_result: Object exposing ``itertracks(yield_label=True)``
            that yields ``(turn, track, speaker)`` triples, where ``turn`` has
            ``start`` and ``end`` attributes. ``None`` is accepted.

    Returns:
        A list of ``{'speaker': str, 'start': float, 'end': float}`` dicts in
        iteration order. A missing (``None``) start or end becomes ``0.0``.
        Returns ``[]`` when the input is ``None`` or when anything fails
        while iterating (the error is shown via ``st.error``).
    """
    if diarization_result is None:
        return []

    def _as_seconds(value):
        # Missing boundaries fall back to 0.0; everything else is coerced to float.
        return 0.0 if value is None else float(value)

    try:
        tracks = diarization_result.itertracks(yield_label=True)
        return [
            {
                'speaker': str(label),  # labels are always exposed as strings
                'start': _as_seconds(turn.start),
                'end': _as_seconds(turn.end),
            }
            for turn, _, label in tracks
        ]
    except Exception as e:
        st.error(f"Error formatting segments: {str(e)}")
        return []
def format_timestamp(seconds):
    """Format a duration in seconds as ``MM:SS.ss``.

    The value is rounded to hundredths of a second *before* it is split into
    minutes and seconds, so a value such as 59.999 rolls over to ``01:00.00``
    instead of rendering as the invalid ``00:60.00``.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted string, e.g. ``"01:15.50"`` for 75.5.
    """
    total_centiseconds = int(round(float(seconds) * 100))
    minutes, centiseconds = divmod(total_centiseconds, 6000)
    return f"{minutes:02d}:{centiseconds / 100:05.2f}"
def _speaker_marker(speaker_label):
    """Return a coloured dot for a label like ``SPEAKER_01``; white if it has no numeric suffix."""
    markers = ('🔵', '🔴')  # Two colors for alternating speakers
    try:
        speaker_num = int(speaker_label.split('_')[1])
    except (IndexError, ValueError):
        return '⚪'
    return markers[speaker_num % len(markers)]


def main():
    """Render the Streamlit page: upload a file, analyze it, show the results.

    The page shows an uploader and an audio preview. When "Analyze Audio" is
    pressed and the file is within the 200 MB limit, the file is passed to
    ``process_audio`` and the result is displayed in three tabs (speaker
    timeline, transcription, summary).
    """
    st.title("Multi-Speaker Audio Analyzer")
    st.write("Upload an audio file (MP3/WAV) up to 5 minutes long for best performance")
    uploaded_file = st.file_uploader("Choose a file", type=["mp3", "wav"])
    if not uploaded_file:
        return

    file_size = len(uploaded_file.getvalue()) / (1024 * 1024)
    st.write(f"File size: {file_size:.2f} MB")
    # Use the upload's own MIME type so MP3 files are not declared as WAV;
    # fall back to WAV when no type is reported.
    st.audio(uploaded_file, format=getattr(uploaded_file, "type", None) or 'audio/wav')

    if not st.button("Analyze Audio"):
        return
    if file_size > 200:
        st.error("File size exceeds 200MB limit")
        return

    results = process_audio(uploaded_file)
    if not results:
        return
    if not isinstance(results, dict):
        # Guard against a non-dict result (e.g. a bare status message) so the
        # tab rendering below never indexes into the wrong type.
        st.error(str(results))
        return

    tab1, tab2, tab3 = st.tabs(["Speakers", "Transcription", "Summary"])

    with tab1:
        st.write("Speaker Timeline:")
        segments = format_speaker_segments(results["diarization"])
        if segments:
            for segment in segments:
                col1, col2 = st.columns([2, 8])
                with col1:
                    st.write(f"{_speaker_marker(segment['speaker'])} {segment['speaker']}")
                with col2:
                    start_time = format_timestamp(segment['start'])
                    end_time = format_timestamp(segment['end'])
                    st.write(f"{start_time} → {end_time}")
                st.markdown("---")
        else:
            st.warning("No speaker segments detected")

    with tab2:
        st.write("Transcription:")
        if "text" in results["transcription"]:
            st.write(results["transcription"]["text"])
        else:
            st.warning("No transcription available")

    with tab3:
        st.write("Summary:")
        if results["summary"]:
            st.write(results["summary"])
        else:
            st.warning("No summary available")
# Script entry point: only start the app when this file is run directly.
if __name__ == "__main__":
    main()