# Page-header residue from the hosting site: "Spaces: Sleeping"
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
from sentence_transformers import SentenceTransformer | |
from sklearn.metrics.pairwise import cosine_similarity | |
import torch | |
import json | |
import os | |
import glob | |
from pathlib import Path | |
from datetime import datetime, timedelta | |
import edge_tts | |
import asyncio | |
import requests | |
from collections import defaultdict | |
from audio_recorder_streamlit import audio_recorder | |
import streamlit.components.v1 as components | |
from urllib.parse import quote | |
from xml.etree import ElementTree as ET | |
from datasets import load_dataset | |
# Default value for every session-state key seeded at start-up.
SESSION_VARS = {
    'search_history': [],                       # Track search history
    'last_voice_input': "",                     # Last voice input
    'transcript_history': [],                   # Conversation history
    'should_rerun': False,                      # Trigger for UI updates
    'search_columns': [],                       # Available search columns
    'initial_search_done': False,               # First search flag
    'tts_voice': "en-US-AriaNeural",            # Default voice
    'arxiv_last_query': "",                     # Last ArXiv search
    'dataset_loaded': False,                    # Dataset load status
    'current_page': 0,                          # Current data page
    'data_cache': None,                         # Data cache
    'dataset_info': None,                       # Dataset metadata
    'nps_submitted': False,                     # Track if user submitted NPS
    'nps_last_shown': None,                     # When NPS was last shown
    'voice_recorder_key': str(datetime.now()),  # Unique key for voice recorder
}

# Constants
# NOTE(review): none of these three is referenced in the visible part of the
# file; presumably they are used by the search code -- confirm before changing.
ROWS_PER_PAGE = 100
MIN_SEARCH_SCORE = 0.3
EXACT_MATCH_BOOST = 2.0

# Seed st.session_state once: keys that already exist keep their current
# value, so a script rerun does not reset them.
for var, default in SESSION_VARS.items():
    if var not in st.session_state:
        st.session_state[var] = default
class NPSTracker:
    """Net Promoter Score tracker backed by a CSV log file.

    Every response is stored as one CSV row with the columns ``timestamp``,
    ``score`` and ``feedback``.
    """

    # Column order shared by the header row and every appended entry.
    _COLUMNS = ['timestamp', 'score', 'feedback']

    def __init__(self, log_file="nps_logs.csv"):
        """Remember the log location and make sure the file has a header.

        Args:
            log_file: Path of the CSV log (created when missing).
        """
        self.log_file = Path(log_file)
        self.initialize_log()

    def _needs_header(self):
        """Return True when the log is missing or is a zero-byte file."""
        return not self.log_file.exists() or self.log_file.stat().st_size == 0

    def initialize_log(self):
        """Create the log file (header row only) if it has no content yet."""
        if self._needs_header():
            pd.DataFrame(columns=self._COLUMNS).to_csv(self.log_file, index=False)

    def log_response(self, score, feedback=""):
        """Append one NPS response to the log.

        Args:
            score: The score given by the user.
            feedback: Optional free-text comment.
        """
        new_entry = pd.DataFrame(
            [{
                # Always emit the fractional part so every row shares one
                # timestamp format; plain isoformat() omits it when the
                # microsecond field happens to be zero.
                'timestamp': datetime.now().isoformat(timespec='microseconds'),
                'score': score,
                'feedback': feedback,
            }],
            columns=self._COLUMNS,
        )
        # Append instead of re-reading and rewriting the whole log: cheaper,
        # and it cannot fail on (or alter) rows that are already stored.
        new_entry.to_csv(
            self.log_file,
            mode='a',
            header=self._needs_header(),
            index=False,
        )

    @staticmethod
    def _empty_stats():
        """Return the statistics reported when there are no usable responses."""
        return {
            'nps_score': 0,
            'promoters': 0,
            'passives': 0,
            'detractors': 0,
            'total_responses': 0,
            'recent_feedback': [],
        }

    def _read_log(self):
        """Load the log with parsed timestamps and numeric scores.

        Returns:
            A DataFrame with the log columns. Rows whose timestamp or score
            cannot be parsed are dropped. A missing, empty or column-less
            file yields an empty frame.
        """
        try:
            # Keep feedback as text so an all-digit comment is not turned
            # into a number.
            df = pd.read_csv(self.log_file, dtype={'feedback': str})
        except (FileNotFoundError, pd.errors.EmptyDataError):
            return pd.DataFrame(columns=self._COLUMNS)
        if not set(self._COLUMNS).issubset(df.columns):
            return pd.DataFrame(columns=self._COLUMNS)
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        df['score'] = pd.to_numeric(df['score'], errors='coerce')
        return df.dropna(subset=['timestamp', 'score'])

    def get_nps_stats(self, days=30):
        """Calculate NPS statistics for the most recent period.

        Args:
            days: Size of the look-back window in days.

        Returns:
            A dict with ``nps_score`` (percentage of promoters minus
            percentage of detractors, rounded to one decimal),
            ``promoters`` (score >= 9), ``passives`` (7 <= score <= 8),
            ``detractors`` (score <= 6), ``total_responses`` and
            ``recent_feedback`` (up to five non-blank comments, newest
            first). All counts are zero when the window has no responses.
        """
        df = self._read_log()
        if df.empty:
            return self._empty_stats()

        cutoff = datetime.now() - timedelta(days=days)
        recent_df = df[df['timestamp'] > cutoff]
        total = len(recent_df)
        if total == 0:
            return self._empty_stats()

        scores = recent_df['score']
        promoters = int((scores >= 9).sum())
        passives = int(scores.between(7, 8).sum())
        detractors = int((scores <= 6).sum())
        nps = (promoters - detractors) / total * 100

        newest_first = recent_df.sort_values('timestamp', ascending=False)
        # Missing feedback is read back as NaN (not a str) and is skipped
        # together with whitespace-only comments.
        recent_feedback = [
            text for text in newest_first['feedback']
            if isinstance(text, str) and text.strip()
        ][:5]

        return {
            'nps_score': round(nps, 1),
            'promoters': promoters,
            'passives': passives,
            'detractors': detractors,
            'total_responses': total,
            'recent_feedback': recent_feedback,
        }
def setup_voice_recorder():
    """Render an in-browser voice recorder (start/stop buttons + playback).

    The embedded script records from the microphone with ``MediaRecorder``,
    exposes the take in an ``<audio>`` element and posts it to the parent
    window as a data URL in a ``{type: 'voiceData', data: ...}`` message.

    NOTE(review): nothing visible in this file receives that message or
    stores it under ``st.session_state['voice_data']`` -- confirm how the
    recording is meant to reach Python.

    Returns:
        Whatever ``components.html`` returns for the rendered snippet.
    """
    return components.html(
        """
        <div style="display: flex; flex-direction: column; align-items: center; gap: 10px;">
            <button id="startButton"
                    style="padding: 10px 20px; background: #ff4b4b; color: white; border: none; border-radius: 5px; cursor: pointer">
                Start Recording
            </button>
            <button id="stopButton"
                    style="padding: 10px 20px; background: #4b4bff; color: white; border: none; border-radius: 5px; cursor: pointer"
                    disabled>
                Stop Recording
            </button>
            <audio id="audioPlayback" controls style="display: none;"></audio>
            <div id="statusText" style="color: #666;">Ready to record...</div>
        </div>
        <script>
            let mediaRecorder;
            let audioChunks = [];
            document.getElementById('startButton').onclick = async () => {
                try {
                    const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
                    // Start every take from scratch so it does not contain
                    // the audio of the previous recording.
                    audioChunks = [];
                    mediaRecorder = new MediaRecorder(stream);
                    mediaRecorder.ondataavailable = (e) => {
                        if (e.data && e.data.size > 0) {
                            audioChunks.push(e.data);
                        }
                    };
                    mediaRecorder.onstop = () => {
                        // Label the blob with the container the recorder
                        // actually produced instead of always claiming WAV.
                        const audioBlob = new Blob(audioChunks, {
                            type: mediaRecorder.mimeType || 'audio/wav'
                        });
                        // Release the microphone once the take is finished.
                        stream.getTracks().forEach((track) => track.stop());
                        const audioUrl = URL.createObjectURL(audioBlob);
                        document.getElementById('audioPlayback').src = audioUrl;
                        document.getElementById('audioPlayback').style.display = 'block';
                        // Send to Python
                        const reader = new FileReader();
                        reader.onloadend = () => {
                            window.parent.postMessage({
                                type: 'voiceData',
                                data: reader.result
                            }, '*');
                        };
                        reader.readAsDataURL(audioBlob);
                    };
                    mediaRecorder.start();
                    document.getElementById('startButton').disabled = true;
                    document.getElementById('stopButton').disabled = false;
                    document.getElementById('statusText').textContent = 'Recording...';
                } catch (err) {
                    console.error('Error:', err);
                    document.getElementById('statusText').textContent = 'Error: ' + err.message;
                }
            };
            document.getElementById('stopButton').onclick = () => {
                if (mediaRecorder && mediaRecorder.state !== 'inactive') {
                    mediaRecorder.stop();
                }
                document.getElementById('startButton').disabled = false;
                document.getElementById('stopButton').disabled = true;
                document.getElementById('statusText').textContent = 'Recording complete!';
            };
        </script>
        """,
        height=200,
    )
def render_nps_sidebar():
    """Show the NPS score, response breakdown and recent feedback in the sidebar.

    Reads the statistics from a fresh ``NPSTracker`` (default log file) and
    renders them with ``st.sidebar`` widgets. Returns nothing.
    """
    tracker = NPSTracker()
    stats = tracker.get_nps_stats()

    st.sidebar.markdown("### π User Satisfaction Metrics")

    # Status indicator next to the score: green from 50 up, yellow from 0 up,
    # red below zero. Written as escapes so the glyphs survive re-encoding.
    nps_score = stats['nps_score']
    if nps_score >= 50:
        score_color = "\U0001F7E2"  # green circle
    elif nps_score >= 0:
        score_color = "\U0001F7E1"  # yellow circle
    else:
        score_color = "\U0001F534"  # red circle

    st.sidebar.metric(
        "Net Promoter Score",
        f"{score_color} {stats['nps_score']}"
    )

    st.sidebar.markdown("#### Response Breakdown")
    # NOTE(review): the three metric labels (and the heading above) contain a
    # glyph that was damaged beyond recovery; they are kept as found.
    col1, col2, col3 = st.sidebar.columns(3)
    with col1:
        st.metric("π", stats['promoters'])
    with col2:
        st.metric("π", stats['passives'])
    with col3:
        st.metric("π", stats['detractors'])

    if stats['recent_feedback']:
        st.sidebar.markdown("#### Recent Feedback")
        for feedback in stats['recent_feedback']:
            # Feedback read back from CSV is not guaranteed to be a str
            # (an all-digit comment can come back as a number).
            text = str(feedback)
            # Long comments are cut to 100 characters plus an ellipsis.
            st.sidebar.info(text[:100] + "..." if len(text) > 100 else text)
def render_nps_survey():
    """Show the NPS survey form and store the answer when it is submitted.

    On submit the response is logged through ``NPSTracker``, the
    ``nps_submitted`` session flag is set and the script is rerun.
    """
    tracker = NPSTracker()

    st.markdown("### π Your Feedback Matters!")

    score = st.slider(
        "How likely are you to recommend this search tool to others?",
        0, 10,
        help="0 = Not likely at all, 10 = Extremely likely"
    )
    feedback = st.text_area("Additional feedback (optional)")

    if st.button("Submit Feedback", key="nps_submit"):
        tracker.log_response(score, feedback)
        st.session_state['nps_submitted'] = True
        st.success("Thank you for your feedback! π")
        # st.experimental_rerun is the deprecated name of st.rerun; prefer
        # the current API and fall back only on installations without it.
        rerun = getattr(st, "rerun", None) or st.experimental_rerun
        rerun()
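# [... Rest of your existing code for search functionality ...]
# (Placeholder: main() below calls VideoSearch, render_result, transcribe_audio,
# perform_arxiv_lookup and show_file_manager, none of which is defined above.)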
def _nps_survey_due(now):
    """Return True when the NPS survey should be rendered on this run.

    The survey opens when it has never been shown or was last opened more
    than 24 hours ago, and then stays open on every rerun until the user
    submits it. ``nps_last_shown`` is stamped only when the survey opens,
    so interacting with any widget no longer hides it.
    """
    state = st.session_state
    if state.get('nps_submitted'):
        state['nps_survey_open'] = False
        return False
    if state.get('nps_survey_open'):
        return True
    last_shown = state.get('nps_last_shown')
    if not last_shown or now - last_shown > timedelta(hours=24):
        state['nps_survey_open'] = True
        state['nps_last_shown'] = now
        return True
    return False


def _render_results(results):
    """Render each result in its own expander; only the first starts open."""
    for i, result in enumerate(results, 1):
        with st.expander(f"Result {i}", expanded=(i == 1)):
            render_result(result)


def main():
    """Build the page: title, NPS sidebar and survey, and the four tabs.

    Tabs: text search, voice search, ArXiv lookup and the file manager.
    """
    st.title("π₯ Smart Video Search with Voice & Feedback")

    # Initialize search
    search = VideoSearch()

    # Add NPS metrics to sidebar
    with st.sidebar:
        render_nps_sidebar()

    # Show survey periodically
    if _nps_survey_due(datetime.now()):
        with st.expander("π Quick Feedback", expanded=True):
            render_nps_survey()

    # Create main tabs
    tab1, tab2, tab3, tab4 = st.tabs([
        "π Search", "ποΈ Voice Input", "π ArXiv", "π Files"
    ])

    # Search Tab
    with tab1:
        st.subheader("Search Videos")
        col1, col2 = st.columns([3, 1])
        with col1:
            # NOTE(review): the default switches from "aliens" to "" after
            # the first search; confirm the text the user typed survives
            # that change of the widget's default value.
            query = st.text_input(
                "Enter search query:",
                value="" if st.session_state['initial_search_done'] else "aliens")
        with col2:
            search_column = st.selectbox(
                "Search in:",
                ["All Fields"] + st.session_state['search_columns'])

        col3, col4 = st.columns(2)
        with col3:
            num_results = st.slider("Max results:", 1, 100, 20)
        with col4:
            search_button = st.button("π Search")

        # The very first run searches automatically with the default query.
        if (search_button or not st.session_state['initial_search_done']) and query:
            st.session_state['initial_search_done'] = True
            selected_column = None if search_column == "All Fields" else search_column
            with st.spinner("Searching..."):
                results = search.search(query, selected_column, num_results)

            if results:
                # Only the first five hits are kept in the history entry.
                st.session_state['search_history'].append({
                    'query': query,
                    'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'results': results[:5]
                })
                st.write(f"Found {len(results)} results:")
                _render_results(results)
            else:
                st.warning("No matching results found.")

    # Voice Input Tab
    with tab2:
        st.subheader("Voice Search")
        st.write("ποΈ Record your query:")
        setup_voice_recorder()

        if 'voice_data' in st.session_state:
            with st.spinner("Processing voice..."):
                voice_query = transcribe_audio(st.session_state['voice_data'])
            st.markdown("**Transcribed Text:**")
            st.write(voice_query)

            if st.button("π Search with Voice"):
                results = search.search(voice_query, None, 20)
                # Same empty-result handling as the text search tab.
                if results:
                    _render_results(results)
                else:
                    st.warning("No matching results found.")

    # ArXiv Tab
    with tab3:
        st.subheader("ArXiv Search")
        arxiv_query = st.text_input("Search ArXiv:", value=st.session_state['arxiv_last_query'])
        vocal_summary = st.checkbox("π Quick Audio Summary", value=True)
        titles_summary = st.checkbox("π Titles Only", value=True)
        full_audio = st.checkbox("π Full Audio Summary", value=False)

        if st.button("π Search ArXiv"):
            st.session_state['arxiv_last_query'] = arxiv_query
            perform_arxiv_lookup(arxiv_query, vocal_summary, titles_summary, full_audio)

    # File Manager Tab
    with tab4:
        show_file_manager()
# Build the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()