import asyncio
import base64
import functools
import glob
import html
import json
import math
import os
import random
import re
import time
import zipfile
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import BytesIO

import cv2
import openai
import pandas as pd
import pytz
import requests
import streamlit as st
import streamlit.components.v1 as components
from audio_recorder_streamlit import audio_recorder
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from gradio_client import Client
from moviepy.editor import VideoFileClip
from openai import OpenAI
from PIL import Image
from PyPDF2 import PdfReader
from tqdm import tqdm


# Load settings from a .env file (python-dotenv) before anything below reads
# them with os.getenv, e.g. OPENAI_API_KEY / OPENAI_ORG_ID in StreamlitUI.
load_dotenv()


class PerformanceTracker:
    """Tracks and displays the performance of executed tasks."""

    def track(self, model_name_provider):
        """Build a decorator that reports the model in use and the call's runtime.

        Args:
            model_name_provider: Either the model name itself or a zero-argument
                callable returning it. A callable is evaluated on every call of
                the decorated function, so it can reflect a value that changes
                between calls.

        Returns:
            A decorator. The wrapped function shows an ``st.info`` banner before
            running and an ``st.success`` banner with the elapsed seconds after
            it returns; the wrapped function's return value is passed through
            unchanged. If the wrapped function raises, the exception propagates
            and no success banner is shown.
        """

        def decorator(func):
            # Preserve the decorated function's name, docstring and signature
            # metadata instead of exposing the generic "wrapper".
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                if callable(model_name_provider):
                    model_name = model_name_provider()
                else:
                    model_name = model_name_provider
                st.info(f"Executing with model: `{model_name}`...")

                # perf_counter is monotonic, so the measured duration cannot be
                # skewed by system clock adjustments during the call.
                start_time = time.perf_counter()
                result = func(*args, **kwargs)
                duration = time.perf_counter() - start_time

                st.success(f"✅ **Execution Complete!** | Runtime: `{duration:.2f} seconds`")
                return result

            return wrapper

        return decorator


