Spaces:
Sleeping
Sleeping
import streamlit as st | |
from streamlit.components.v1 import components | |
from pathlib import Path | |
import json | |
import time | |
from datetime import datetime | |
import re | |
import pandas as pd | |
import yaml | |
from io import StringIO, BytesIO | |
import openpyxl | |
import csv | |
import base64 | |
import glob | |
import os | |
import zipfile | |
# Seed the per-session state on the first run of a session. The defaults
# tuple is rebuilt on every script run, so each session gets its own fresh
# list objects rather than sharing one.
for _state_key, _state_default in (
    ('chat_history', []),
    ('messages', []),
    ('current_file', None),
    ('file_content', None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
del _state_key, _state_default
def create_zip_of_files(files):
    """Bundle the given files into an in-memory ZIP archive.

    Args:
        files: Iterable of file paths. Each path is also used as the member
            name inside the archive.

    Returns:
        bytes: The complete DEFLATE-compressed ZIP archive.

    Raises:
        OSError: If one of the files cannot be opened or read.
    """
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for file in files:
            # Read raw bytes instead of decoded text: the archive must hold
            # the file exactly as it is on disk, and a text round-trip would
            # fail on non-UTF-8 content and may rewrite line endings.
            with open(file, 'rb') as f:
                zip_file.writestr(file, f.read())
    return zip_buffer.getvalue()
def get_download_link(file_path):
    """Build an HTML anchor that downloads data embedded as a base64 data URI.

    Args:
        file_path: Either raw ``bytes`` (treated as an already-built ZIP
            archive and offered as ``all_files.zip``) or the path of a file
            on disk (offered under its base name as ``text/markdown``).

    Returns:
        str: An ``<a href="data:..." download="...">`` element whose link
        text is the download file name.

    Raises:
        OSError: If ``file_path`` is a path that cannot be read.
    """
    from html import escape  # local import: only this function needs it

    if isinstance(file_path, bytes):  # For zip files
        payload = file_path
        mime_type = 'application/zip'
        filename = 'all_files.zip'
    else:  # For individual files
        # Raw bytes keep the download identical to the file on disk and
        # avoid a decode/encode round-trip that fails on non-UTF-8 data.
        with open(file_path, 'rb') as f:
            payload = f.read()
        mime_type = 'text/markdown'
        filename = os.path.basename(file_path)

    b64 = base64.b64encode(payload).decode()
    # The result is rendered as raw HTML by callers, so the file name must
    # not be able to break out of the attribute or inject markup.
    safe_name = escape(filename, quote=True)
    return f'<a href="data:{mime_type};base64,{b64}" download="{safe_name}">{safe_name}</a>'
def load_file(filepath):
    """Return the entire text of ``filepath``, decoded as UTF-8."""
    text = Path(filepath).read_text(encoding='utf-8')
    return text
def display_file_manager():
    """Display file management sidebar with guaranteed unique button keys.

    Lists every ``*.md`` file in the working directory (reverse name order)
    with per-file view / download / edit / delete controls, plus
    "Delete All" and "Download All" buttons. Viewing or editing a file stores
    its path and text in ``st.session_state.current_file`` and
    ``st.session_state.file_content``. Deleting triggers ``st.rerun()`` so
    the list is rebuilt.
    """

    def _open_in_editor(path):
        """Record ``path`` and its text as the file currently being edited."""
        st.session_state.current_file = path
        st.session_state.file_content = load_file(path)

    def _remove(path):
        """Delete ``path`` and forget it if it is the file currently open."""
        os.remove(path)
        # Without this the editor would keep showing a file that no longer
        # exists, and "Save Changes" would silently recreate it.
        if st.session_state.current_file == path:
            st.session_state.current_file = None
            st.session_state.file_content = None

    st.sidebar.title("π File Management")
    all_files = glob.glob("*.md")
    all_files.sort(reverse=True)

    if st.sidebar.button("π Delete All", key="delete_all_files_button"):
        for file in all_files:
            _remove(file)
        st.rerun()

    if st.sidebar.button("β¬οΈ Download All", key="download_all_files_button"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)

    for idx, file in enumerate(all_files):
        file_stat = os.stat(file)
        # Index + size + mtime keeps widget keys distinct for every row.
        unique_id = f"{idx}_{file_stat.st_size}_{file_stat.st_mtime}"
        col1, col2, col3, col4 = st.sidebar.columns([1,3,1,1])
        with col1:
            if st.button("π", key=f"view_{unique_id}"):
                _open_in_editor(file)
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            if st.button("π", key=f"edit_{unique_id}"):
                _open_in_editor(file)
        with col4:
            if st.button("π", key=f"delete_{unique_id}"):
                _remove(file)
                st.rerun()
def process_with_gpt(user_input):
    """Produce the GPT-4 reply for ``user_input`` and record the exchange.

    The prompt and the reply are appended, in that order, to
    ``st.session_state.messages`` as role/content dicts. The reply is
    currently a placeholder string; the reply is also returned.
    """
    # Add your GPT processing logic here
    reply = f"GPT-4 Response to: {user_input}"
    st.session_state.messages.extend([
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": reply},
    ])
    return reply
def process_with_claude(user_input):
    """Produce the Claude reply for ``user_input`` and record the exchange.

    One ``{"user": ..., "claude": ...}`` entry is appended to
    ``st.session_state.chat_history``. The reply is currently a placeholder
    string; the reply is also returned.
    """
    # Add your Claude processing logic here
    reply = f"Claude Response to: {user_input}"
    exchange = {"user": user_input, "claude": reply}
    st.session_state.chat_history.append(exchange)
    return reply
def perform_ai_lookup(query):
    """Return the AI/ArXiv lookup result for ``query`` (placeholder text for now)."""
    # Add your ArXiv lookup logic here
    lookup_result = f"ArXiv results for: {query}"
    return lookup_result
def search_arxiv(query):
    """Return the ArXiv paper search result for ``query`` (placeholder text for now)."""
    # Add your ArXiv search logic here
    search_result = f"ArXiv search results for: {query}"
    return search_result
def create_media_gallery():
    """Render the Media Gallery section: a subheader followed by stub text."""
    heading = "Media Gallery"
    placeholder = "Media gallery functionality goes here"
    st.subheader(heading)
    st.write(placeholder)
def show_chat_history():
    """Display chat history for both models.

    The first tab replays ``st.session_state.chat_history`` (Claude): for
    each entry the prompt and the reply are shown in text areas and the reply
    is additionally rendered as markdown. The second tab replays
    ``st.session_state.messages`` (GPT) as chat bubbles keyed by role.
    """
    st.subheader("Chat History π")
    tab1, tab2 = st.tabs(["Claude History", "GPT-4o History"])
    with tab1:
        for idx, chat in enumerate(st.session_state.chat_history):
            # Explicit per-entry keys: without them, two history entries with
            # identical text produce widgets with identical parameters, which
            # Streamlit rejects as duplicate widget IDs.
            st.text_area("You:", chat["user"], height=100,
                         key=f"claude_history_user_{idx}")
            st.text_area("Claude:", chat["claude"], height=200,
                         key=f"claude_history_reply_{idx}")
            st.markdown(chat["claude"])
    with tab2:
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])
def handle_voice_input(model_choice):
    """Render the voice-input tab: a message box, a send button and the history.

    A non-empty message submitted with the send button is forwarded to
    ``process_ai_input`` together with ``model_choice``. The chat history is
    rendered afterwards.
    """
    st.subheader("Voice Recognition")

    if 'voice_transcript' not in st.session_state:
        st.session_state['voice_transcript'] = ""

    user_input = st.text_area("Message:", height=100)
    # The button is evaluated first so it is rendered even for an empty message.
    send_clicked = st.button("Send π¨")
    if send_clicked and user_input:
        process_ai_input(user_input, model_choice)

    show_chat_history()
