# Manyue-DataScientist's picture
# Update app.py
# 935113b verified
# raw
# history blame
# 6.35 kB
import streamlit as st
from pyannote.audio import Pipeline
import whisper
import tempfile
import os
import torch
from transformers import pipeline as tf_pipeline
from pydub import AudioSegment
import io
@st.cache_resource
def load_models():
    """Load the diarization, transcription and summarization models.

    The function is wrapped in ``st.cache_resource``, so the body is meant
    to run once and its result reused on later calls.

    Returns:
        A ``(diarization, transcriber, summarizer)`` tuple on success, or
        ``(None, None, None)`` if anything raised while loading (the error
        is reported to the page through ``st.error``).
    """
    try:
        # The identifier had been mangled by e-mail obfuscation in transit;
        # this is the published id of the 3.1 diarization pipeline.
        pretrained = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=st.secrets["hf_token"],
        )
        if pretrained is None:
            # Fail with a readable message instead of an AttributeError on
            # the chained ``.instantiate`` call below.
            raise RuntimeError(
                "diarization pipeline could not be loaded; check the "
                "hf_token secret and that the model terms were accepted"
            )

        # NOTE(review): these four names look like voice-activity-detection
        # hyperparameters -- confirm they are accepted by the 3.1 pipeline.
        diarization = pretrained.instantiate({
            "onset": 0.3,
            "offset": 0.3,
            "min_duration_on": 0.1,
            "min_duration_off": 0.1,
        })

        transcriber = whisper.load_model("base")

        summarizer = tf_pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            # transformers convention: GPU index, or -1 for CPU.
            device=0 if torch.cuda.is_available() else -1,
        )
        return diarization, transcriber, summarizer
    except Exception as e:
        # UI boundary: report the problem and let the caller see the Nones.
        st.error(f"Error loading models: {str(e)}")
        return None, None, None
def process_audio(audio_file, max_duration=600):
    """Normalize an uploaded audio file and run the analysis models on it.

    The upload is converted to a 16 kHz, mono, 16-bit WAV written to a
    temporary file, which is then passed to the diarization and
    transcription models; the transcript text is finally summarized.

    Args:
        audio_file: Uploaded file object; ``getvalue()`` and ``name`` are
            the only attributes used here.
        max_duration: Accepted for backward compatibility; not currently
            enforced.

    Returns:
        A dict with keys ``"diarization"``, ``"transcription"`` (the full
        transcription object) and ``"summary"`` (summary text), or ``None``
        on any failure. Failures are reported through ``st.error``.
    """
    tmp_path = None
    try:
        audio_bytes = io.BytesIO(audio_file.getvalue())

        # Reserve a path and close the handle right away so the export
        # below never writes to a file that is still open here.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name

        try:
            if audio_file.name.lower().endswith('.mp3'):
                audio = AudioSegment.from_mp3(audio_bytes)
            else:
                audio = AudioSegment.from_wav(audio_bytes)

            # Standardize format: 16 kHz, mono, 2 bytes per sample.
            audio = audio.set_frame_rate(16000)
            audio = audio.set_channels(1)
            audio = audio.set_sample_width(2)
            audio.export(
                tmp_path,
                format="wav",
                parameters=["-ac", "1", "-ar", "16000"],
            )
        except Exception as e:
            st.error(f"Error converting audio: {str(e)}")
            return None

        diarization, transcriber, summarizer = load_models()
        if diarization is None or transcriber is None or summarizer is None:
            # Return None rather than a message string: a non-empty string
            # is truthy and the caller would try to index it like a dict.
            st.error("Model loading failed")
            return None

        with st.spinner("Identifying speakers..."):
            diarization_result = diarization(tmp_path)

        with st.spinner("Transcribing audio..."):
            transcription = transcriber.transcribe(tmp_path)

        with st.spinner("Generating summary..."):
            # truncation=True keeps long transcripts within the model's
            # maximum input length instead of failing on them.
            summary = summarizer(
                transcription["text"],
                max_length=130,
                min_length=30,
                truncation=True,
            )

        return {
            "diarization": diarization_result,
            "transcription": transcription,  # Return full transcription object
            "summary": summary[0]["summary_text"],
        }
    except Exception as e:
        st.error(f"Error processing audio: {str(e)}")
        return None
    finally:
        # Remove the temporary WAV on every exit path, not only on success.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not hide
                # the real result or error.
                pass