class FileHandler:
    """Manages all file system operations like naming, saving, and zipping."""

    # Characters replaced by a space when free text is turned into a file name.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[<>:"/\\|?*\n\r]')

    # Content types used in download links, keyed by lower-case extension.
    _MIME_TYPES = {
        '.md': 'text/markdown',
        '.pdf': 'application/pdf',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.wav': 'audio/wav',
        '.mp3': 'audio/mpeg',
        '.mp4': 'video/mp4',
        '.zip': 'application/zip',
    }

    def __init__(self, should_save=True):
        """Create a handler.

        Args:
            should_save: When false, ``save_file`` becomes a no-op.
        """
        self.should_save = should_save
        # Timestamps in generated file names use US Central time.
        self.central_tz = pytz.timezone('US/Central')

    @staticmethod
    def _leaf_name(name):
        """Return the final path component of *name*, treating ``\\`` like ``/``."""
        return os.path.basename(str(name).replace("\\", "/"))

    def generate_filename(self, prompt, file_type, original_name=None):
        """Build a file name of the form ``MMDD_HHMM_<prompt>[_<original>].<type>``.

        Args:
            prompt: Text the name is derived from; unsafe characters become
                spaces and only the first 50 characters are kept.
            file_type: Extension to append, without the leading dot.
            original_name: Optional source file name; its stem (directory parts
                and unsafe characters removed) is appended to the name.

        Returns:
            The file name. The part before the extension is capped at 100
            characters.
        """
        safe_date_time = datetime.now(self.central_tz).strftime("%m%d_%H%M")
        safe_prompt = self._UNSAFE_FILENAME_CHARS.sub(' ', str(prompt)).strip()[:50]
        file_stem = f"{safe_date_time}_{safe_prompt}"
        if original_name:
            # Keep only the leaf name and scrub it like the prompt so a name
            # such as "dir/pic.png" cannot smuggle a directory into the result.
            base_name = os.path.splitext(self._leaf_name(original_name))[0]
            base_name = self._UNSAFE_FILENAME_CHARS.sub(' ', base_name).strip()
            file_stem = f"{file_stem}_{base_name}"
        return f"{file_stem[:100]}.{file_type}"

    def save_file(self, content, filename, prompt=None):
        """Write *content* (optionally preceded by *prompt*) to *filename* as UTF-8.

        Args:
            content: Value to store; converted with ``str``.
            filename: Destination path.
            prompt: If given, written first and followed by a blank line.

        Returns:
            *filename*, or ``None`` when saving is disabled on this handler.
        """
        if not self.should_save:
            return None
        with open(filename, "w", encoding="utf-8") as f:
            if prompt:
                f.write(str(prompt) + "\n\n")
            f.write(str(content))
        return filename

    def save_uploaded_file(self, uploaded_file):
        """Store an uploaded file in the current working directory.

        Args:
            uploaded_file: Object exposing ``name`` and ``getvalue()``.

        Returns:
            The path the bytes were written to (the upload's leaf name).

        Raises:
            ValueError: If the upload's name has no usable file-name component.
        """
        # The name comes from the client; drop any directory components so the
        # write cannot escape the working directory.
        path = self._leaf_name(uploaded_file.name)
        if not path or path in (".", ".."):
            raise ValueError(f"Invalid upload name: {uploaded_file.name!r}")
        with open(path, "wb") as f:
            f.write(uploaded_file.getvalue())
        return path

    def create_zip_archive(self, files_to_zip, zip_name="files.zip"):
        """Bundle the existing paths from *files_to_zip* into *zip_name*.

        Paths that do not exist are skipped, as is the archive itself if it
        appears in the list.

        Returns:
            *zip_name*.
        """
        archive_path = os.path.abspath(zip_name)
        with zipfile.ZipFile(zip_name, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
            for file in files_to_zip:
                if not os.path.exists(file):
                    continue
                if os.path.abspath(file) == archive_path:
                    # Never add the archive to itself while it is being written.
                    continue
                zipf.write(file)
        return zip_name

    def get_base64_download_link(self, file_path, link_text):
        """Return an HTML ``<a>`` tag embedding *file_path* as a base64 data URI.

        Deliberately not cached: the link embeds the file's current bytes, and a
        path such as the default ``files.zip`` is rewritten with new content, so
        a cache keyed only on path and link text would serve stale data.

        Args:
            file_path: File to embed; read in full.
            link_text: Visible link label. It is HTML-escaped, so markup passed
                here is shown literally.

        Returns:
            The anchor markup. The MIME type is chosen from the file extension
            and falls back to ``application/octet-stream``.
        """
        with open(file_path, 'rb') as f:
            data = f.read()
        b64 = base64.b64encode(data).decode()
        ext = os.path.splitext(file_path)[1].lower()
        mime_type = self._MIME_TYPES.get(ext, "application/octet-stream")
        # File names and labels may originate from uploads and the result is
        # rendered as raw HTML, so both are escaped.
        download_name = html.escape(os.path.basename(file_path), quote=True)
        label = html.escape(str(link_text))
        return f'<a href="data:{mime_type};base64,{b64}" download="{download_name}">{label}</a>'


class OpenAIProcessor:
    """Handles all interactions with the OpenAI API."""

    def __init__(self, api_key, org_id):
        """Create the API client used by every method of this class."""
        self.client = OpenAI(api_key=api_key, organization=org_id)

    @staticmethod
    def _guess_image_mime(image_bytes):
        """Return the MIME type implied by the image's leading bytes.

        Recognises JPEG, GIF and WebP signatures; anything else is reported as
        ``image/png``.
        """
        if image_bytes[:3] == b"\xff\xd8\xff":
            return "image/jpeg"
        if image_bytes[:6] in (b"GIF87a", b"GIF89a"):
            return "image/gif"
        if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
            return "image/webp"
        return "image/png"

    @staticmethod
    def _build_video_content(frames, transcript):
        """Return the user-message content parts for a video summary request.

        Args:
            frames: Iterable of base64-encoded JPEG frames.
            transcript: Transcription text to append after the frames.
        """
        # Every entry is a typed content part; a bare string inside the list is
        # not a valid part.
        content = [{"type": "text", "text": "Video frames:"}]
        content.extend(
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{frame}"}}
            for frame in frames
        )
        content.append({"type": "text", "text": f"Transcription: {transcript}"})
        return content

    def execute_text_completion(self, model, messages):
        """Send a chat history to *model* and return the reply text.

        Args:
            model: Model identifier.
            messages: Iterable of dicts with ``role`` and ``content`` keys; any
                other keys are dropped before sending.
        """
        payload = [{"role": m["role"], "content": m["content"]} for m in messages]
        response = self.client.chat.completions.create(model=model, messages=payload)
        return response.choices[0].message.content

    def execute_image_completion(self, model, prompt, image_bytes):
        """Ask *model* about an image and return the reply text.

        Args:
            model: Model identifier.
            prompt: Question or instruction about the image.
            image_bytes: Raw image file contents.
        """
        base64_image = base64.b64encode(image_bytes).decode("utf-8")
        # Label the data URI with the detected format instead of always
        # claiming PNG (the UI also accepts JPEG uploads).
        mime_type = self._guess_image_mime(image_bytes)
        response = self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that responds in Markdown."},
                {"role": "user", "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                ]},
            ],
        )
        return response.choices[0].message.content

    def execute_video_completion(self, model, frames, transcript):
        """Ask *model* to summarise sampled video frames plus a transcript.

        Args:
            model: Model identifier.
            frames: Iterable of base64-encoded JPEG frames.
            transcript: Transcription text of the video's audio.

        Returns:
            The reply text.
        """
        response = self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "Summarize the video and its transcript in Markdown."},
                {"role": "user", "content": self._build_video_content(frames, transcript)},
            ],
        )
        return response.choices[0].message.content

    def transcribe_audio(self, audio_bytes, file_name="temp_audio.wav"):
        """Transcribe audio with the ``whisper-1`` model.

        The bytes are uploaded straight from memory. Nothing is written to or
        removed from disk, so an existing file called *file_name* is left
        untouched.

        Args:
            audio_bytes: Raw audio file contents.
            file_name: Name sent along with the upload; only its final path
                component is used.

        Returns:
            The transcription text, or ``None`` after reporting the problem via
            ``st.error`` when there is no audio or the request fails.
        """
        if not audio_bytes:
            st.error("Audio processing error: no audio data was provided.")
            return None
        upload_name = os.path.basename(str(file_name)) or "temp_audio.wav"
        try:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                # (filename, contents) tuple form of an SDK file upload.
                file=(upload_name, audio_bytes),
            )
            return transcription.text
        except Exception as e:
            st.error(f"Audio processing error: {e}")
            return None


