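"""Streamlit app for embedding-based video search over the omega-multimodal dataset,
with an arXiv lookup, a simple local file manager/editor, and a media gallery."""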
import streamlit as st
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import json
import os
import glob
from pathlib import Path
from datetime import datetime
import requests
from collections import defaultdict
import re
from urllib.parse import quote
from xml.etree import ElementTree as ET
import base64
from PIL import Image
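

# Session state defaults shared across tabs and reruns.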
if 'search_history' not in st.session_state:
    st.session_state['search_history'] = []
if 'last_voice_input' not in st.session_state:
    st.session_state['last_voice_input'] = ""
if 'transcript_history' not in st.session_state:
    st.session_state['transcript_history'] = []
if 'should_rerun' not in st.session_state:
    st.session_state['should_rerun'] = False
if 'search_columns' not in st.session_state:
    st.session_state['search_columns'] = []
if 'initial_search_done' not in st.session_state:
    st.session_state['initial_search_done'] = False
if 'tts_voice' not in st.session_state:
    st.session_state['tts_voice'] = "en-US-AriaNeural"
if 'arxiv_last_query' not in st.session_state:
    st.session_state['arxiv_last_query'] = ""
if 'old_val' not in st.session_state:
    st.session_state['old_val'] = None
if 'current_file' not in st.session_state:
    st.session_state['current_file'] = None
if 'file_content' not in st.session_state:
    st.session_state['file_content'] = ""


def highlight_text(text, query):
    """Highlight case-insensitive occurrences of query in text with bold formatting."""
    if not query:
        return text
    pattern = re.compile(re.escape(query), re.IGNORECASE)
    return pattern.sub(lambda m: f"**{m.group(0)}**", text)


@st.cache_data(show_spinner=False)
def fetch_dataset_rows():
    """Fetch the dataset from the Hugging Face datasets server and cache it."""
    try:
        url = "https://datasets-server.huggingface.co/first-rows?dataset=omegalabsinc%2Fomega-multimodal&config=default&split=train"
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            data = response.json()
            if 'rows' in data:
                processed_rows = []
                for row_data in data['rows']:
                    row = row_data.get('row', row_data)
                    # Embedding fields may arrive as stringified lists; convert them to floats.
                    for key in row:
                        if any(term in key.lower() for term in ['embed', 'vector', 'encoding']):
                            if isinstance(row[key], str):
                                try:
                                    row[key] = [float(x.strip()) for x in row[key].strip('[]').split(',') if x.strip()]
                                except ValueError:
                                    continue
                    processed_rows.append(row)

                df = pd.DataFrame(processed_rows)
                # This only runs on a cache miss; VideoSearch.__init__ re-derives the
                # searchable columns on every run.
                st.session_state['search_columns'] = [col for col in df.columns
                                                      if col not in ['video_embed', 'description_embed', 'audio_embed']]
                return df
    except Exception:
        pass
    return load_example_data()


def load_example_data():
    """Load example data as fallback."""
    example_data = [
        {
            "video_id": "cd21da96-fcca-4c94-a60f-0b1e4e1e29fc",
            "youtube_id": "IO-vwtyicn4",
            "description": "This video shows a close-up of an ancient text carved into a surface.",
            "views": 45489,
            "start_time": 1452,
            "end_time": 1458,
            "video_embed": [0.014160037972033024, -0.003111184574663639, -0.016604168340563774],
            "description_embed": [-0.05835828185081482, 0.02589797042310238, 0.11952091753482819]
        }
    ]
    return pd.DataFrame(example_data)


@st.cache_data(show_spinner=False)
def load_dataset():
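    """Load the dataset via fetch_dataset_rows (cached); falls back to the bundled example data."""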
    df = fetch_dataset_rows()
    return df


def prepare_features(dataset):
    """Prepare embeddings with adaptive field detection."""
    try:
        embed_cols = [col for col in dataset.columns
                      if any(term in col.lower() for term in ['embed', 'vector', 'encoding'])]

        embeddings = {}
        for col in embed_cols:
            try:
                data = []
                for row in dataset[col]:
                    if isinstance(row, str):
                        values = [float(x.strip()) for x in row.strip('[]').split(',') if x.strip()]
                    elif isinstance(row, list):
                        values = row
                    else:
                        continue
                    data.append(values)

                if data:
                    embeddings[col] = np.array(data)
            except (ValueError, TypeError):
                continue

        video_embeds = embeddings.get('video_embed', None)
        text_embeds = embeddings.get('description_embed', None)

        # Fall back to whatever embedding column was found, then to random vectors,
        # so the search UI still works when the dataset has no usable embeddings.
        if video_embeds is None and embeddings:
            video_embeds = next(iter(embeddings.values()))
        if text_embeds is None:
            text_embeds = video_embeds if video_embeds is not None else np.random.randn(len(dataset), 384)

        if video_embeds is None:
            num_rows = len(dataset)
            video_embeds = np.random.randn(num_rows, 384)
            text_embeds = np.random.randn(num_rows, 384)

        return video_embeds, text_embeds
    except Exception:
        num_rows = len(dataset)
        return np.random.randn(num_rows, 384), np.random.randn(num_rows, 384)


class VideoSearch:
    """Semantic search over the cached video dataset using sentence embeddings."""

    def __init__(self):
        self.text_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.dataset = load_dataset()
        self.video_embeds, self.text_embeds = prepare_features(self.dataset)
        # Re-derive the searchable columns on every run; the cached fetch only sets
        # them on a cache miss.
        st.session_state['search_columns'] = [col for col in self.dataset.columns
                                              if col not in ['video_embed', 'description_embed', 'audio_embed']]

    def search(self, query, column=None, top_k=20):
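        """Return up to top_k rows ranked by cosine similarity between the query and the
        stored video/description embeddings. An empty query returns the first top_k rows
        unranked; if column is set (and not "All Fields"), rows are first filtered to
        those whose column text contains the query."""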
        if not query.strip():
            # Empty query: return the first rows with a neutral relevance score.
            results = []
            df_copy = self.dataset.copy()
            for row in df_copy.itertuples():
                result = {'relevance_score': 1.0}
                for col in df_copy.columns:
                    if col not in ['video_embed', 'description_embed', 'audio_embed']:
                        result[col] = getattr(row, col)
                results.append(result)
            return results[:top_k]

        query_embedding = self.text_model.encode([query])[0]
        # Defensive guard: if the stored embeddings' dimensionality does not match the
        # query encoder's output (e.g. the bundled example rows), contribute zeros for
        # that modality instead of crashing in cosine_similarity.
        if self.video_embeds.ndim == 2 and self.video_embeds.shape[1] == query_embedding.shape[0]:
            video_sims = cosine_similarity([query_embedding], self.video_embeds)[0]
        else:
            video_sims = np.zeros(len(self.dataset))
        if self.text_embeds.ndim == 2 and self.text_embeds.shape[1] == query_embedding.shape[0]:
            text_sims = cosine_similarity([query_embedding], self.text_embeds)[0]
        else:
            text_sims = np.zeros(len(self.dataset))
        combined_sims = 0.5 * video_sims + 0.5 * text_sims

        # Optionally restrict results to rows whose chosen column contains the query text.
        if column and column in self.dataset.columns and column != "All Fields":
            mask = self.dataset[column].astype(str).str.contains(query, case=False, na=False)
            combined_sims = combined_sims[mask.to_numpy()]
            filtered_dataset = self.dataset[mask].copy()
        else:
            filtered_dataset = self.dataset.copy()

        top_k = min(top_k, len(combined_sims))
        if top_k == 0:
            return []
        top_indices = np.argsort(combined_sims)[-top_k:][::-1]

        results = []
        filtered_dataset = filtered_dataset.iloc[top_indices]
        filtered_sims = combined_sims[top_indices]
        for sim, row in zip(filtered_sims, filtered_dataset.itertuples()):
            result = {'relevance_score': float(sim)}
            for col in filtered_dataset.columns:
                if col not in ['video_embed', 'description_embed', 'audio_embed']:
                    result[col] = getattr(row, col)
            results.append(result)

        return results


def arxiv_search(query, max_results=5):
    """Perform a simple arXiv search using their API and return the top results."""
    if not query.strip():
        return []
    base_url = "http://export.arxiv.org/api/query?"
    search_url = base_url + f"search_query={quote(query)}&start=0&max_results={max_results}"
    r = requests.get(search_url, timeout=30)
    if r.status_code == 200:
        root = ET.fromstring(r.text)
        ns = {'atom': 'http://www.w3.org/2005/Atom'}
        entries = root.findall('atom:entry', ns)
        results = []
        for entry in entries:
            title = entry.find('atom:title', ns).text.strip()
            summary = entry.find('atom:summary', ns).text.strip()
            # Prefer the HTML link to the abstract page when one is present.
            link = None
            for l in entry.findall('atom:link', ns):
                if l.get('type') == 'text/html':
                    link = l.get('href')
                    break
            results.append((title, summary, link))
        return results
    return []


def perform_arxiv_lookup(q, vocal_summary=True, titles_summary=True, full_audio=False):
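    """Render arXiv results for query q. The vocal_summary, titles_summary, and
    full_audio flags back the placeholder audio options in the UI and are not used yet."""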
    results = arxiv_search(q, max_results=5)
    if not results:
        st.write("No arXiv results found.")
        return
    st.markdown(f"**arXiv Search Results for '{q}':**")
    for i, (title, summary, link) in enumerate(results, start=1):
        st.markdown(f"**{i}. {title}**")
        st.write(summary)
        if link:
            st.markdown(f"[View Paper]({link})")


def show_file_manager():
    """Display file manager interface for uploading and browsing local files."""
    st.subheader("📁 File Manager")
    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Upload File", type=['txt', 'md', 'mp3'])
        if uploaded_file:
            with open(uploaded_file.name, "wb") as f:
                f.write(uploaded_file.getvalue())
            st.success(f"Uploaded: {uploaded_file.name}")
            st.session_state.should_rerun = True

    with col2:
        if st.button("🗑️ Clear All Files"):
            for f in glob.glob("*.txt") + glob.glob("*.md") + glob.glob("*.mp3"):
                os.remove(f)
            st.success("All files cleared!")
            st.session_state.should_rerun = True

    files = glob.glob("*.txt") + glob.glob("*.md") + glob.glob("*.mp3")
    if files:
        st.write("### Existing Files")
        for f in files:
            with st.expander(f"📄 {os.path.basename(f)}"):
                if f.endswith('.mp3'):
                    st.audio(f)
                else:
                    with open(f, 'r', encoding='utf-8') as file:
                        # Unique key avoids duplicate-widget errors when two files share content.
                        st.text_area("Content", file.read(), height=100, key=f"content_{f}")
                if st.button(f"Delete {os.path.basename(f)}", key=f"del_{f}"):
                    os.remove(f)
                    st.session_state.should_rerun = True


def display_editor():
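    """Minimal editor for local .txt and .md files with save support."""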
    text_files = glob.glob("*.txt") + glob.glob("*.md")
    selected_file = st.selectbox("Select a file to edit:", ["None"] + text_files)
    if selected_file != "None":
        with open(selected_file, 'r', encoding='utf-8') as f:
            content = f.read()
        new_content = st.text_area("✏️ Edit Content:", value=content, height=300)
        if st.button("💾 Save"):
            with open(selected_file, 'w', encoding='utf-8') as f:
                f.write(new_content)
            st.success("File saved!")
            st.session_state.should_rerun = True


def show_media():
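    """Show local images in a grid and local videos in expanders."""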
    st.header("📸 Images & 🎥 Videos")
    tabs = st.tabs(["🖼️ Images", "🎥 Video"])
    with tabs[0]:
        imgs = glob.glob("*.png") + glob.glob("*.jpg") + glob.glob("*.jpeg")
        if imgs:
            c = st.slider("Columns", 1, 5, 3)
            cols = st.columns(c)
            for i, f in enumerate(imgs):
                with cols[i % c]:
                    st.image(Image.open(f), use_column_width=True)
        else:
            st.write("No images found.")

    with tabs[1]:
        vids = glob.glob("*.mp4") + glob.glob("*.webm") + glob.glob("*.mov")
        if vids:
            for v in vids:
                with st.expander(f"🎥 {os.path.basename(v)}"):
                    st.video(v)
        else:
            st.write("No videos found.")


def display_video_search():
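    """Video search UI: query input, field filter, ranked results, and optional YouTube playback."""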
    st.subheader("Search Videos")
    search_instance = VideoSearch()
    col1, col2 = st.columns([3, 1])
    with col1:
        query = st.text_input("Enter your search query:", value="ancient" if not st.session_state['initial_search_done'] else "")
    with col2:
        search_column = st.selectbox("Search in field:", ["All Fields"] + st.session_state['search_columns'])

    col3, col4 = st.columns(2)
    with col3:
        num_results = st.slider("Number of results:", 1, 100, 20)
    with col4:
        search_button = st.button("🔍 Search")

    # Run an initial search automatically on first load, then only on button clicks.
    if search_button or not st.session_state['initial_search_done']:
        st.session_state['initial_search_done'] = True
        selected_column = None if search_column == "All Fields" else search_column
        with st.spinner("Searching..."):
            results = search_instance.search(query, selected_column, num_results)

        st.session_state['search_history'].append({
            'query': query,
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'results': results[:5]
        })

        for i, result in enumerate(results, 1):
            highlighted_desc = highlight_text(result['description'], query)
            with st.expander(f"Result {i}: {result['description'][:100]}...", expanded=(i == 1)):
                cols = st.columns([2, 1])
                with cols[0]:
                    st.markdown("**Description:**")
                    st.write(highlighted_desc)
                    st.markdown(f"**Time Range:** {result['start_time']}s - {result['end_time']}s")
                    st.markdown(f"**Views:** {result['views']:,}")

                with cols[1]:
                    st.markdown(f"**Relevance Score:** {result['relevance_score']:.2%}")
                    if result.get('youtube_id'):
                        st.video(f"https://youtube.com/watch?v={result['youtube_id']}&t={result['start_time']}")


def main():
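    """Sidebar navigation and settings plus the Media, ArXiv, and Editor tabs."""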
    st.sidebar.markdown("### 🚲BikeAI🏆 Multi-Agent Research")

    tab_main = st.sidebar.radio("Action:", ["📸 Media", "📚 ArXiv", "📝 Editor"])

    with st.sidebar:
        st.subheader("⚙️ Settings & History")
        if st.button("🗑️ Clear History"):
            st.session_state['search_history'] = []
            st.experimental_rerun()

        st.markdown("### Recent Searches")
        for entry in reversed(st.session_state['search_history'][-5:]):
            with st.expander(f"{entry['timestamp']}: {entry['query']}"):
                for i, result in enumerate(entry['results'], 1):
                    st.write(f"{i}. {result['description'][:100]}...")

        st.markdown("### TTS Voice (unused)")
        st.selectbox("TTS Voice:",
                     ["en-US-AriaNeural", "en-US-GuyNeural", "en-GB-SoniaNeural"],
                     key="tts_voice")

    if tab_main == "📸 Media":
        show_media()
        st.write("---")
        display_video_search()

    elif tab_main == "📚 ArXiv":
        st.subheader("arXiv Search")
        q = st.text_input("Enter your arXiv search query:", value=st.session_state['arxiv_last_query'])
        vocal_summary = st.checkbox("🔊 Short Audio Summary (Placeholder - no TTS actually)", value=True)
        titles_summary = st.checkbox("📝 Titles Only", value=True)
        full_audio = st.checkbox("📚 Full Audio Results (Placeholder)", value=False)

        if st.button("🔍 arXiv Search"):
            st.session_state['arxiv_last_query'] = q
            perform_arxiv_lookup(q, vocal_summary=vocal_summary, titles_summary=titles_summary, full_audio=full_audio)

    elif tab_main == "📝 Editor":
        show_file_manager()
        st.write("---")
        display_editor()

    # Deferred rerun so file operations above are reflected immediately in the UI.
    if st.session_state.should_rerun:
        st.session_state.should_rerun = False
        st.experimental_rerun()


if __name__ == "__main__":
    main()