import base64
import glob
import os
import re
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import BytesIO

import cv2
import pytz
import streamlit as st
import streamlit.components.v1 as components
from dotenv import load_dotenv
from moviepy import VideoFileClip  # moviepy >= 2.0 import path
from openai import OpenAI
from PIL import Image
from PyPDF2 import PdfReader
from tqdm import tqdm

# Load OPENAI_API_KEY and related secrets from a local .env file, if present.
load_dotenv()

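# ScienceBrain.AI: a single-file Streamlit app that routes text, image, audio,
# and video inputs through GPT-4o, plus a RAG PDF gallery backed by OpenAI
# vector stores and the built-in file_search tool.
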
Site_Name = 'Scholarly-Article-Document-Search-With-Memory'
title = "🔬🧠ScienceBrain.AI"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1'
icons = Image.open("icons.ico")
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={'Get Help': helpURL, 'Report a bug': bugURL, 'About': title}
)

API_KEY = os.getenv('API_KEY')
HF_KEY = os.getenv('HF_KEY')
headers = {"Authorization": f"Bearer {HF_KEY}", "Content-Type": "application/json"}
key = os.getenv('OPENAI_API_KEY')
client = OpenAI(api_key=key, organization=os.getenv('OPENAI_ORG_ID'))
MODEL = "gpt-4o-2024-05-13"

if "openai_model" not in st.session_state:
    st.session_state["openai_model"] = MODEL
if "messages" not in st.session_state:
    st.session_state.messages = []
if st.button("Clear Session"):
    st.session_state.messages = []

# Honored by create_and_save_file(); unchecking disables all response saving.
should_save = st.sidebar.checkbox("💾 Save", value=True, help="Save your session data.")

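# Text-to-speech runs client-side: the rendered HTML uses the browser's
# Web Speech API (SpeechSynthesisUtterance), so no server-side TTS is needed.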
def SpeechSynthesis(result):
    # Rendering the component is a side effect, so this must run on every
    # call and should not be cached.
    documentHTML5 = '''
    <!DOCTYPE html>
    <html>
    <head>
        <title>Read It Aloud</title>
        <script type="text/javascript">
            function readAloud() {
                const text = document.getElementById("textArea").value;
                const speech = new SpeechSynthesisUtterance(text);
                window.speechSynthesis.speak(speech);
            }
        </script>
    </head>
    <body>
        <h1>🔊 Read It Aloud</h1>
        <textarea id="textArea" rows="10" cols="80">
    '''
    documentHTML5 += result + '''
        </textarea>
        <br>
        <button onclick="readAloud()">🔊 Read Aloud</button>
    </body>
    </html>
    '''
    components.html(documentHTML5, width=1280, height=300)

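# Saved files are named "<MMDD_HHMM>_<sanitized prompt>.<ext>" in US/Central
# time, e.g. "0131_1405_Describe this image.md" (a hypothetical example).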
def generate_filename(prompt, file_type):
    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    replaced_prompt = re.sub(r'[<>:"/\\|?*\n]', ' ', prompt)  # strip filesystem-unsafe characters
    safe_prompt = re.sub(r'\s+', ' ', replaced_prompt).strip()[:240]
    return f"{safe_date_time}_{safe_prompt}.{file_type}"


def create_and_save_file(content, file_type="md", prompt=None, is_image=False, should_save=True):
    if not should_save:
        return None
    filename = generate_filename(prompt if prompt else content, file_type)
    # Image content arrives as raw bytes and needs a binary-mode handle;
    # everything else is written as text.
    if is_image:
        with open(filename, "wb") as f:
            f.write(content)
    else:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(prompt + "\n\n" + content if prompt else content)
    return filename

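# Each modality handler below follows the same pattern: render the user turn,
# call the model, render the reply, and persist it via create_and_save_file().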
def process_text(text_input):
    if text_input:
        st.session_state.messages.append({"role": "user", "content": text_input})
        with st.chat_message("user"):
            st.markdown(text_input)
        with st.chat_message("assistant"):
            completion = client.chat.completions.create(
                model=st.session_state["openai_model"],
                # Send the full running conversation so the model keeps context.
                messages=[{"role": m["role"], "content": m["content"]} for m in st.session_state.messages],
                stream=False
            )
            response = completion.choices[0].message.content
            st.markdown(response)
        create_and_save_file(response, "md", text_input, should_save=should_save)
        st.session_state.messages.append({"role": "assistant", "content": response})

def process_audio(audio_input, text_input=''):
    if audio_input:
        if isinstance(audio_input, str):
            with open(audio_input, "rb") as f:
                audio_bytes = f.read()
        else:
            audio_bytes = audio_input.read()
        with st.spinner("Transcribing audio..."):
            # Whisper infers the audio format from the file name, so the
            # in-memory buffer needs one before it can be uploaded.
            audio_file = BytesIO(audio_bytes)
            audio_file.name = audio_input if isinstance(audio_input, str) else audio_input.name
            transcription = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
        st.session_state.messages.append({"role": "user", "content": transcription.text})
        with st.chat_message("user"):
            st.markdown(transcription.text)
        with st.chat_message("assistant"):
            completion = client.chat.completions.create(
                model=st.session_state["openai_model"],
                messages=[{"role": "user", "content": text_input + "\n\nTranscription: " + transcription.text}]
            )
            response = completion.choices[0].message.content
            st.markdown(response)
        create_and_save_file(response, "md", transcription.text, should_save=should_save)
        st.session_state.messages.append({"role": "assistant", "content": response})

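# GPT-4o accepts images inline: the upload is base64-encoded and embedded in
# the message as a data: URL rather than hosted at a public address.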
def process_image(image_input, user_prompt):
    if isinstance(image_input, str):
        with open(image_input, "rb") as image_file:
            image_bytes = image_file.read()
    else:
        image_bytes = image_input.read()
    base64_image = base64.b64encode(image_bytes).decode("utf-8")
    response = client.chat.completions.create(
        model=st.session_state["openai_model"],
        messages=[
            {"role": "system", "content": "You are a helpful assistant that responds in Markdown."},
            {"role": "user", "content": [
                {"type": "text", "text": user_prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]}
        ],
        temperature=0.0  # deterministic output for repeatable image descriptions
    )
    image_response = response.choices[0].message.content
    create_and_save_file(image_response, "md", user_prompt, should_save=should_save)
    return image_response

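# Video is summarized from sampled frames plus the audio transcript. At the
# default seconds_per_frame=2, a 30 fps clip is sampled every
# fps * seconds_per_frame = 60 frames, i.e. one frame per two seconds.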
def save_video(video_file):
    with open(video_file.name, "wb") as f:
        f.write(video_file.getbuffer())
    return video_file.name


def process_video(video_path, seconds_per_frame=2):
    base64Frames = []
    base_video_path, _ = os.path.splitext(video_path)
    video = cv2.VideoCapture(video_path)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = video.get(cv2.CAP_PROP_FPS)
    frames_to_skip = int(fps * seconds_per_frame)
    curr_frame = 0
    # Seek directly to every Nth frame instead of decoding the whole clip.
    while curr_frame < total_frames - 1:
        video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
        success, frame = video.read()
        if not success:
            break
        _, buffer = cv2.imencode(".jpg", frame)
        base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
        curr_frame += frames_to_skip
    video.release()
    audio_path = f"{base_video_path}.mp3"
    try:
        clip = VideoFileClip(video_path)
        clip.audio.write_audiofile(audio_path, bitrate="32k")
        clip.audio.close()
        clip.close()
    except Exception:
        # clip.audio is None for silent videos, which raises above.
        st.write('No audio track found.')
    return base64Frames, audio_path

def process_audio_and_video(video_input):
    if video_input:
        video_path = save_video(video_input)
        with st.spinner("Extracting frames and audio..."):
            base64Frames, audio_path = process_video(video_path)
        with st.spinner("Transcribing video audio..."):
            # Whisper accepts mp4 directly, so the original file is uploaded.
            with open(video_path, "rb") as video_file:
                transcript = client.audio.transcriptions.create(model="whisper-1", file=video_file).text
        with st.chat_message("user"):
            st.markdown(f"Video Transcription: {transcript}")
        with st.chat_message("assistant"):
            response = client.chat.completions.create(
                model=st.session_state["openai_model"],
                messages=[
                    {"role": "system", "content": "Summarize the video and its transcript in Markdown."},
                    # Every content part must be a typed dict; bare strings are rejected.
                    {"role": "user", "content": [
                        {"type": "text", "text": "Video frames:"},
                        *({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{frame}"}} for frame in base64Frames),
                        {"type": "text", "text": f"Transcription: {transcript}"}
                    ]}
                ]
            )
            result = response.choices[0].message.content
            st.markdown(result)
        create_and_save_file(result, "md", transcript or "video_summary", should_save=should_save)

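# RAG PDF gallery pipeline: extract text from each PDF, generate one
# evaluation question per document, upload everything to an OpenAI vector
# store, then answer queries with the file_search tool and score retrieval.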
def extract_text_from_pdf(pdf_path):
    text = ""
    try:
        with open(pdf_path, "rb") as f:
            reader = PdfReader(f)
            for page in reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text
    except Exception as e:
        st.error(f"Error reading {pdf_path}: {e}")
    return text


def generate_questions(pdf_path):
    text = extract_text_from_pdf(pdf_path)
    response = client.chat.completions.create(
        model=MODEL,
        # Only the first 2000 characters are sent to keep the prompt small.
        messages=[{"role": "user", "content": f"Generate a question that can only be answered from this document:\n{text[:2000]}"}]
    )
    return response.choices[0].message.content

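# Files are attached to the vector store in parallel; ten worker threads is a
# throughput/rate-limit compromise, not a hard requirement.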
def upload_single_pdf(file_path, vector_store_id):
    file_name = os.path.basename(file_path)
    try:
        with open(file_path, 'rb') as f:
            file_response = client.files.create(file=f, purpose="assistants")
        client.vector_stores.files.create(
            vector_store_id=vector_store_id,
            file_id=file_response.id
        )
        return {"file": file_name, "status": "success"}
    except Exception as e:
        st.error(f"Error with {file_name}: {str(e)}")
        return {"file": file_name, "status": "failed", "error": str(e)}


def upload_pdf_files_to_vector_store(vector_store_id, pdf_files):
    stats = {"total_files": len(pdf_files), "successful_uploads": 0, "failed_uploads": 0, "errors": []}
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(upload_single_pdf, file_path, vector_store_id): file_path for file_path in pdf_files}
        for future in tqdm(as_completed(futures), total=len(pdf_files)):
            result = future.result()
            if result["status"] == "success":
                stats["successful_uploads"] += 1
            else:
                stats["failed_uploads"] += 1
                stats["errors"].append(result)
    return stats

def create_vector_store(store_name):
    try:
        vector_store = client.vector_stores.create(name=store_name)
        return {"id": vector_store.id, "name": vector_store.name, "created_at": vector_store.created_at, "file_count": vector_store.file_counts.completed}
    except Exception as e:
        st.error(f"Error creating vector store: {e}")
        return {}


def process_rag_query(query, vector_store_id):
    # file_search is a built-in tool of the Responses API (it is not a valid
    # Chat Completions tool type), so RAG queries go through responses.create.
    # include=["file_search_call.results"] returns the retrieved chunks too.
    response = client.responses.create(
        model="gpt-4o-mini",
        input=query,
        tools=[{"type": "file_search", "vector_store_ids": [vector_store_id]}],
        include=["file_search_call.results"]
    )
    search_results = []
    for item in response.output:
        if item.type == "file_search_call" and item.results:
            search_results = item.results
    return response.output_text, search_results

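# Retrieval metrics, with exactly one relevant document per query:
#   Recall@k    = fraction of queries whose source PDF appears in the top k
#   Precision@k = Recall@k / k (each hit contributes one relevant result of k)
#   MRR         = mean of 1/rank of the source PDF, 0 when it is not retrieved
#   MAP         = mean average precision, which collapses to the same 1/rank
#                 term when only a single document is relevant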
def evaluate_rag_performance(questions_dict, vector_store_id, k=5):
    total_queries = len(questions_dict)
    correct_retrievals_at_k = 0
    reciprocal_ranks = []
    average_precisions = []

    for expected_filename, query in questions_dict.items():
        _, search_results = process_rag_query(query, vector_store_id)
        retrieved_files = [result.filename for result in search_results[:k]]
        if expected_filename in retrieved_files:
            rank = retrieved_files.index(expected_filename) + 1
            correct_retrievals_at_k += 1
            reciprocal_ranks.append(1 / rank)
            precisions = [1 if f == expected_filename else 0 for f in retrieved_files[:rank]]
            average_precisions.append(sum(precisions) / len(precisions))
        else:
            reciprocal_ranks.append(0)
            average_precisions.append(0)

    recall_at_k = correct_retrievals_at_k / total_queries
    precision_at_k = recall_at_k / k  # one relevant doc per query, so each hit is 1 of k results
    mrr = sum(reciprocal_ranks) / total_queries
    map_score = sum(average_precisions) / total_queries
    return {"recall@k": recall_at_k, "precision@k": precision_at_k, "mrr": mrr, "map": map_score}

def rag_pdf_gallery():
    st.subheader("📚 RAG PDF Gallery")
    pdf_files = st.file_uploader("Upload PDFs", type=["pdf"], accept_multiple_files=True)
    if pdf_files:
        # Streamlit reruns this whole script on every widget interaction, so
        # the expensive setup (questions, vector store, uploads) is done once
        # and cached in session_state instead of repeating on each rerun.
        if "rag_store" not in st.session_state:
            local_pdf_paths = []
            for pdf in pdf_files:
                pdf_path = f"temp_{pdf.name}"
                with open(pdf_path, "wb") as f:
                    f.write(pdf.read())
                local_pdf_paths.append(pdf_path)

            with st.spinner("Generating evaluation questions..."):
                questions_dict = {os.path.basename(p): generate_questions(p) for p in local_pdf_paths}

            with st.spinner("Creating vector store..."):
                vector_store_details = create_vector_store("rag_pdf_gallery_store")
                if not vector_store_details:
                    return
                upload_stats = upload_pdf_files_to_vector_store(vector_store_details["id"], local_pdf_paths)

            # Uploads are complete, so the temporary local copies can go.
            for pdf_path in local_pdf_paths:
                os.remove(pdf_path)

            st.session_state.rag_store = {
                "details": vector_store_details,
                "questions": questions_dict,
                "stats": upload_stats,
            }

        store = st.session_state.rag_store
        st.write("Generated Questions:", store["questions"])
        st.write("Upload Stats:", store["stats"])

        query = st.text_input("Ask a question about the PDFs:")
        if query:
            with st.spinner("Processing RAG query..."):
                response, search_results = process_rag_query(query, store["details"]["id"])
            st.markdown("**Response:**")
            st.markdown(response)
            if search_results:
                st.markdown("**Retrieved Chunks:**")
                for result in search_results:
                    st.write(f"- File: {result.filename}, Score: {result.score}")

        if st.button("Evaluate RAG Performance"):
            with st.spinner("Evaluating performance..."):
                metrics = evaluate_rag_performance(store["questions"], store["details"]["id"])
            st.write("Evaluation Metrics:", metrics)

def FileSidebar():
    st.sidebar.title("File Operations")
    file_types = st.sidebar.multiselect("Filter by type", [".md", ".wav", ".png", ".mp4", ".mp3"], default=[".md"])
    # Generated files all start with a MMDD_HHMM timestamp, so a minimum stem
    # length filters out unrelated files in the working directory.
    all_files = [f for f in glob.glob("*.*") if os.path.splitext(f)[1] in file_types and len(os.path.splitext(f)[0]) >= 10]
    all_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)

    if st.sidebar.button("🗑 Delete All Filtered"):
        for file in all_files:
            os.remove(file)
        st.rerun()

    def create_zip_of_files(files):
        # The archive is rebuilt on each click so it always reflects the
        # current file contents.
        zip_name = "files.zip"
        with zipfile.ZipFile(zip_name, 'w') as zipf:
            for file in files:
                zipf.write(file)
        return zip_name

    def get_zip_download_link(zip_file):
        with open(zip_file, 'rb') as f:
            data = f.read()
        b64 = base64.b64encode(data).decode()
        return f'<a href="data:application/zip;base64,{b64}" download="{zip_file}">Download All</a>'

    if st.sidebar.button("⬇️ Download All Filtered"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_zip_download_link(zip_file), unsafe_allow_html=True)

    for file in all_files:
        col1, col2, col3 = st.sidebar.columns([1, 6, 1])
        with col1:
            if st.button("🌐", key=f"view_{file}"):
                if file.endswith(".md"):
                    with open(file, "r", encoding="utf-8") as f:
                        content = f.read()
                    st.markdown(content)
                    SpeechSynthesis(content)
                else:
                    st.sidebar.warning("Preview is only available for .md files.")
        with col2:
            st.write(file)
        with col3:
            if st.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                st.rerun()

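# Entry point. Streamlit executes this script top-to-bottom on every
# interaction, so FileSidebar() and main() render the full UI on each rerun.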
def main():
    st.markdown("##### GPT-4o Omni Model: Text, Audio, Image, Video & RAG")
    model_options = ["gpt-4o-2024-05-13", "gpt-3.5-turbo", "gpt-4o-mini"]
    selected_model = st.selectbox("Select GPT Model", model_options, index=0)
    st.session_state["openai_model"] = selected_model

    option = st.selectbox("Select Input Type", ("Text", "Image", "Audio", "Video", "RAG PDF Gallery"))

    # Render the conversation so far before any new input is handled; the
    # handlers draw new turns themselves, so rendering history afterwards
    # would show the latest messages twice in the same run.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if option == "Text":
        text_input = st.text_input("Enter your text:")
        if text_input:
            with st.spinner("Processing..."):
                process_text(text_input)

    elif option == "Image":
        default_prompt = "Describe this image and list ten facts in a markdown outline with emojis."
        text_input = st.text_input("Image Prompt:", value=default_prompt)
        image_input = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg"])
        if image_input and text_input:
            with st.spinner("Processing..."):
                image_response = process_image(image_input, text_input)
                with st.chat_message("ai", avatar="🦖"):
                    st.markdown(image_response)

    elif option == "Audio":
        default_prompt = "Summarize this audio transcription in Markdown."
        text_input = st.text_input("Audio Prompt:", value=default_prompt)
        audio_input = st.file_uploader("Upload an audio file", type=["mp3", "wav"])
        if audio_input and text_input:
            with st.spinner("Processing..."):
                process_audio(audio_input, text_input)

    elif option == "Video":
        default_prompt = "Summarize this video and its transcription in Markdown."
        text_input = st.text_input("Video Prompt:", value=default_prompt)
        video_input = st.file_uploader("Upload a video file", type=["mp4"])
        if video_input and text_input:
            with st.spinner("Processing..."):
                process_audio_and_video(video_input)

    elif option == "RAG PDF Gallery":
        rag_pdf_gallery()

    if prompt := st.chat_input("GPT-4o Multimodal ChatBot - What can I help you with?"):
        process_text(prompt)


FileSidebar()
main()