# app.py — Hugging Face Space "awacke1", revision 6afbfac (10.7 kB)
# (Header reconstructed from HF web-view residue so the file parses as Python.)
def fetch_dataset_info_auth(dataset_id, hf_token):
    """Fetch dataset metadata from the HF Hub API using a bearer token.

    Returns the parsed JSON payload on HTTP 200, otherwise None. Any request
    or decoding failure is reported via a Streamlit warning and yields None.
    """
    endpoint = f"https://huggingface.co/api/datasets/{dataset_id}"
    auth_headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        resp = requests.get(endpoint, headers=auth_headers, timeout=30)
        if resp.status_code == 200:
            return resp.json()
    except Exception as e:
        st.warning(f"Error fetching dataset info: {e}")
    return None
def fetch_dataset_splits_auth(dataset_id, hf_token):
    """Return the list of available splits for *dataset_id*.

    Queries the datasets-server splits endpoint with a bearer token; on any
    failure (non-200, network error, bad JSON) a warning is shown and an
    empty list is returned.
    """
    endpoint = f"https://datasets-server.huggingface.co/splits?dataset={dataset_id}"
    auth_headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        resp = requests.get(endpoint, headers=auth_headers, timeout=30)
        if resp.status_code == 200:
            return resp.json().get('splits', [])
    except Exception as e:
        st.warning(f"Error fetching splits: {e}")
    return []
def fetch_parquet_urls_auth(dataset_id, config, split, hf_token):
    """Return the Parquet file URL list for one config/split of a dataset.

    Uses the authenticated HF Hub parquet endpoint; any failure is surfaced
    as a Streamlit warning and an empty list is returned.
    """
    endpoint = (
        f"https://huggingface.co/api/datasets/{dataset_id}"
        f"/parquet/{config}/{split}"
    )
    auth_headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        resp = requests.get(endpoint, headers=auth_headers, timeout=30)
        if resp.status_code == 200:
            return resp.json()
    except Exception as e:
        st.warning(f"Error fetching parquet URLs: {e}")
    return []
def fetch_rows_auth(dataset_id, config, split, offset, length, hf_token):
    """Fetch a window of rows (offset/length) from the datasets-server.

    Returns the parsed JSON response on HTTP 200, otherwise None; failures
    are reported with a Streamlit warning.
    """
    query = (
        f"dataset={dataset_id}&config={config}&split={split}"
        f"&offset={offset}&length={length}"
    )
    endpoint = f"https://datasets-server.huggingface.co/rows?{query}"
    auth_headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        resp = requests.get(endpoint, headers=auth_headers, timeout=30)
        if resp.status_code == 200:
            return resp.json()
    except Exception as e:
        st.warning(f"Error fetching rows: {e}")
    return None