class MediaProcessor:
    """Handles processing of media files like video and audio."""

    @staticmethod
    def _frame_step(fps, seconds_per_frame):
        """Return how many frames to advance between samples (always >= 1).

        A step of zero or less would keep the sampling loop on the same frame
        forever, so the result is clamped to 1. A non-positive or NaN frame
        rate also yields 1.
        """
        if not fps > 0:
            return 1
        return max(1, int(fps * seconds_per_frame))

    def extract_video_components(self, video_path, seconds_per_frame=5):
        """Sample frames from a video and extract its audio track.

        The two steps are independent: a failure in one is reported with
        ``st.warning`` and does not prevent the other from running.

        Args:
            video_path: Path to the video file.
            seconds_per_frame: Spacing, in seconds of video, between sampled
                frames.

        Returns:
            ``(frames, audio_path)`` where *frames* is a list of base64-encoded
            JPEG strings (possibly empty) and *audio_path* is the path of the
            written ``.mp3`` file, or ``None`` when the video has no audio or
            extraction failed.
        """
        frames = self._extract_frames(video_path, seconds_per_frame)
        audio_path = self._extract_audio(video_path)
        return frames, audio_path

    def _extract_frames(self, video_path, seconds_per_frame):
        """Return base64 JPEG frames sampled every *seconds_per_frame* seconds."""
        frames = []
        video = None
        try:
            video = cv2.VideoCapture(video_path)
            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
            step = self._frame_step(video.get(cv2.CAP_PROP_FPS), seconds_per_frame)
            curr_frame = 0
            while curr_frame < total_frames - 1:
                video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
                success, frame = video.read()
                if not success:
                    break
                encoded, buffer = cv2.imencode(".jpg", frame)
                if encoded:
                    frames.append(base64.b64encode(buffer).decode("utf-8"))
                curr_frame += step
        except Exception as e:
            # Frames collected before the failure are still returned.
            st.warning(f"Could not process video: {e}")
        finally:
            if video is not None:
                video.release()
        return frames

    def _extract_audio(self, video_path):
        """Write the video's audio next to it as ``.mp3``; return the path or ``None``."""
        audio_path = f"{os.path.splitext(video_path)[0]}.mp3"
        try:
            with VideoFileClip(video_path) as clip:
                if not clip.audio:
                    return None
                clip.audio.write_audiofile(audio_path, bitrate="32k", logger=None)
            return audio_path
        except Exception as e:
            st.warning(f"Could not process video: {e}")
            return None