def format_speaker_segments(diarization_result, transcription):
    """Flatten diarization turns into a list of plain segment dicts.

    Args:
        diarization_result: Object whose ``itertracks(yield_label=True)``
            yields ``(turn, track, speaker)`` triples, where ``turn`` has
            ``start`` and ``end`` attributes.
        transcription: Mapping that may carry a ``'duration'`` entry giving
            the audio length. When it is missing or not positive, no
            upper-bound filtering is applied: treating a missing duration
            as 0 would discard every turn.

    Returns:
        A list of dicts with keys ``'speaker'``, ``'start'``, ``'end'`` and
        ``'duration'``, in iteration order, one per turn that lasts at
        least 100 ms and (when a duration is known) lies within it.
    """
    min_segment_length = 0.1  # 100ms minimum

    audio_duration = transcription.get('duration') or 0
    duration_known = audio_duration > 0

    formatted_segments = []
    for turn, _, speaker in diarization_result.itertracks(yield_label=True):
        # Skip invalid timestamps, but only when there is a real bound to
        # compare against.
        if duration_known and (
            turn.start > audio_duration or turn.end > audio_duration
        ):
            continue

        length = turn.end - turn.start
        # Only add segments with meaningful duration
        if length >= min_segment_length:
            formatted_segments.append({
                'speaker': speaker,
                'start': turn.start,
                'end': turn.end,
                'duration': length,
            })
    return formatted_segments
def _format_timestamp(seconds):
    """Render a time offset in seconds as ``MM:SS.ss``."""
    minutes = int(seconds // 60)
    return f"{minutes:02d}:{seconds % 60:05.2f}"


def _speaker_marker(label, markers=('🔵', '🔴')):
    """Pick a marker for a speaker label, cycling through ``markers``."""
    text = str(label)
    try:
        # Labels shaped like "<prefix>_<number>" use the number directly.
        index = int(text.split('_')[1])
    except (IndexError, ValueError):
        # Any other label still gets a stable marker instead of a crash.
        index = sum(text.encode("utf-8"))
    return markers[index % len(markers)]


def main():
    """Render the page: upload widget, analysis trigger and result tabs."""
    st.title("Multi-Speaker Audio Analyzer")
    st.write("Upload an audio file (MP3/WAV) up to 5 minutes long for best performance")

    uploaded_file = st.file_uploader("Choose a file", type=["mp3", "wav"])
    if not uploaded_file:
        return

    file_size = len(uploaded_file.getvalue()) / (1024 * 1024)
    st.write(f"File size: {file_size:.2f} MB")

    # Declare the MIME type that matches the upload rather than always WAV.
    is_mp3 = uploaded_file.name.lower().endswith('.mp3')
    st.audio(uploaded_file, format='audio/mpeg' if is_mp3 else 'audio/wav')

    if not st.button("Analyze Audio"):
        return

    if file_size > 200:
        st.error("File size exceeds 200MB limit")
        return

    results = process_audio(uploaded_file)
    if not isinstance(results, dict):
        # Anything other than a result dict (None, or an error-message
        # string) means the analysis failed; never index into it.
        if isinstance(results, str) and results:
            st.error(results)
        return

    tab1, tab2, tab3 = st.tabs(["Speakers", "Transcription", "Summary"])

    with tab1:
        st.write("Speaker Timeline:")
        segments = format_speaker_segments(
            results["diarization"],
            results["transcription"],
        )
        # Display segments with proper time formatting
        for segment in segments:
            col1, col2 = st.columns([2, 8])
            with col1:
                marker = _speaker_marker(segment['speaker'])
                st.write(f"{marker} {segment['speaker']}")
            with col2:
                start_str = _format_timestamp(segment['start'])
                end_str = _format_timestamp(segment['end'])
                # Separator added: start and end were previously rendered
                # back to back as one unreadable run of digits.
                st.write(f"{start_str} - {end_str}")
            st.markdown("---")

    with tab2:
        st.write("Transcription:")
        st.write(results["transcription"]["text"])

    with tab3:
        st.write("Summary:")
        st.write(results["summary"])
# Script entry point: build the page only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()