def handle_file_editor():
    """Render the file-editor tab for the file selected in the sidebar.

    Nothing is drawn when no file is selected. Otherwise the cached file text
    is shown in an editable area and "Save Changes" overwrites the file on
    disk (UTF-8) with the edited text.
    """
    target = st.session_state.current_file
    if not target:
        return

    st.subheader(f"Editing: {target}")
    new_content = st.text_area("Content:", st.session_state.file_content, height=300)
    if not st.button("Save Changes"):
        return

    with open(st.session_state.current_file, 'w', encoding='utf-8') as out:
        out.write(new_content)
    st.success("File updated successfully!")
def process_ai_input(user_input, model_choice):
    """Handle AI processing based on model choice.

    Args:
        user_input: The prompt text to send.
        model_choice: ``"GPT-4o"`` or ``"Claude-3"`` to query a single model;
            any other value runs Claude, GPT and the ArXiv lookup side by
            side in three columns.

    The model helpers record their own replies in session state, so their
    return values are not used here. Only the ArXiv result is rendered
    directly.
    """
    if model_choice == "GPT-4o":
        process_with_gpt(user_input)
    elif model_choice == "Claude-3":
        process_with_claude(user_input)
    else:  # All Three AIs
        col1, col2, col3 = st.columns(3)
        # Each backend is best-effort: a failure in one must not stop the
        # others. `except Exception` (rather than a bare `except`) keeps that
        # behaviour without also trapping KeyboardInterrupt, SystemExit and
        # other BaseException-derived control-flow signals.
        with col2:
            st.subheader("Claude-3.5 Sonnet:")
            try:
                process_with_claude(user_input)
            except Exception:
                st.write('Claude 3.5 Sonnet out of tokens.')
        with col1:
            st.subheader("GPT-4o Omni:")
            try:
                process_with_gpt(user_input)
            except Exception:
                st.write('GPT 4o out of tokens')
        with col3:
            st.subheader("Arxiv and Mistral Research:")
            with st.spinner("Searching ArXiv..."):
                try:
                    results = perform_ai_lookup(user_input)
                    st.markdown(results)
                except Exception:
                    st.write("Arxiv Mistral too busy - try again.")