class RAGManager:
    """Manages Retrieval-Augmented Generation processes."""

    def __init__(self, openai_client):
        """Keep the OpenAI client used for vector-store, file and chat calls."""
        self.client = openai_client

    @staticmethod
    def _build_quiz_prompt(text, max_chars=4000):
        """Return the quiz-generation prompt for the first *max_chars* of *text*."""
        return (
            "Generate a 5-question quiz with answers based only on this document. "
            "Format as markdown with numbered questions and answers:"
            f"\n{text[:max_chars]}\n\n"
        )

    def create_vector_store(self, name):
        """Create a vector store called *name*.

        Returns:
            The object returned by the API, or ``None`` after reporting the
            failure via ``st.error``.
        """
        try:
            return self.client.vector_stores.create(name=name)
        except Exception as e:
            st.error(f"Failed to create vector store: {e}")
            return None

    def upload_files_to_store(self, vector_store_id, file_paths):
        """Upload files and attach each one to a vector store.

        Uploads run on up to five worker threads. A failure for one file is
        recorded and does not stop the others.

        Args:
            vector_store_id: Identifier of the target vector store.
            file_paths: Sequence of local file paths.

        Returns:
            A dict with ``total``, ``success`` and ``failed`` counts and an
            ``errors`` list holding one message per failed file.
        """
        stats = {"total": len(file_paths), "success": 0, "failed": 0, "errors": []}
        if not file_paths:
            return stats

        def upload_file(file_path):
            try:
                with open(file_path, "rb") as f:
                    # Files attached to a vector store are uploaded with the
                    # "assistants" purpose; "vision" is meant for image inputs.
                    uploaded = self.client.files.create(file=f, purpose="assistants")
                self.client.vector_stores.files.create(
                    vector_store_id=vector_store_id, file_id=uploaded.id
                )
                return True, None
            except Exception as e:
                return False, f"File {os.path.basename(file_path)}: {e}"

        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {executor.submit(upload_file, path): path for path in file_paths}
            for future in tqdm(as_completed(futures), total=len(futures), desc="Uploading PDFs"):
                success, error = future.result()
                if success:
                    stats["success"] += 1
                else:
                    stats["failed"] += 1
                    stats["errors"].append(error)
        return stats

    def generate_questions_from_pdf(self, pdf_path, model="gpt-4o"):
        """Generate a five-question markdown quiz from a PDF's text.

        Only the first 4000 characters of the extracted text are sent.

        Args:
            pdf_path: Path of the PDF to read.
            model: Chat model used to write the quiz.

        Returns:
            The quiz text, ``"Could not extract text."`` when the PDF yields no
            text, or an ``"Error generating questions: ..."`` message when
            anything fails.
        """
        try:
            with open(pdf_path, "rb") as f:
                pdf = PdfReader(f)
                text = "".join(page.extract_text() or "" for page in pdf.pages)
            if not text.strip():
                return "Could not extract text."

            response = self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": self._build_quiz_prompt(text)}],
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating questions: {e}"


class ExternalAPIHandler:
    """Handles calls to external APIs like ArXiv."""

    # Gradio Space that serves the ArXiv search endpoint.
    ARXIV_SPACE = "awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern"

    # Text returned to the caller when the search cannot be completed.
    ARXIV_FAILURE_MESSAGE = "Could not connect to the ArXiv search service."

    def search_arxiv(self, query):
        """Run *query* against the ArXiv search Space and return its result.

        Args:
            query: Search text sent as the ``message`` argument of ``/predict``.

        Returns:
            The first value returned by the endpoint, or a fixed failure
            message (after reporting via ``st.error``) when the call fails.
        """
        try:
            client = Client(self.ARXIV_SPACE)
            response = client.predict(message=query, api_name="/predict")
        except Exception as e:
            st.error(f"ArXiv search failed: {e}")
            return self.ARXIV_FAILURE_MESSAGE

        # NOTE(review): the endpoint appears to return a (result, extra) pair;
        # a bare value or a different number of outputs is tolerated so a
        # shape change is not misreported as a connection failure.
        if isinstance(response, (tuple, list)):
            if not response:
                st.error("ArXiv search failed: the service returned no output.")
                return self.ARXIV_FAILURE_MESSAGE
            return response[0]
        return response