class ParquetVideoSearch:
    """Semantic text search over a sample of the CinePile video dataset.

    Loads up to 100 rows from the HF datasets-server (falling back to a tiny
    in-memory example on failure), embeds each row's title/description text
    with a SentenceTransformer, and ranks rows by cosine similarity to a
    query string.
    """

    def __init__(self, hf_token):
        # all-MiniLM-L6-v2 emits 384-dim embeddings — matches the random
        # fallback dimension in prepare_features().
        self.text_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.dataset_id = "tomg-group-umd/cinepile"
        self.config = "v2"
        self.hf_token = hf_token
        self.load_dataset()

    def load_dataset(self):
        """Load an initial 100-row sample and publish searchable columns.

        On any failure the example fallback data is used so the app stays
        usable. Always sets st.session_state['search_columns'] (previously
        it was only set on the success path, which broke the search-field
        selectbox when fallback data was in use) and then builds embeddings.
        """
        try:
            rows_data = fetch_rows_auth(
                self.dataset_id,
                self.config,
                "train",
                0,
                100,
                self.hf_token
            )
            if rows_data and 'rows' in rows_data:
                # datasets-server nests each record under a 'row' key.
                processed_rows = [row_data.get('row', row_data)
                                  for row_data in rows_data['rows']]
                self.dataset = pd.DataFrame(processed_rows)
            else:
                self.dataset = self.load_example_data()
        except Exception as e:
            st.warning(f"Error loading dataset: {e}")
            self.dataset = self.load_example_data()
        # Exclude embedding-like columns from the user-facing search fields.
        st.session_state['search_columns'] = [
            col for col in self.dataset.columns
            if not any(term in col.lower() for term in ['embed', 'vector', 'encoding'])
        ]
        self.prepare_features()

    def load_example_data(self):
        """Return a single-row fallback DataFrame mimicking the real schema."""
        return pd.DataFrame([{
            "video_id": "example",
            "title": "Example Video",
            "description": "Example video content",
            "duration": 120,
            "start_time": 0,
            "end_time": 120
        }])

    def prepare_features(self):
        """Encode combined title/description text into self.text_embeds.

        Only columns that actually exist are used (previously a missing
        'description' column raised KeyError and forced the random fallback).
        If no text column is present or encoding fails, random 384-dim
        vectors keep search() operational.
        """
        try:
            text_fields = [f for f in ('title', 'description')
                           if f in self.dataset.columns]
            if not text_fields:
                raise ValueError("no text columns available for search")
            combined_text = self.dataset[text_fields].fillna('').agg(' '.join, axis=1)
            self.text_embeds = self.text_model.encode(combined_text.tolist())
        except Exception as e:
            st.warning(f"Error preparing features: {e}")
            self.text_embeds = np.random.randn(len(self.dataset), 384)

    def search(self, query, column=None, top_k=20):
        """Return up to *top_k* rows ranked by similarity to *query*.

        Each result dict is the row's fields plus 'relevance_score'. When a
        concrete column is selected, rows whose column text does not contain
        the query are down-weighted by half.
        """
        query_embedding = self.text_model.encode([query])[0]
        similarities = cosine_similarity([query_embedding], self.text_embeds)[0]
        if column and column in self.dataset.columns and column != "All Fields":
            # regex=False: treat the query as a literal substring so queries
            # containing regex metacharacters (e.g. "why?" or "(scene)") no
            # longer raise re.error and crash the search.
            mask = self.dataset[column].astype(str).str.contains(
                query, case=False, regex=False)
            similarities[~mask] *= 0.5
        top_k = min(top_k, len(similarities))
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [
            {
                'relevance_score': float(similarities[idx]),
                **self.dataset.iloc[idx].to_dict()
            }
            for idx in top_indices
        ]
def render_video_result(result):
    """Render one search hit: text metadata on the left, score/video/audio on the right."""
    left, right = st.columns([2, 1])

    start_time = result.get('start_time', 0)
    end_time = result.get('end_time', result.get('duration', 0))

    with left:
        if 'title' in result:
            st.markdown(f"**Title:** {result['title']}")
        st.markdown("**Description:**")
        st.write(result.get('description', 'No description available'))
        st.markdown(f"**Time Range:** {start_time}s - {end_time}s")
        # Any remaining metadata fields, skipping those already shown or internal.
        skipped = {'title', 'description', 'start_time', 'end_time', 'duration',
                   'relevance_score', 'video_id', '_config', '_split'}
        for key, value in result.items():
            if key in skipped:
                continue
            st.markdown(f"**{key.replace('_', ' ').title()}:** {value}")

    with right:
        st.markdown(f"**Relevance Score:** {result['relevance_score']:.2%}")
        # Prefer a direct video URL; otherwise build a timestamped YouTube link.
        if 'video_url' in result:
            video_url = result['video_url']
        elif 'youtube_id' in result:
            video_url = f"https://youtube.com/watch?v={result['youtube_id']}&t={start_time}"
        else:
            video_url = None
        if video_url:
            st.video(video_url)
        if st.button(f"πŸ”Š Audio Summary", key=f"audio_{result.get('video_id', '')}"):
            summary = f"Video summary: {result.get('title', '')}. {result.get('description', '')[:200]}"
            audio_file = asyncio.run(generate_speech(summary))
            if audio_file:
                st.audio(audio_file)
