import asyncio
import glob
import json
import os
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import quote
from xml.etree import ElementTree as ET

import edge_tts
import numpy as np
import pandas as pd
import requests
import streamlit as st
import streamlit.components.v1 as components
import torch
from audio_recorder_streamlit import audio_recorder
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# 🧠 Initialize session state variables
SESSION_VARS = {
    'search_history': [],               # Track search history
    'last_voice_input': "",             # Last voice input
    'transcript_history': [],           # Conversation history
    'should_rerun': False,              # Trigger for UI updates
    'search_columns': [],               # Available search columns
    'initial_search_done': False,       # First search flag
    'tts_voice': "en-US-AriaNeural",    # Default voice
    'arxiv_last_query': "",             # Last ArXiv search
    'dataset_loaded': False,            # Dataset load status
    'current_page': 0,                  # Current data page
    'data_cache': None,                 # Data cache
    'dataset_info': None,               # Dataset metadata
    'nps_submitted': False,             # Track if user submitted NPS
    'nps_last_shown': None,             # When NPS was last shown
    'voice_recorder_key': str(datetime.now()),  # Unique key for voice recorder
}

# 📊 Constants
ROWS_PER_PAGE = 100
MIN_SEARCH_SCORE = 0.3
EXACT_MATCH_BOOST = 2.0

# Initialize session state: only fill in keys that are not set yet, so values
# written during earlier script runs of the same session are preserved.
for var, default in SESSION_VARS.items():
    if var not in st.session_state:
        st.session_state[var] = default


class NPSTracker:
    """🎯 Net Promoter Score Tracker - Measuring happiness in numbers!

    Responses are stored as rows of a CSV file with the columns
    ``timestamp`` (ISO-8601 string from ``datetime.now()``), ``score`` and
    ``feedback``.
    """

    # Column order of the CSV log; shared by the writer and the reader.
    LOG_COLUMNS = ['timestamp', 'score', 'feedback']

    def __init__(self, log_file="nps_logs.csv"):
        """Bind the tracker to ``log_file`` and create the file if missing."""
        self.log_file = Path(log_file)
        self.initialize_log()

    def initialize_log(self) -> None:
        """📝 Create log file (header row only) if it doesn't exist"""
        if not self.log_file.exists():
            header_only = pd.DataFrame(columns=self.LOG_COLUMNS)
            header_only.to_csv(self.log_file, index=False)

    def log_response(self, score, feedback="") -> None:
        """🖊️ Log new NPS response

        Args:
            score: The 0-10 rating chosen by the user.
            feedback: Optional free-text comment.
        """
        new_entry = pd.DataFrame(
            [{
                'timestamp': datetime.now().isoformat(),
                'score': score,
                'feedback': feedback,
            }],
            columns=self.LOG_COLUMNS,
        )
        # Append a single row instead of re-reading and rewriting the whole
        # log.  The header is only written when the file is missing or empty
        # (e.g. it was deleted after the tracker was constructed).
        needs_header = (
            not self.log_file.exists() or self.log_file.stat().st_size == 0
        )
        new_entry.to_csv(
            self.log_file, mode='a', header=needs_header, index=False
        )

    @staticmethod
    def _empty_stats() -> dict:
        """Return the stats payload used when there is nothing to report."""
        return {
            'nps_score': 0,
            'promoters': 0,
            'passives': 0,
            'detractors': 0,
            'total_responses': 0,
            'recent_feedback': [],
        }

    def get_nps_stats(self, days=30) -> dict:
        """📊 Calculate NPS stats for recent period

        Args:
            days: Size of the look-back window, counted back from now.

        Returns:
            A dict with ``nps_score`` (rounded to one decimal),
            ``promoters`` (score >= 9), ``passives`` (score 7-8),
            ``detractors`` (score <= 6), ``total_responses`` and
            ``recent_feedback`` (up to five newest comments, as strings).
            All values are zero/empty when no usable response falls inside
            the window.
        """
        if not self.log_file.exists():
            return self._empty_stats()

        try:
            df = pd.read_csv(self.log_file)
        except pd.errors.EmptyDataError:
            # A zero-byte log has no header to parse; treat it as "no data".
            return self._empty_stats()
        if not set(self.LOG_COLUMNS).issubset(df.columns):
            return self._empty_stats()

        # Rows with an unparseable timestamp or score cannot be bucketed, so
        # they are dropped rather than letting one bad row break the sidebar.
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        df['score'] = pd.to_numeric(df['score'], errors='coerce')
        df = df.dropna(subset=['timestamp', 'score'])

        cutoff = datetime.now() - timedelta(days=days)
        recent_df = df[df['timestamp'] > cutoff]
        if recent_df.empty:
            return self._empty_stats()

        total = len(recent_df)
        scores = recent_df['score']
        promoters = int((scores >= 9).sum())
        passives = int(scores.between(7, 8).sum())
        detractors = int((scores <= 6).sum())
        nps = ((promoters / total) - (detractors / total)) * 100

        newest_first = recent_df[recent_df['feedback'].notna()].sort_values(
            'timestamp', ascending=False
        )
        # read_csv may parse purely numeric comments as numbers; callers
        # slice and measure these values, so hand them back as strings.
        recent_feedback = newest_first['feedback'].head(5).astype(str).tolist()

        return {
            'nps_score': round(nps, 1),
            'promoters': promoters,
            'passives': passives,
            'detractors': detractors,
            'total_responses': total,
            'recent_feedback': recent_feedback,
        }