def handle_arxiv_search():
    """Render the ArXiv search tab: a query box and, once filled, the results."""
    query = st.text_input("Enter your research query:")
    if not query:
        return

    with st.spinner("Searching ArXiv..."):
        found = search_arxiv(query)
        st.markdown(found)
def main():
    """Build the page: sidebar, model picker, voice component and active tab.

    The sidebar file manager is rendered first, then the model selector and
    the action selector. The custom voice component is attempted on every
    run; any value it returns is passed to ``process_ai_input``. Finally the
    handler for the selected action is invoked.
    """
    # Tab labels are bound once so the radio options and the dispatch below
    # cannot drift apart.
    voice_tab = "π€ Voice Input"
    gallery_tab = "πΈ Media Gallery"
    arxiv_tab = "π Search ArXiv"
    editor_tab = "π File Editor"

    st.sidebar.markdown("### π²BikeAIπ Claude and GPT Multi-Agent Research AI")

    # Display file manager in sidebar first
    display_file_manager()

    # Model Selection
    model_choice = st.sidebar.radio(
        "Choose AI Model:",
        ["GPT+Claude+Arxiv", "GPT-4o", "Claude-3"]
    )

    # Main navigation
    tab_main = st.radio("Choose Action:",
                        [voice_tab, gallery_tab, arxiv_tab, editor_tab],
                        horizontal=True)

    try:
        # Component Magic section - wrapped in try/except in case component isn't available
        mycomponent = components.declare_component("mycomponent", path="mycomponent")
        value = mycomponent(my_input_value="hello there")
        st.write("Received", value)
        if value is not None:
            process_ai_input(value, model_choice)
    except Exception:
        # `Exception` rather than a bare `except`: the fallback is meant for
        # a missing or broken component, not for KeyboardInterrupt,
        # SystemExit or other BaseException-derived control-flow signals.
        st.warning("Voice component not available")

    # Handle main tab navigation
    if tab_main == voice_tab:
        handle_voice_input(model_choice)
    elif tab_main == gallery_tab:
        create_media_gallery()
    elif tab_main == arxiv_tab:
        handle_arxiv_search()
    elif tab_main == editor_tab:
        handle_file_editor()


if __name__ == "__main__":
    main()