def main():
    """Streamlit entry point: token gate, search tab, dataset-info tab, sidebar history."""
    st.title("πŸŽ₯ Enhanced Video Search with Parquet Support")

    # Defensive session-state initialization: these keys are read below
    # (selectbox fields, search history, first-search flag) and would raise
    # KeyError on a fresh session if nothing else set them. setdefault is a
    # no-op when they already exist.
    for key, default in (('search_history', []),
                         ('search_columns', []),
                         ('initial_search_done', False)):
        st.session_state.setdefault(key, default)

    # Get HF token from secrets or user input
    if 'hf_token' not in st.session_state:
        st.session_state['hf_token'] = st.secrets.get("HF_TOKEN", None)
    if not st.session_state['hf_token']:
        hf_token = st.text_input("Enter your Hugging Face API token:", type="password")
        if hf_token:
            st.session_state['hf_token'] = hf_token
    if not st.session_state.get('hf_token'):
        st.warning("Please provide a Hugging Face API token to access the dataset.")
        return

    # Initialize search class
    search = ParquetVideoSearch(st.session_state['hf_token'])

    # Create tabs
    tab1, tab2 = st.tabs(["πŸ” Video Search", "πŸ“Š Dataset Info"])

    # ---- Tab 1: Video Search ----
    with tab1:
        st.subheader("Search Videos")
        col1, col2 = st.columns([3, 1])
        with col1:
            # Both branches of the original conditional produced "", so the
            # expression was dead — a plain empty default is equivalent.
            query = st.text_input("Enter your search query:", value="")
        with col2:
            search_column = st.selectbox("Search in field:",
                                         ["All Fields"] + st.session_state['search_columns'])
        col3, col4 = st.columns(2)
        with col3:
            num_results = st.slider("Number of results:", 1, 100, 20)
        with col4:
            search_button = st.button("πŸ” Search")

        if search_button and query:
            st.session_state['initial_search_done'] = True
            selected_column = None if search_column == "All Fields" else search_column
            with st.spinner("Searching..."):
                results = search.search(query, selected_column, num_results)
            st.session_state['search_history'].append({
                'query': query,
                'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'results': results[:5]
            })
            for i, result in enumerate(results, 1):
                with st.expander(
                    f"Result {i}: {result.get('title', result.get('description', 'No title'))[:100]}...",
                    expanded=(i == 1)
                ):
                    render_video_result(result)

    # ---- Tab 2: Dataset Info ----
    with tab2:
        st.subheader("Dataset Information")
        # Show available splits
        splits = fetch_dataset_splits_auth(search.dataset_id, st.session_state['hf_token'])
        if splits:
            st.write("### Available Splits")
            for split in splits:
                st.write(f"- {split['split']}: {split.get('num_rows', 'unknown')} rows")
        # Show dataset statistics
        st.write("### Dataset Statistics")
        st.write(f"- Loaded rows: {len(search.dataset)}")
        st.write(f"- Available columns: {', '.join(search.dataset.columns)}")
        # Show sample data
        st.write("### Sample Data")
        st.dataframe(search.dataset.head())

    # Sidebar
    with st.sidebar:
        st.subheader("βš™οΈ Settings & History")
        if st.button("πŸ—‘οΈ Clear History"):
            st.session_state['search_history'] = []
            # NOTE(review): st.experimental_rerun is deprecated in newer
            # Streamlit releases in favor of st.rerun() — confirm the pinned
            # Streamlit version before switching.
            st.experimental_rerun()
        st.markdown("### Recent Searches")
        for entry in reversed(st.session_state['search_history'][-5:]):
            with st.expander(f"{entry['timestamp']}: {entry['query']}"):
                for i, result in enumerate(entry['results'], 1):
                    st.write(f"{i}. {result.get('title', result.get('description', 'No title'))[:100]}...")


if __name__ == "__main__":
    main()