def setup_voice_recorder():
    """🎤 Create an in-browser voice recorder component"""
    # NOTE(review): the embedded markup below contains only a status message;
    # no recorder element or script is visible here - confirm whether the
    # original HTML/JS was lost before relying on this component.
    return components.html(
        """
Ready to record...
""",
        height=200,
    )


def render_nps_sidebar():
    """🎨 Show NPS metrics in sidebar"""
    tracker = NPSTracker()
    stats = tracker.get_nps_stats()

    st.sidebar.markdown("### 📊 User Satisfaction Metrics")

    if stats['nps_score'] >= 50:
        score_color = "🟢"
    elif stats['nps_score'] >= 0:
        score_color = "🟡"
    else:
        score_color = "🔴"

    st.sidebar.metric(
        "Net Promoter Score",
        f"{score_color} {stats['nps_score']}"
    )

    st.sidebar.markdown("#### Response Breakdown")
    col1, col2, col3 = st.sidebar.columns(3)
    with col1:
        st.metric("😃", stats['promoters'])
    with col2:
        st.metric("😐", stats['passives'])
    with col3:
        st.metric("😕", stats['detractors'])

    if stats['recent_feedback']:
        st.sidebar.markdown("#### Recent Feedback")
        for feedback in stats['recent_feedback']:
            text = str(feedback)
            # Keep sidebar entries short: show at most 100 characters.
            preview = (text[:100] + "...") if len(text) > 100 else text
            st.sidebar.info(preview)


def render_nps_survey():
    """🎯 Show NPS survey form"""
    tracker = NPSTracker()

    st.markdown("### 📝 Your Feedback Matters!")

    score = st.slider(
        "How likely are you to recommend this search tool to others?",
        0, 10,
        help="0 = Not likely at all, 10 = Extremely likely"
    )

    feedback = st.text_area("Additional feedback (optional)")

    if st.button("Submit Feedback", key="nps_submit"):
        tracker.log_response(score, feedback)
        st.session_state['nps_submitted'] = True
        st.success("Thank you for your feedback! 🙏")
        # `st.experimental_rerun` was removed from recent Streamlit releases;
        # prefer `st.rerun` and fall back to the old name on older versions.
        rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
        rerun()


# [... Rest of your existing code for search functionality ...]

def _nps_survey_due(now):
    """Return True when the NPS survey should be rendered on this script run."""
    state = st.session_state
    if state.get('nps_submitted'):
        return False
    # Once a survey round has started, keep it on screen: every widget
    # interaction re-runs the script, and re-checking the 24h timer here
    # would hide the form before the user can press "Submit".
    if state.get('nps_survey_active'):
        return True
    last_shown = state.get('nps_last_shown')
    return not last_shown or now - last_shown > timedelta(hours=24)