class Benchmarker:
    """Runs a suite of tests to benchmark different AI models."""

    def __init__(self, openai_processor, media_processor, file_handler):
        """Store the collaborators the benchmarks use and create a tracker."""
        self.openai_processor = openai_processor
        self.media_processor = media_processor
        self.file_handler = file_handler
        self.performance_tracker = PerformanceTracker()

    @staticmethod
    def _supports_vision(model_name):
        """Return whether *model_name* looks like a model that accepts images.

        Matches names containing ``vision`` or ``4o``, plus ``gpt-4-turbo``
        variants other than the ``-preview`` ones.
        NOTE(review): name-based heuristic; confirm against the models actually
        offered before relying on it for new model ids.
        """
        if "vision" in model_name or "4o" in model_name:
            return True
        return model_name.startswith("gpt-4-turbo") and "preview" not in model_name

    def run_all_benchmarks(self, model_name):
        """Run the text benchmark and, for vision-capable models, the media ones.

        Args:
            model_name: Model identifier to benchmark.
        """
        st.info(f"🚀 Starting benchmark tests for `{model_name}`...")
        self.benchmark_text_completion(model_name)
        if self._supports_vision(model_name):
            self.benchmark_image_analysis(model_name)
            self.benchmark_video_processing(model_name)
        else:
            st.warning(f"Skipping vision benchmarks for non-vision model `{model_name}`.")
        st.success("🎉 All benchmark tests complete!")

    def benchmark_text_completion(self, model_name):
        """Placeholder: the text-completion benchmark is not implemented yet."""
        pass

    def benchmark_image_analysis(self, model_name):
        """Placeholder: the image-analysis benchmark is not implemented yet."""
        pass

    def benchmark_video_processing(self, model_name):
        """Placeholder: the video-processing benchmark is not implemented yet."""
        pass


