import streamlit as st
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import json
import os
import glob
from pathlib import Path
from datetime import datetime, timedelta
import edge_tts
import asyncio
import requests
from collections import defaultdict
from audio_recorder_streamlit import audio_recorder
import streamlit.components.v1 as components
from urllib.parse import quote
from xml.etree import ElementTree as ET
from datasets import load_dataset
# 🧠 Defaults used to seed st.session_state (key -> initial value)
SESSION_VARS = {
    # --- Search / conversation history ---
    'search_history': [],            # past searches
    'last_voice_input': "",          # most recent voice input
    'transcript_history': [],        # conversation transcript
    # --- UI control ---
    'should_rerun': False,           # flag used to trigger UI updates
    'search_columns': [],            # columns offered in the "Search in:" box
    'initial_search_done': False,    # set once the first search has run
    # --- Voice / ArXiv ---
    'tts_voice': "en-US-AriaNeural",  # default text-to-speech voice
    'arxiv_last_query': "",          # last ArXiv query entered
    # --- Dataset paging / caching ---
    'dataset_loaded': False,         # dataset load status
    'current_page': 0,               # current data page
    'data_cache': None,              # cached data
    'dataset_info': None,            # dataset metadata
    # --- NPS survey ---
    'nps_submitted': False,          # whether the user submitted the survey
    'nps_last_shown': None,          # when the survey was last shown
    # --- Voice recorder ---
    'voice_recorder_key': str(datetime.now()),  # key derived from current time
}
# 📊 Constants
# NOTE(review): none of these are referenced in the visible part of this file;
# the meanings below are inferred from the names only — confirm against the
# search/paging code that uses them.
ROWS_PER_PAGE: int = 100  # presumably the number of dataset rows per page
MIN_SEARCH_SCORE: float = 0.3  # presumably the minimum score for a result to count
EXACT_MATCH_BOOST: float = 2.0  # presumably a score multiplier for exact matches
# Seed st.session_state from SESSION_VARS; keys that already exist are left
# untouched so values set earlier are not overwritten.
for var, default in SESSION_VARS.items():
    if var in st.session_state:
        continue
    st.session_state[var] = default
class NPSTracker:
    """🎯 Net Promoter Score tracker backed by a CSV log file.

    Every response is stored as one CSV row with the columns ``timestamp``
    (ISO 8601 string), ``score`` and ``feedback``.
    """

    # Column order shared by the header row and every appended response.
    _LOG_COLUMNS = ['timestamp', 'score', 'feedback']

    def __init__(self, log_file="nps_logs.csv"):
        """Remember where the log lives and make sure the file exists.

        Args:
            log_file: Path (``str`` or ``Path``) of the CSV log file.
        """
        self.log_file = Path(log_file)
        self.initialize_log()

    def initialize_log(self):
        """📝 Create the log file with only a header row if it doesn't exist."""
        if not self.log_file.exists():
            pd.DataFrame(columns=self._LOG_COLUMNS).to_csv(self.log_file, index=False)

    def log_response(self, score, feedback=""):
        """🖊️ Append one NPS response to the log.

        Args:
            score: The score given by the user.
            feedback: Optional free-text feedback.
        """
        new_entry = pd.DataFrame([{
            # Fixed-width timestamps keep every row parseable with one format.
            'timestamp': datetime.now().isoformat(timespec='microseconds'),
            'score': score,
            'feedback': feedback,
        }])
        # A missing or zero-byte file still needs its header row.
        try:
            needs_header = self.log_file.stat().st_size == 0
        except FileNotFoundError:
            needs_header = True
        # Append instead of read-modify-write: the cost does not grow with the
        # log size and a concurrent writer's rows are not overwritten.
        new_entry.to_csv(
            self.log_file,
            mode='a',
            header=needs_header,
            index=False,
            columns=self._LOG_COLUMNS,
        )

    @staticmethod
    def _empty_stats():
        """Return the stats dict used when there are no responses to report."""
        return {
            'nps_score': 0,
            'promoters': 0,
            'passives': 0,
            'detractors': 0,
            'total_responses': 0,
            'recent_feedback': [],
        }

    def get_nps_stats(self, days=30):
        """📊 Calculate NPS stats for the most recent ``days`` days.

        Args:
            days: Size of the look-back window, counted back from now.

        Returns:
            A dict with ``nps_score`` (percentage of promoters minus percentage
            of detractors, rounded to one decimal), the ``promoters`` (score
            >= 9), ``passives`` (score 7-8) and ``detractors`` (score <= 6)
            counts, ``total_responses``, and ``recent_feedback`` (up to five
            feedback strings, newest first). All counts are zero and the list
            is empty when the window contains no responses.
        """
        try:
            # Read feedback as text so numeric-looking comments ("10") are not
            # converted to numbers; empty feedback still comes back as NaN.
            df = pd.read_csv(self.log_file, dtype={'feedback': str})
        except (FileNotFoundError, pd.errors.EmptyDataError):
            return self._empty_stats()

        df['timestamp'] = pd.to_datetime(df['timestamp'])
        cutoff = datetime.now() - timedelta(days=days)
        recent_df = df[df['timestamp'] > cutoff]
        total = len(recent_df)
        if total == 0:
            return self._empty_stats()

        scores = recent_df['score']
        promoters = int((scores >= 9).sum())
        passives = int(scores.between(7, 8).sum())
        detractors = int((scores <= 6).sum())
        nps = (promoters - detractors) / total * 100

        with_feedback = recent_df[recent_df['feedback'].notna()]
        recent_feedback = (
            with_feedback.sort_values('timestamp', ascending=False)['feedback']
            .head(5)
            .tolist()
        )
        return {
            'nps_score': round(nps, 1),
            'promoters': promoters,
            'passives': passives,
            'detractors': detractors,
            'total_responses': total,
            'recent_feedback': recent_feedback,
        }
def setup_voice_recorder():
    """🎤 Embed the in-browser voice recorder markup as an HTML component.

    Returns:
        Whatever ``components.html`` returns for the embedded markup.
    """
    recorder_markup = "\nReady to record...\n"
    return components.html(recorder_markup, height=200)
def render_nps_sidebar():
    """🎨 Show NPS metrics in the sidebar.

    Reads the stats from an ``NPSTracker`` using its default log file and
    renders the overall score with a traffic-light marker (🟢 for >= 50,
    🟡 for >= 0, 🔴 otherwise), the promoter/passive/detractor counts, and the
    recent feedback entries truncated to 100 characters.
    """
    stats = NPSTracker().get_nps_stats()
    nps_score = stats['nps_score']

    st.sidebar.markdown("### 📊 User Satisfaction Metrics")
    if nps_score >= 50:
        score_color = "🟢"
    elif nps_score >= 0:
        score_color = "🟡"
    else:
        score_color = "🔴"
    st.sidebar.metric("Net Promoter Score", f"{score_color} {nps_score}")

    st.sidebar.markdown("#### Response Breakdown")
    breakdown = (("😃", 'promoters'), ("😐", 'passives'), ("😕", 'detractors'))
    for column, (label, stat_key) in zip(st.sidebar.columns(3), breakdown):
        with column:
            st.metric(label, stats[stat_key])

    recent_feedback = stats['recent_feedback']
    if recent_feedback:
        max_chars = 100
        st.sidebar.markdown("#### Recent Feedback")
        for feedback in recent_feedback:
            # Feedback read back from CSV can be numeric when the text looked
            # like a number; coerce so len() and slicing cannot raise.
            text = str(feedback)
            st.sidebar.info(text[:max_chars] + "..." if len(text) > max_chars else text)
def render_nps_survey():
    """🎯 Show the NPS survey form and record the answer on submit.

    Renders a 0-10 slider and an optional feedback box. When the submit
    button is pressed the response is logged through ``NPSTracker``,
    ``st.session_state['nps_submitted']`` is set to True, and the script is
    rerun.
    """
    tracker = NPSTracker()
    st.markdown("### 📝 Your Feedback Matters!")
    score = st.slider(
        "How likely are you to recommend this search tool to others?",
        0, 10,
        help="0 = Not likely at all, 10 = Extremely likely"
    )
    feedback = st.text_area("Additional feedback (optional)")
    if st.button("Submit Feedback", key="nps_submit"):
        # Strip so whitespace-only input is stored as "no feedback".
        tracker.log_response(score, feedback.strip())
        st.session_state['nps_submitted'] = True
        st.success("Thank you for your feedback! 🙏")
        # st.experimental_rerun is deprecated in favour of st.rerun and is not
        # available in recent Streamlit releases; prefer the new name and fall
        # back for older installs.
        rerun = getattr(st, "rerun", None) or st.experimental_rerun
        rerun()
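# TODO: insert the rest of the existing search functionality here. main() below
# expects VideoSearch, render_result, transcribe_audio, perform_arxiv_lookup
# and show_file_manager to be defined.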
def _nps_survey_visible(now):
    """Return True when the NPS survey should be rendered on this run."""
    state = st.session_state
    if state.get('nps_submitted'):
        return False
    # Once opened, the survey must stay rendered across reruns: every widget
    # interaction (including the submit click) reruns the script, and a button
    # press is only seen if the button is rendered again on that rerun.
    if state.get('nps_survey_open'):
        return True
    last_shown = state.get('nps_last_shown')
    return not last_shown or now - last_shown > timedelta(hours=24)


def _show_results(results):
    """Render each result in its own expander, with the first one expanded."""
    for i, result in enumerate(results, 1):
        with st.expander(f"Result {i}", expanded=(i == 1)):
            render_result(result)


def _render_search_tab(search):
    """Render the text search tab and run the query when requested."""
    st.subheader("Search Videos")
    col1, col2 = st.columns([3, 1])
    with col1:
        # NOTE(review): the default changes from "aliens" to "" after the first
        # search; a changing default may reset the widget — confirm intended.
        query = st.text_input(
            "Enter search query:",
            value="" if st.session_state['initial_search_done'] else "aliens")
    with col2:
        search_column = st.selectbox(
            "Search in:",
            ["All Fields"] + st.session_state['search_columns'])
    col3, col4 = st.columns(2)
    with col3:
        num_results = st.slider("Max results:", 1, 100, 20)
    with col4:
        search_button = st.button("🔍 Search")

    # The very first run searches automatically with the default query.
    wants_search = search_button or not st.session_state['initial_search_done']
    if not (wants_search and query):
        return

    st.session_state['initial_search_done'] = True
    selected_column = None if search_column == "All Fields" else search_column
    with st.spinner("Searching..."):
        results = search.search(query, selected_column, num_results)
    if results:
        st.session_state['search_history'].append({
            'query': query,
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'results': results[:5]
        })
        st.write(f"Found {len(results)} results:")
        _show_results(results)
    else:
        st.warning("No matching results found.")


def _render_voice_tab(search):
    """Render the voice tab: recorder, transcription, and voice search."""
    st.subheader("Voice Search")
    st.write("🎙️ Record your query:")
    setup_voice_recorder()
    # NOTE(review): nothing in the visible code stores 'voice_data' in the
    # session state — confirm where the recorder output is written.
    if 'voice_data' not in st.session_state:
        return
    with st.spinner("Processing voice..."):
        voice_query = transcribe_audio(st.session_state['voice_data'])
    st.markdown("**Transcribed Text:**")
    st.write(voice_query)
    if st.button("🔍 Search with Voice"):
        results = search.search(voice_query, None, 20)
        # Same empty-result handling as the text search tab.
        if results:
            _show_results(results)
        else:
            st.warning("No matching results found.")


def _render_arxiv_tab():
    """Render the ArXiv tab and run the lookup when requested."""
    st.subheader("ArXiv Search")
    arxiv_query = st.text_input("Search ArXiv:", value=st.session_state['arxiv_last_query'])
    vocal_summary = st.checkbox("🎙 Quick Audio Summary", value=True)
    titles_summary = st.checkbox("🔖 Titles Only", value=True)
    full_audio = st.checkbox("📚 Full Audio Summary", value=False)
    if st.button("🔍 Search ArXiv"):
        st.session_state['arxiv_last_query'] = arxiv_query
        perform_arxiv_lookup(arxiv_query, vocal_summary, titles_summary, full_audio)


def main():
    """Build the page: NPS sidebar and survey, then the four feature tabs.

    Relies on ``VideoSearch``, ``render_result``, ``transcribe_audio``,
    ``perform_arxiv_lookup`` and ``show_file_manager``, which are defined
    outside the code shown here.
    """
    st.title("🎥 Smart Video Search with Voice & Feedback")
    # Initialize search
    # NOTE(review): constructed on every rerun — confirm VideoSearch caches
    # any expensive setup.
    search = VideoSearch()

    # Add NPS metrics to sidebar
    with st.sidebar:
        render_nps_sidebar()

    # Show survey periodically, and keep it on screen until it is submitted
    current_time = datetime.now()
    if _nps_survey_visible(current_time):
        if not st.session_state.get('nps_survey_open'):
            st.session_state['nps_survey_open'] = True
            st.session_state['nps_last_shown'] = current_time
        with st.expander("📝 Quick Feedback", expanded=True):
            render_nps_survey()

    # Create main tabs
    tab1, tab2, tab3, tab4 = st.tabs([
        "🔍 Search", "🎙️ Voice Input", "📚 ArXiv", "📂 Files"
    ])
    with tab1:
        _render_search_tab(search)
    with tab2:
        _render_voice_tab(search)
    with tab3:
        _render_arxiv_tab()
    with tab4:
        show_file_manager()
# Build the page only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()