def _render_results(results):
    """Render each search result in its own expander (first one opened)."""
    for i, result in enumerate(results, 1):
        with st.expander(f"Result {i}", expanded=(i == 1)):
            render_result(result)


def main():
    """Build the page: NPS sidebar/survey plus the Search, Voice Input,
    ArXiv and Files tabs."""
    st.title("🎥 Smart Video Search with Voice & Feedback")

    # Initialize search
    search = VideoSearch()

    # Add NPS metrics to sidebar
    with st.sidebar:
        render_nps_sidebar()

    # Show survey periodically
    current_time = datetime.now()
    if _nps_survey_due(current_time):
        if not st.session_state.get('nps_survey_active'):
            # Stamp the time only when a new round starts, not on every rerun.
            st.session_state['nps_survey_active'] = True
            st.session_state['nps_last_shown'] = current_time
        with st.expander("📝 Quick Feedback", expanded=True):
            render_nps_survey()

    # Create main tabs
    tab1, tab2, tab3, tab4 = st.tabs([
        "🔍 Search", "🎙️ Voice Input", "📚 ArXiv", "📂 Files"
    ])

    # Search Tab
    with tab1:
        st.subheader("Search Videos")
        col1, col2 = st.columns([3, 1])
        with col1:
            # Seed the default query once through session state and give the
            # widget a stable key.  Changing the `value` argument between
            # runs makes Streamlit treat the input as a new widget and
            # discard whatever the user has typed.
            if 'main_search_query' not in st.session_state:
                st.session_state['main_search_query'] = (
                    "" if st.session_state['initial_search_done'] else "aliens"
                )
            query = st.text_input("Enter search query:",
                                  key="main_search_query")
        with col2:
            search_column = st.selectbox(
                "Search in:",
                ["All Fields"] + st.session_state['search_columns'])

        col3, col4 = st.columns(2)
        with col3:
            num_results = st.slider("Max results:", 1, 100, 20)
        with col4:
            search_button = st.button("🔍 Search")

        # Run on an explicit click, or automatically on the very first render.
        if (search_button or not st.session_state['initial_search_done']) and query:
            st.session_state['initial_search_done'] = True
            selected_column = None if search_column == "All Fields" else search_column
            with st.spinner("Searching..."):
                results = search.search(query, selected_column, num_results)

            if results:
                st.session_state['search_history'].append({
                    'query': query,
                    'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'results': results[:5]
                })

                st.write(f"Found {len(results)} results:")
                _render_results(results)
            else:
                st.warning("No matching results found.")

    # Voice Input Tab
    with tab2:
        st.subheader("Voice Search")
        st.write("🎙️ Record your query:")
        setup_voice_recorder()

        if 'voice_data' in st.session_state:
            with st.spinner("Processing voice..."):
                voice_query = transcribe_audio(st.session_state['voice_data'])
            st.markdown("**Transcribed Text:**")
            st.write(voice_query)

            if st.button("🔍 Search with Voice"):
                results = search.search(voice_query, None, 20)
                # Mirror the Search tab: an empty/None result gets a warning
                # instead of an error or a silently blank tab.
                if results:
                    _render_results(results)
                else:
                    st.warning("No matching results found.")

    # ArXiv Tab
    with tab3:
        st.subheader("ArXiv Search")
        arxiv_query = st.text_input("Search ArXiv:",
                                    value=st.session_state['arxiv_last_query'])
        vocal_summary = st.checkbox("🎙 Quick Audio Summary", value=True)
        titles_summary = st.checkbox("🔖 Titles Only", value=True)
        full_audio = st.checkbox("📚 Full Audio Summary", value=False)

        if st.button("🔍 Search ArXiv"):
            st.session_state['arxiv_last_query'] = arxiv_query
            perform_arxiv_lookup(arxiv_query, vocal_summary, titles_summary,
                                 full_audio)

    # File Manager Tab
    with tab4:
        show_file_manager()


if __name__ == "__main__":
    main()