class StreamlitUI:
    """Main class to build and run the Streamlit user interface."""

    def __init__(self):
        """Configure the page, seed session state and create the helper objects.

        Stops the script with an error message when ``OPENAI_API_KEY`` is not
        set, because no API call can succeed without it.
        """
        self.setup_page()
        self.initialize_state()

        # Sidebar label -> button emoji and the model id passed to the API.
        self.MODELS = {
            "GPT-4o": {"emoji": "🚀", "model_name": "gpt-4o"},
            "GPT-4 Turbo": {"emoji": "🧠", "model_name": "gpt-4-turbo"},
            "GPT-3.5 Turbo": {"emoji": "⚡", "model_name": "gpt-3.5-turbo"},
        }

        api_key = os.getenv('OPENAI_API_KEY')
        if not api_key:
            st.error("`OPENAI_API_KEY` is not set. Add it to the environment or a `.env` file.")
            st.stop()

        self.file_handler = FileHandler(should_save=st.session_state.should_save)
        self.openai_processor = OpenAIProcessor(api_key=api_key, org_id=os.getenv('OPENAI_ORG_ID'))
        self.media_processor = MediaProcessor()
        self.rag_manager = RAGManager(self.openai_processor.client)
        self.external_api_handler = ExternalAPIHandler()
        self.benchmarker = Benchmarker(self.openai_processor, self.media_processor, self.file_handler)
        self.performance_tracker = PerformanceTracker()

    def setup_page(self):
        """Set the page title, icon and layout."""
        st.set_page_config(page_title="🔬🧠ScienceBrain.AI", page_icon="🔬", layout="wide", initial_sidebar_state="auto")

    def initialize_state(self):
        """Give every session-state key the app relies on a default value."""
        defaults = {
            "openai_model": "gpt-4o", "messages": [], "should_save": True,
            "test_mode": False, "input_option": "Text", "rag_prompt": "",
        }
        for key, value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = value

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _pick_audio_source(uploaded_audio, recorded_audio):
        """Return ``(audio_bytes, file_name)`` for the audio to process.

        An uploaded file wins over a recording; ``(None, None)`` means there is
        no audio at all.
        """
        if uploaded_audio:
            return uploaded_audio.getvalue(), uploaded_audio.name
        if recorded_audio:
            return recorded_audio, "recording.wav"
        return None, None

    @staticmethod
    def _list_browsable_files(file_types):
        """Return working-directory files for the file browser, newest first.

        A file qualifies when its extension (compared case-insensitively) is in
        *file_types* and its stem has at least 10 characters.
        NOTE(review): the length check presumably hides files that were not
        generated by this app (generated names start with a 10-character
        ``MMDD_HHMM_`` prefix) — confirm.
        """
        wanted = {ext.lower() for ext in file_types}
        matches = []
        for path in glob.glob("*.*"):
            stem, ext = os.path.splitext(path)
            if ext.lower() in wanted and len(stem) >= 10:
                matches.append(path)
        matches.sort(key=os.path.getmtime, reverse=True)
        return matches

    def _record_exchange(self, user_text, assistant_text):
        """Append a user/assistant message pair to the conversation history."""
        st.session_state.messages.append({"role": "user", "content": user_text})
        st.session_state.messages.append({"role": "assistant", "content": assistant_text})

    def _transcribe_extracted_audio(self, audio_path):
        """Transcribe the audio file extracted from a video, then delete it.

        Returns:
            The transcript, ``"No audio found."`` when there is no audio file,
            or ``"No audio transcribed."`` when transcription produced nothing.
        """
        if not (audio_path and os.path.exists(audio_path)):
            return "No audio found."
        with open(audio_path, "rb") as af:
            audio_bytes = af.read()
        try:
            transcript = self.openai_processor.transcribe_audio(audio_bytes, file_name=audio_path)
        finally:
            # The extracted track is only an intermediate artefact.
            if os.path.exists(audio_path):
                os.remove(audio_path)
        return transcript or "No audio transcribed."

    # ------------------------------------------------------------------ sidebar

    def display_sidebar(self):
        """Render the settings, model buttons, history reset and file browser."""
        with st.sidebar:
            st.title("Configuration")
            st.session_state.should_save = st.checkbox("💾 Save Session Logs", st.session_state.should_save)
            # The handler was created before this checkbox was read; keep it in
            # step so saves in this run honour the current setting.
            self.file_handler.should_save = st.session_state.should_save
            st.session_state.test_mode = st.checkbox("🔬 Run Benchmark Tests", st.session_state.test_mode)

            st.markdown("---")
            st.subheader("Select a Model")

            for name, details in self.MODELS.items():
                if st.button(f"{details['emoji']} {name}", key=f"model_{name}", use_container_width=True):
                    self.select_model_and_reset_session(details['model_name'])

            st.markdown("---")
            if st.button("🗑️ Clear Chat History", use_container_width=True):
                st.session_state.messages = []
                st.rerun()

            st.markdown("---")
            self.display_file_browser()

    def display_file_browser(self):
        """List recent files with download/delete controls and a ZIP download."""
        st.subheader("File Operations")
        default_types = [".md", ".png", ".pdf"]
        file_types = st.multiselect("Filter by type", [".md", ".wav", ".png", ".mp4", ".mp3", ".pdf"], default=default_types)

        all_files = self._list_browsable_files(file_types)

        if st.button("⬇️ Download All Filtered", use_container_width=True):
            zip_path = self.file_handler.create_zip_archive(all_files)
            st.markdown(self.file_handler.get_base64_download_link(zip_path, "Click to download ZIP"), unsafe_allow_html=True)

        # Only the 20 most recently modified files get their own controls.
        for file in all_files[:20]:
            with st.expander(os.path.basename(file)):
                st.markdown(self.file_handler.get_base64_download_link(file, f"Download {os.path.basename(file)}"), unsafe_allow_html=True)
                if st.button("🗑 Delete", key=f"del_{file}"):
                    if os.path.exists(file):
                        os.remove(file)
                    st.rerun()

    def select_model_and_reset_session(self, model_name):
        """Switch to *model_name*, clear the chat and optionally run benchmarks.

        Always ends by rerunning the script.
        """
        st.session_state.openai_model = model_name
        st.session_state.messages = []
        st.info(f"Model set to `{model_name}`. New session started.")
        if st.session_state.test_mode:
            self.benchmarker.run_all_benchmarks(model_name)
        st.rerun()

    # ------------------------------------------------------------ main content

    def display_main_interface(self):
        """Render the input selector, the active input handler and the chat."""
        st.title("🔬🧠 ScienceBrain.AI")
        st.markdown(f"**Model:** `{st.session_state.openai_model}` | **Input Mode:** `{st.session_state.input_option}`")

        options = ("Text", "Image", "Audio", "Video", "ArXiv Search", "RAG PDF Gallery")
        current = st.session_state.input_option
        # Fall back to the first option if the stored value is not a known one.
        current_index = options.index(current) if current in options else 0
        st.session_state.input_option = st.selectbox("Select Input Type", options, index=current_index)

        handler_map = {
            "Text": self.handle_text_input, "Image": self.handle_image_input,
            "Audio": self.handle_audio_input, "Video": self.handle_video_input,
            "ArXiv Search": self.handle_arxiv_search, "RAG PDF Gallery": self.handle_rag_gallery,
        }
        handler_map[st.session_state.input_option]()

        st.markdown("---")
        st.subheader("Conversation History")
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

        if prompt := st.chat_input(f"Chat with {st.session_state.openai_model}..."):
            self.process_and_display_completion(prompt)

    def process_and_display_completion(self, prompt, context=""):
        """Send *prompt* (with optional *context*) to the model and show the reply.

        The exchange is added to the history and, when saving is enabled,
        written to a markdown file; the script is then rerun. If the API call
        fails, the error is shown, the pending user message is removed from the
        history again and no rerun happens, so the error stays visible.

        Args:
            prompt: The user's request; also used to derive the file name.
            context: Optional text placed before the prompt.
        """
        full_prompt = f"{context}\n\n{prompt}" if context else prompt
        st.session_state.messages.append({"role": "user", "content": full_prompt})

        with st.chat_message("user"):
            st.markdown(full_prompt)

        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                try:
                    response = self.openai_processor.execute_text_completion(
                        st.session_state.openai_model, st.session_state.messages
                    )
                except Exception as e:
                    # Do not leave an unanswered user message in the history.
                    st.session_state.messages.pop()
                    st.error(f"Completion failed: {e}")
                    return
            st.markdown(response)

        st.session_state.messages.append({"role": "assistant", "content": response})
        if st.session_state.should_save:
            filename = self.file_handler.generate_filename(prompt, "md")
            self.file_handler.save_file(response, filename, prompt=full_prompt)
        st.rerun()

    def handle_text_input(self):
        """Show a text area and submit its content as a prompt."""
        if prompt := st.text_area("Enter your text prompt:", key="text_prompt", height=150):
            if st.button("Submit Text", key="submit_text"):
                self.process_and_display_completion(prompt)

    def handle_image_input(self):
        """Let the user upload an image and ask the model about it."""
        prompt = st.text_input("Prompt for the image:", value="Describe this image in detail.")
        uploaded_image = st.file_uploader("Upload an image:", type=["png", "jpg", "jpeg"])

        if not (st.button("Submit Image") and uploaded_image and prompt):
            return

        with st.chat_message("user"):
            st.image(uploaded_image, width=250)
            st.markdown(prompt)

        with st.chat_message("assistant"):
            with st.spinner("Analyzing image..."):
                image_bytes = uploaded_image.getvalue()
                try:
                    response = self.openai_processor.execute_image_completion(st.session_state.openai_model, prompt, image_bytes)
                except Exception as e:
                    st.error(f"Image analysis failed: {e}")
                    return
            st.markdown(response)

        # Without this the rerun below would discard the answer immediately.
        self._record_exchange(f"{prompt}\n\n[Image: {uploaded_image.name}]", response)
        if st.session_state.should_save:
            filename = self.file_handler.generate_filename(prompt, "md", original_name=uploaded_image.name)
            self.file_handler.save_file(response, filename, prompt=prompt)
        st.rerun()

    def handle_audio_input(self):
        """Transcribe uploaded or recorded audio and send it with a prompt."""
        prompt = st.text_input("Prompt for the audio:", value="Summarize this audio transcription.")
        uploaded_audio = st.file_uploader("Upload an audio file:", type=["mp3", "wav", "m4a"])
        st.write("OR")
        recorded_audio = audio_recorder(text="Click to Record", icon_size="2x")

        audio_bytes, source = self._pick_audio_source(uploaded_audio, recorded_audio)

        if not (st.button("Submit Audio") and audio_bytes and prompt):
            return

        with st.chat_message("user"):
            st.audio(audio_bytes)
            st.markdown(prompt)
        with st.chat_message("assistant"):
            with st.spinner("Transcribing and processing audio..."):
                transcript = self.openai_processor.transcribe_audio(audio_bytes, file_name=source)
            if transcript:
                # Reruns the script itself once the reply is stored.
                self.process_and_display_completion(prompt, context=f"Audio Transcription:\n{transcript}")
            # No rerun on failure: it would wipe the error shown by
            # transcribe_audio before the user can read it.

    def handle_video_input(self):
        """Summarise an uploaded video from sampled frames and its transcript."""
        prompt = st.text_input("Prompt for the video:", value="Summarize this video frame by frame and the audio.")
        uploaded_video = st.file_uploader("Upload a video:", type=["mp4", "mov"])

        if not (st.button("Submit Video") and uploaded_video and prompt):
            return

        with st.chat_message("user"):
            st.video(uploaded_video)
            st.markdown(prompt)
        with st.chat_message("assistant"):
            with st.spinner("Processing video... this may take a while."):
                video_path = self.file_handler.save_uploaded_file(uploaded_video)
                frames, audio_path = self.media_processor.extract_video_components(video_path)
                transcript = self._transcribe_extracted_audio(audio_path)
                try:
                    response = self.openai_processor.execute_video_completion(st.session_state.openai_model, frames, transcript)
                except Exception as e:
                    st.error(f"Video analysis failed: {e}")
                    return
            st.markdown(response)

        # Without this the rerun below would discard the answer immediately.
        self._record_exchange(f"{prompt}\n\n[Video: {uploaded_video.name}]", response)
        if st.session_state.should_save:
            filename = self.file_handler.generate_filename(prompt, "md", original_name=uploaded_video.name)
            self.file_handler.save_file(response, filename, prompt=prompt)
        st.rerun()

    def handle_arxiv_search(self):
        """Search ArXiv and ask the model to summarise the result."""
        query = st.text_input("Search ArXiv for scholarly articles:")
        if st.button("Search ArXiv") and query:
            with st.spinner("Searching ArXiv..."):
                result = self.external_api_handler.search_arxiv(query)
            self.process_and_display_completion("Summarize the findings from this ArXiv search result.", context=result)

    def handle_rag_gallery(self):
        """Build a vector store from uploaded PDFs and offer a quiz on one of them."""
        st.subheader("RAG PDF Gallery")
        pdf_files = st.file_uploader("Upload PDFs to build a Vector Store:", type=["pdf"], accept_multiple_files=True)

        if pdf_files and st.button(f"Create Vector Store with {len(pdf_files)} PDFs"):
            with st.spinner("Saving files and creating vector store..."):
                pdf_paths = [self.file_handler.save_uploaded_file(f) for f in pdf_files]
                vector_store = self.rag_manager.create_vector_store(f"PDF_Gallery_{int(time.time())}")
                if vector_store:
                    st.session_state.vector_store_id = vector_store.id
                    stats = self.rag_manager.upload_files_to_store(vector_store.id, pdf_paths)
                    st.json(stats)
                    st.success(f"Vector Store `{vector_store.name}` created with ID: `{vector_store.id}`")

        if not st.session_state.get("vector_store_id"):
            return

        st.info(f"Active Vector Store ID: `{st.session_state.vector_store_id}`")

        if st.button("Generate Quiz from a Random PDF"):
            # The store id outlives the uploader's contents, so the upload list
            # can be empty here.
            if not pdf_files:
                st.warning("Upload at least one PDF to generate a quiz.")
                return
            with st.spinner("Generating quiz..."):
                random_pdf = self.file_handler.save_uploaded_file(random.choice(pdf_files))
                quiz = self.rag_manager.generate_questions_from_pdf(random_pdf)
            st.markdown(quiz)

    def run(self):
        """Render the sidebar followed by the main interface."""
        self.display_sidebar()
        self.display_main_interface()


if __name__ == "__main__":
    # Build the UI object (page config, session state, helpers) and render it.
    app = StreamlitUI()
    app.run()